From 612e1a611f73a070268d88452f7f20bfa3d1e438 Mon Sep 17 00:00:00 2001
From: Wenquan Xing
Date: Wed, 28 Feb 2018 16:42:48 -0800
Subject: [PATCH] wire replicator transmission to domain APIs

---
 cmd/server/cadence.go | 2 +-
 common/mocks/KafkaProducer.go | 76 ++++++++
 common/persistence/dataInterfaces.go | 2 +-
 common/service/service.go | 7 +
 common/service/serviceinterfaces.go | 4 +
 host/onebox.go | 8 +-
 service/frontend/handler.go | 36 +++-
 service/frontend/service.go | 7 +-
 service/history/historyEngine.go | 2 +-
 service/history/historyEngine_test.go | 6 +-
 .../worker/domainReplicationTaskHandler.go | 83 +++++++-
 .../domainReplicationTaskHandler_test.go | 177 ++++++++++++++++--
 service/worker/processor.go | 8 +-
 13 files changed, 375 insertions(+), 43 deletions(-)
 create mode 100644 common/mocks/KafkaProducer.go

diff --git a/cmd/server/cadence.go b/cmd/server/cadence.go
index 24d60847638..0ee87d2c667 100644
--- a/cmd/server/cadence.go
+++ b/cmd/server/cadence.go
@@ -61,7 +61,7 @@ func startHandler(c *cli.Context) {
         log.Fatal("Unable to get current directory")
     }
     if err := cassandra.VerifyCompatibleVersion(cassCfg, dir); err != nil {
-        log.Fatalf("Incompatible versions", err)
+        log.Fatal("Incompatible versions", err)
     }
 
     services := getServices(c)
diff --git a/common/mocks/KafkaProducer.go b/common/mocks/KafkaProducer.go
new file mode 100644
index 00000000000..e485be2e264
--- /dev/null
+++ b/common/mocks/KafkaProducer.go
@@ -0,0 +1,76 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package mocks
+
+import (
+    mock "github.com/stretchr/testify/mock"
+    "github.com/uber/cadence/.gen/go/replicator"
+    "github.com/uber/cadence/common/messaging"
+)
+
+// KafkaProducer is an autogenerated mock type for the KafkaProducer type
+type KafkaProducer struct {
+    mock.Mock
+}
+
+// Close provides a mock function with given fields:
+func (_m *KafkaProducer) Close() error {
+    ret := _m.Called()
+
+    var r0 error
+    if rf, ok := ret.Get(0).(func() error); ok {
+        r0 = rf()
+    } else {
+        r0 = ret.Error(0)
+    }
+
+    return r0
+}
+
+// Publish provides a mock function with given fields: msg
+func (_m *KafkaProducer) Publish(msg *replicator.ReplicationTask) error {
+    ret := _m.Called(msg)
+
+    var r0 error
+    if rf, ok := ret.Get(0).(func(*replicator.ReplicationTask) error); ok {
+        r0 = rf(msg)
+    } else {
+        r0 = ret.Error(0)
+    }
+
+    return r0
+}
+
+// PublishBatch provides a mock function with given fields: msgs
+func (_m *KafkaProducer) PublishBatch(msgs []*replicator.ReplicationTask) error {
+    ret := _m.Called(msgs)
+
+    var r0 error
+    if rf, ok := ret.Get(0).(func([]*replicator.ReplicationTask) error); ok {
+        r0 = rf(msgs)
+    } else {
+        r0 = ret.Error(0)
+    }
+
+    return r0
+}
+
+var _ messaging.Producer = (*KafkaProducer)(nil)
diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go
index b7f7fbbfffe..68e96257350 100644
--- a/common/persistence/dataInterfaces.go
+++ b/common/persistence/dataInterfaces.go
@@ -1010,7 +1010,7 @@ func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *History
 }
 
 func (b *HistoryEventBatch) String() string {
-    return fmt.Sprint("[version:%v, events:%v]", b.Version, b.Events)
+    return fmt.Sprintf("[version:%v, events:%v]", b.Version, b.Events)
 }
 
 // NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch
diff --git a/common/service/service.go b/common/service/service.go
index 60338b8da40..e148c3f2b44 100644
--- a/common/service/service.go
+++ b/common/service/service.go
@@ -89,6 +89,7 @@ type (
         runtimeMetricsReporter *metrics.RuntimeMetricsReporter
         metricsClient          metrics.Client
         clusterMetadata        cluster.Metadata
+        messagingClient        messaging.Client
         dynamicCollection      *dynamicconfig.Collection
     }
 )
@@ -105,6 +106,7 @@ func New(params *BootstrapParams) Service {
         metricsScope:          params.MetricScope,
         numberOfHistoryShards: params.CassandraConfig.NumHistoryShards,
         clusterMetadata:       params.ClusterMetadata,
+        messagingClient:       params.MessagingClient,
         dynamicCollection:     dynamicconfig.NewCollection(params.DynamicConfig),
     }
     sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.logger)
@@ -227,6 +229,11 @@ func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
     return h.clusterMetadata
 }
 
+// GetMessagingClient returns the Kafka messaging client
+func (h *serviceImpl) GetMessagingClient() messaging.Client {
+    return h.messagingClient
+}
+
 func getMetricsServiceIdx(serviceName string, logger bark.Logger) metrics.ServiceIdx {
     switch serviceName {
     case common.FrontendServiceName:
diff --git a/common/service/serviceinterfaces.go b/common/service/serviceinterfaces.go
index 237f8a1b9ce..8a17f5d0ce4 100644
--- a/common/service/serviceinterfaces.go
+++ b/common/service/serviceinterfaces.go
@@ -25,6 +25,7 @@ import (
     "github.com/uber/cadence/client"
     "github.com/uber/cadence/common/cluster"
     "github.com/uber/cadence/common/membership"
+    "github.com/uber/cadence/common/messaging"
     "github.com/uber/cadence/common/metrics"
     "go.uber.org/yarpc"
 )
@@ -55,5 +56,8 @@ type (
         // GetClusterMetadata returns the service cluster metadata
         GetClusterMetadata() cluster.Metadata
+
+        // GetMessagingClient returns the Kafka messaging client
+        GetMessagingClient() messaging.Client
     }
 )
diff --git a/host/onebox.go b/host/onebox.go
index 33322c6cb3d..02f2717f8a4 100644
--- a/host/onebox.go
+++ b/host/onebox.go
@@ -28,12 +28,14 @@ import (
     "errors"
 
+    "github.com/stretchr/testify/mock"
     "github.com/uber-common/bark"
     "github.com/uber-go/tally"
     "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
     fecli "github.com/uber/cadence/client/frontend"
     "github.com/uber/cadence/common"
     "github.com/uber/cadence/common/cluster"
+    "github.com/uber/cadence/common/mocks"
     "github.com/uber/cadence/common/persistence"
     "github.com/uber/cadence/common/service"
     "github.com/uber/cadence/common/service/config"
@@ -197,9 +199,13 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW
     params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
     params.CassandraConfig.Hosts = "127.0.0.1"
 
+    // TODO when cross DC is public, remove this temporary override
+    kafkaProducer := &mocks.KafkaProducer{}
+    kafkaProducer.On("Publish", mock.Anything).Return(nil)
+
     c.frontEndService = service.New(params)
     c.frontendHandler = frontend.NewWorkflowHandler(
-        c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr)
+        c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr, kafkaProducer)
     err := c.frontendHandler.Start()
     if err != nil {
         c.logger.WithField("error", err).Fatal("Failed to start frontend")
diff --git a/service/frontend/handler.go b/service/frontend/handler.go
index d8e03c25165..f64ea03d9d3 100644
--- a/service/frontend/handler.go
+++ b/service/frontend/handler.go
@@ -27,6 +27,8 @@ import (
     "sync"
     "time"
 
+    "github.com/uber/cadence/common/messaging"
+
     "github.com/pborman/uuid"
     "github.com/uber-common/bark"
     "github.com/uber-go/tally"
@@ -35,6 +37,7 @@ import (
     "github.com/uber/cadence/.gen/go/health/metaserver"
     h "github.com/uber/cadence/.gen/go/history"
     m "github.com/uber/cadence/.gen/go/matching"
+    "github.com/uber/cadence/.gen/go/replicator"
     gen "github.com/uber/cadence/.gen/go/shared"
     "github.com/uber/cadence/client/history"
     "github.com/uber/cadence/client/matching"
@@ -45,6 +48,7 @@ import (
     "github.com/uber/cadence/common/metrics"
     "github.com/uber/cadence/common/persistence"
     "github.com/uber/cadence/common/service"
+    "github.com/uber/cadence/service/worker"
     "go.uber.org/yarpc/yarpcerrors"
 )
@@ -65,6 +69,8 @@ type (
         startWG            sync.WaitGroup
         rateLimiter        common.TokenBucket
         config             *Config
+        kafkaProducer      messaging.Producer
+        DomainReplicator   worker.DomainReplicator
         service.Service
     }
@@ -106,7 +112,8 @@ var (
 // NewWorkflowHandler creates a thrift handler for the cadence service
 func NewWorkflowHandler(
     sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
-    historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager) *WorkflowHandler {
+    historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager,
+    kafkaProducer messaging.Producer) *WorkflowHandler {
     handler := &WorkflowHandler{
         Service:    sVice,
         config:     config,
@@ -117,6 +124,8 @@ func NewWorkflowHandler(
         hSerializerFactory: persistence.NewHistorySerializerFactory(),
         domainCache:        cache.NewDomainCache(metadataMgr, sVice.GetLogger()),
         rateLimiter:        common.NewTokenBucket(config.RPS, common.NewRealTimeSource()),
+        kafkaProducer:      kafkaProducer,
+        DomainReplicator:   worker.NewDomainReplicator(metadataMgr, sVice.GetLogger()),
     }
     // prevent us from trying to serve requests before handler's Start() is complete
     handler.startWG.Add(1)
@@ -214,7 +223,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *
         return wh.error(err, scope)
     }
 
-    response, err := wh.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{
+    domainRequest := &persistence.CreateDomainRequest{
         Info: &persistence.DomainInfo{
             ID:          uuid.New(),
             Name:        registerRequest.GetName(),
@@ -232,14 +241,25 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *
         },
         IsGlobalDomain:  clusterMetadata.IsGlobalDomainEnabled(),
         FailoverVersion: 0, // TODO do something?
-    })
+    }
+    domainResponse, err := wh.metadataMgr.CreateDomain(domainRequest)
     if err != nil {
         return wh.error(err, scope)
     }
 
+    // TODO remove the IsGlobalDomainEnabled check once cross DC is public
+    if clusterMetadata.IsGlobalDomainEnabled() {
+        err = wh.DomainReplicator.HandleTransmissionTask(wh.kafkaProducer, replicator.DomainOperationCreate,
+            domainRequest.Info, domainRequest.Config, domainRequest.ReplicationConfig, 0, domainRequest.FailoverVersion)
+        if err != nil {
+            return wh.error(err, scope)
+        }
+    }
+
     // TODO: Log through logging framework. We need to have good auditing of domain CRUD
-    wh.GetLogger().Debugf("Register domain succeeded for name: %v, Id: %v", *registerRequest.Name, response.ID)
+    wh.GetLogger().Debugf("Register domain succeeded for name: %v, Id: %v", registerRequest.GetName(), domainResponse.ID)
+
     return nil
 }
@@ -423,6 +443,14 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context,
         if err != nil {
             return nil, wh.error(err, scope)
         }
+        // TODO remove the IsGlobalDomainEnabled check once cross DC is public
+        if clusterMetadata.IsGlobalDomainEnabled() {
+            err = wh.DomainReplicator.HandleTransmissionTask(wh.kafkaProducer, replicator.DomainOperationUpdate,
+                info, config, replicationConfig, configVersion, failoverVersion)
+            if err != nil {
+                return nil, wh.error(err, scope)
+            }
+        }
     } else if clusterMetadata.IsGlobalDomainEnabled() && !clusterMetadata.IsMasterCluster() {
         // although there is no attr updated, just prevent customer to use the non master cluster
         // for update domain, ever (except if customer want to do a domain failover)
diff --git a/service/frontend/service.go b/service/frontend/service.go
index b8a122198bd..6fef2a8d377 100644
--- a/service/frontend/service.go
+++ b/service/frontend/service.go
@@ -114,7 +114,12 @@ func (s *Service) Start() {
 
     history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient())
 
-    handler := NewWorkflowHandler(base, s.config, metadata, history, visibility)
+    kafkaProducer, err := base.GetMessagingClient().NewProducer(base.GetClusterMetadata().GetCurrentClusterName())
+    if err != nil {
+        log.Fatalf("Creating kafka producer failed: %v", err)
+    }
+
+    handler := NewWorkflowHandler(base, s.config, metadata, history, visibility, kafkaProducer)
     handler.Start()
 
     log.Infof("%v started", common.FrontendServiceName)
diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go
index b25f5a54fb2..8ec025203a4 100644
--- a/service/history/historyEngine.go
+++ b/service/history/historyEngine.go
@@ -2121,7 +2121,7 @@ func getScheduleID(activityID string, msBuilder *mutableStateBuilder) (int64, er
     }
     scheduleID, ok := msBuilder.GetScheduleIDByActivityID(activityID)
     if !ok {
-        return 0, &workflow.BadRequestError{Message: fmt.Sprintf("No such activityID: %d\n", activityID)}
+        return 0, &workflow.BadRequestError{Message: fmt.Sprintf("No such activityID: %s\n", activityID)}
     }
     return scheduleID, nil
 }
diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go
index 25729f2f77b..94610f65b1e 100644
--- a/service/history/historyEngine_test.go
+++ b/service/history/historyEngine_test.go
@@ -1325,7 +1325,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfNoAidFound() {
             Identity: &identity,
         },
     })
-    s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}")
+    s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}")
 }
 
 func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() {
@@ -1836,7 +1836,7 @@ func (s *engineSuite) TestRespondActivityTaskFailededIfNoAIdFound() {
             Identity: &identity,
         },
     })
-    s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}")
+    s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}")
 }
 
 func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() {
@@ -2548,7 +2548,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledIfNoAidFound() {
             Identity: &identity,
         },
     })
-    s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}")
+    s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}")
 }
 
 func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_NotScheduled() {
diff --git a/service/worker/domainReplicationTaskHandler.go b/service/worker/domainReplicationTaskHandler.go
index 57ee276cd8c..52328e08973 100644
--- a/service/worker/domainReplicationTaskHandler.go
+++ b/service/worker/domainReplicationTaskHandler.go
@@ -26,6 +26,8 @@ import (
     "github.com/uber-common/bark"
     "github.com/uber/cadence/.gen/go/replicator"
     "github.com/uber/cadence/.gen/go/shared"
+    "github.com/uber/cadence/common"
+    "github.com/uber/cadence/common/messaging"
     "github.com/uber/cadence/common/persistence"
 )
@@ -65,8 +67,45 @@ func NewDomainReplicator(metadataManager persistence.MetadataManager, logger bar
     }
 }
 
-// handleDomainReplicationTask handle the domain replication task
-func (domainReplicator *domainReplicatorImpl) HandleReceiveTask(task *replicator.DomainTaskAttributes) error {
+// HandleTransmissionTask handles transmission of the domain replication task
+func (domainReplicator *domainReplicatorImpl) HandleTransmissionTask(kafka messaging.Producer, domainOperation replicator.DomainOperation,
+    info *persistence.DomainInfo, config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
+    configVersion int64, failoverVersion int64) error {
+    status, err := domainReplicator.convertDomainStatusToThrift(info.Status)
+    if err != nil {
+        return err
+    }
+
+    taskType := replicator.ReplicationTaskTypeDomain
+    task := &replicator.DomainTaskAttributes{
+        DomainOperation: &domainOperation,
+        ID:              common.StringPtr(info.ID),
+        Info: &shared.DomainInfo{
+            Name:        common.StringPtr(info.Name),
+            Status:      status,
+            Description: common.StringPtr(info.Description),
+            OwnerEmail:  common.StringPtr(info.OwnerEmail),
+        },
+        Config: &shared.DomainConfiguration{
+            WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(config.Retention),
+            EmitMetric:                             common.BoolPtr(config.EmitMetric),
+        },
+        ReplicationConfig: &shared.DomainReplicationConfiguration{
+            ActiveClusterName: common.StringPtr(replicationConfig.ActiveClusterName),
+            Clusters:          domainReplicator.convertClusterReplicationConfigToThrift(replicationConfig.Clusters),
+        },
+        ConfigVersion:   common.Int64Ptr(configVersion),
+        FailoverVersion: common.Int64Ptr(failoverVersion),
+    }
+
+    return kafka.Publish(&replicator.ReplicationTask{
+        TaskType:             &taskType,
+        DomainTaskAttributes: task,
+    })
+}
+
+// HandleReceivingTask handles receiving of the domain replication task
+func (domainReplicator *domainReplicatorImpl) HandleReceivingTask(task *replicator.DomainTaskAttributes) error {
     if err := domainReplicator.validateDomainReplicationTask(task); err != nil {
         return err
     }
@@ -84,7 +123,7 @@ func (domainReplicator *domainReplicatorImpl) HandleReceiveTask(task *replicator
 // handleDomainCreationReplicationTask handle the domain creation replication task
 func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTask(task *replicator.DomainTaskAttributes) error {
     // task already validated
-    status, err := domainReplicator.convertDomainStatus(task.Info.Status)
+    status, err := domainReplicator.convertDomainStatusFromThrift(task.Info.Status)
     if err != nil {
         return err
     }
@@ -103,7 +142,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas
         },
         ReplicationConfig: &persistence.DomainReplicationConfig{
             ActiveClusterName: task.ReplicationConfig.GetActiveClusterName(),
-            Clusters:          domainReplicator.convertClusterReplicationConfig(task.ReplicationConfig.Clusters),
+            Clusters:          domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters),
         },
         IsGlobalDomain:  true, // local domain will not be replicated
         FailoverVersion: task.GetFailoverVersion(),
@@ -116,7 +155,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas
 // handleDomainUpdateReplicationTask handle the domain update replication task
 func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask(task *replicator.DomainTaskAttributes) error {
     // task already validated
-    status, err := domainReplicator.convertDomainStatus(task.Info.Status)
+    status, err := domainReplicator.convertDomainStatusFromThrift(task.Info.Status)
     if err != nil {
         return err
     }
@@ -154,7 +193,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask(
             Retention:  task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
             EmitMetric: task.Config.GetEmitMetric(),
         }
-        request.ReplicationConfig.Clusters = domainReplicator.convertClusterReplicationConfig(task.ReplicationConfig.Clusters)
+        request.ReplicationConfig.Clusters = domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters)
         request.ConfigVersion = task.GetConfigVersion()
     }
     if resp.FailoverVersion < task.GetFailoverVersion() {
@@ -193,7 +232,7 @@ func (domainReplicator *domainReplicatorImpl) validateDomainReplicationTask(task
     return nil
 }
 
-func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfig(
+func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigFromThrift(
     input []*shared.ClusterReplicationConfiguration) []*persistence.ClusterReplicationConfig {
     output := []*persistence.ClusterReplicationConfig{}
     for _, cluster := range input {
@@ -203,12 +242,22 @@ func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfig(
     }
     return output
 }
 
-func (domainReplicator *domainReplicatorImpl) convertDomainStatus(status *shared.DomainStatus) (int, error) {
-    if status == nil {
+func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift(
+    input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration {
+    output := []*shared.ClusterReplicationConfiguration{}
+    for _, cluster := range input {
+        clusterName := common.StringPtr(cluster.ClusterName)
+        output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName})
+    }
+    return output
+}
+
+func (domainReplicator *domainReplicatorImpl) convertDomainStatusFromThrift(input *shared.DomainStatus) (int, error) {
+    if input == nil {
         return 0, ErrInvalidDomainStatus
     }
 
-    switch *status {
+    switch *input {
     case shared.DomainStatusRegistered:
         return persistence.DomainStatusRegistered, nil
     case shared.DomainStatusDeprecated:
@@ -217,3 +266,17 @@
         return 0, ErrInvalidDomainStatus
     }
 }
+
+func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) {
+    switch input {
+    case persistence.DomainStatusRegistered:
+        output := shared.DomainStatusRegistered
+        return &output, nil
+    case persistence.DomainStatusDeprecated:
+        output := shared.DomainStatusDeprecated
+        return &output, nil
+    default:
+        return nil, ErrInvalidDomainStatus
+    }
+
+}
diff --git a/service/worker/domainReplicationTaskHandler_test.go b/service/worker/domainReplicationTaskHandler_test.go
index 1a3e817d65e..b316e3a6040 100644
--- a/service/worker/domainReplicationTaskHandler_test.go
+++ b/service/worker/domainReplicationTaskHandler_test.go
@@ -32,6 +32,7 @@ import (
     "github.com/uber/cadence/.gen/go/replicator"
     "github.com/uber/cadence/.gen/go/shared"
     "github.com/uber/cadence/common"
+    "github.com/uber/cadence/common/mocks"
     "github.com/uber/cadence/common/persistence"
 )
@@ -40,6 +41,7 @@ type (
         suite.Suite
         persistence.TestBase
         domainReplicator *domainReplicatorImpl
+        kafkaProducer    *mocks.KafkaProducer
     }
 )
@@ -64,13 +66,150 @@ func (s *domainReplicatorSuite) SetupTest() {
         s.MetadataManager,
         bark.NewLoggerFromLogrus(logrus.New()),
     ).(*domainReplicatorImpl)
+    s.kafkaProducer = &mocks.KafkaProducer{}
 }
 
 func (s *domainReplicatorSuite) TearDownTest() {
     s.TearDownWorkflowStore()
 }
 
-func (s *domainReplicatorSuite) TestReplicateRegisterDomainTask() {
+func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask() {
+    taskType := replicator.ReplicationTaskTypeDomain
+    id := uuid.New()
+    name := "some random domain test name"
+    status := shared.DomainStatusRegistered
+    description := "some random test description"
+    ownerEmail := "some random test owner"
+    retention := int32(10)
+    emitMetric := true
+    clusterActive := "some random active cluster name"
+    clusterStandby := "some random standby cluster name"
+    configVersion := int64(0)
+    failoverVersion := int64(59)
+    clusters := []*persistence.ClusterReplicationConfig{
+        &persistence.ClusterReplicationConfig{
+            ClusterName: clusterActive,
+        },
+        &persistence.ClusterReplicationConfig{
+            ClusterName: clusterStandby,
+        },
+    }
+
+    domainOperation := replicator.DomainOperationCreate
+    info := &persistence.DomainInfo{
+        ID:          id,
+        Name:        name,
+        Status:      persistence.DomainStatusRegistered,
+        Description: description,
+        OwnerEmail:  ownerEmail,
+    }
+    config := &persistence.DomainConfig{
+        Retention:  retention,
+        EmitMetric: emitMetric,
+    }
+    replicationConfig := &persistence.DomainReplicationConfig{
+        ActiveClusterName: clusterActive,
+        Clusters:          clusters,
+    }
+
+    s.kafkaProducer.On("Publish", &replicator.ReplicationTask{
+        TaskType: &taskType,
+        DomainTaskAttributes: &replicator.DomainTaskAttributes{
+            DomainOperation: &domainOperation,
+            ID:              common.StringPtr(id),
+            Info: &shared.DomainInfo{
+                Name:        common.StringPtr(name),
+                Status:      &status,
+                Description: common.StringPtr(description),
+                OwnerEmail:  common.StringPtr(ownerEmail),
+            },
+            Config: &shared.DomainConfiguration{
+                WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
+                EmitMetric:                             common.BoolPtr(emitMetric),
+            },
+            ReplicationConfig: &shared.DomainReplicationConfiguration{
+                ActiveClusterName: common.StringPtr(clusterActive),
+                Clusters:          s.domainReplicator.convertClusterReplicationConfigToThrift(clusters),
+            },
+            ConfigVersion:   common.Int64Ptr(configVersion),
+            FailoverVersion: common.Int64Ptr(failoverVersion),
+        },
+    }).Return(nil).Once()
+
+    err := s.domainReplicator.HandleTransmissionTask(s.kafkaProducer, domainOperation,
+        info, config, replicationConfig, configVersion, failoverVersion)
+    s.Nil(err)
+}
+
+func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask() {
+    taskType := replicator.ReplicationTaskTypeDomain
+    id := uuid.New()
+    name := "some random domain test name"
+    status := shared.DomainStatusDeprecated
+    description := "some random test description"
+    ownerEmail := "some random test owner"
+    retention := int32(10)
+    emitMetric := true
+    clusterActive := "some random active cluster name"
+    clusterStandby := "some random standby cluster name"
+    configVersion := int64(0)
+    failoverVersion := int64(59)
+    clusters := []*persistence.ClusterReplicationConfig{
+        &persistence.ClusterReplicationConfig{
+            ClusterName: clusterActive,
+        },
+        &persistence.ClusterReplicationConfig{
+            ClusterName: clusterStandby,
+        },
+    }
+
+    domainOperation := replicator.DomainOperationUpdate
+    info := &persistence.DomainInfo{
+        ID:          id,
+        Name:        name,
+        Status:      persistence.DomainStatusDeprecated,
+        Description: description,
+        OwnerEmail:  ownerEmail,
+    }
+    config := &persistence.DomainConfig{
+        Retention:  retention,
+        EmitMetric: emitMetric,
+    }
+    replicationConfig := &persistence.DomainReplicationConfig{
+        ActiveClusterName: clusterActive,
+        Clusters:          clusters,
+    }
+
+    s.kafkaProducer.On("Publish", &replicator.ReplicationTask{
+        TaskType: &taskType,
+        DomainTaskAttributes: &replicator.DomainTaskAttributes{
+            DomainOperation: &domainOperation,
+            ID:              common.StringPtr(id),
+            Info: &shared.DomainInfo{
+                Name:        common.StringPtr(name),
+                Status:      &status,
+                Description: common.StringPtr(description),
+                OwnerEmail:  common.StringPtr(ownerEmail),
+            },
+            Config: &shared.DomainConfiguration{
+                WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
+                EmitMetric:                             common.BoolPtr(emitMetric),
+            },
+            ReplicationConfig: &shared.DomainReplicationConfiguration{
+                ActiveClusterName: common.StringPtr(clusterActive),
+                Clusters:          s.domainReplicator.convertClusterReplicationConfigToThrift(clusters),
+            },
+            ConfigVersion:   common.Int64Ptr(configVersion),
+            FailoverVersion: common.Int64Ptr(failoverVersion),
+        },
+    }).Return(nil).Once()
+
+    err := s.domainReplicator.HandleTransmissionTask(s.kafkaProducer, domainOperation,
+        info, config, replicationConfig, configVersion, failoverVersion)
+    s.Nil(err)
+}
+
+func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() {
     operation := replicator.DomainOperationCreate
     id := uuid.New()
     name := "some random domain test name"
@@ -113,7 +252,7 @@ func (s *domainReplicatorSuite) TestReplicateRegisterDomainTask() {
         FailoverVersion: common.Int64Ptr(failoverVersion),
     }
 
-    err := s.domainReplicator.HandleReceiveTask(task)
+    err := s.domainReplicator.HandleReceivingTask(task)
     s.Nil(err)
 
     resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{ID: id})
@@ -127,13 +266,13 @@ func (s *domainReplicatorSuite) TestReplicateRegisterDomainTask() {
     s.Equal(retention, resp.Config.Retention)
     s.Equal(emitMetric, resp.Config.EmitMetric)
     s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
-    s.Equal(s.domainReplicator.convertClusterReplicationConfig(clusters), resp.ReplicationConfig.Clusters)
+    s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
     s.Equal(configVersion, resp.ConfigVersion)
     s.Equal(failoverVersion, resp.FailoverVersion)
     s.Equal(int64(0), resp.DBVersion)
 }
 
-func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_UpdateActiveCluster() {
+func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateConfig_UpdateActiveCluster() {
     operation := replicator.DomainOperationCreate
     id := uuid.New()
     name := "some random domain test name"
@@ -176,7 +315,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_Updat
         FailoverVersion: common.Int64Ptr(failoverVersion),
     }
 
-    err := s.domainReplicator.HandleReceiveTask(createTask)
+    err := s.domainReplicator.HandleReceivingTask(createTask)
     s.Nil(err)
 
     // success update case
@@ -218,7 +357,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_Updat
         ConfigVersion:   common.Int64Ptr(updateConfigVersion),
         FailoverVersion: common.Int64Ptr(updateFailoverVersion),
     }
-    err = s.domainReplicator.HandleReceiveTask(updateTask)
+    err = s.domainReplicator.HandleReceivingTask(updateTask)
     s.Nil(err)
     resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name})
     s.Nil(err)
@@ -231,13 +370,13 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_Updat
     s.Equal(updateRetention, resp.Config.Retention)
     s.Equal(updateEmitMetric, resp.Config.EmitMetric)
     s.Equal(updateClusterActive, resp.ReplicationConfig.ActiveClusterName)
-    s.Equal(s.domainReplicator.convertClusterReplicationConfig(updateClusters), resp.ReplicationConfig.Clusters)
+    s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(updateClusters), resp.ReplicationConfig.Clusters)
     s.Equal(updateConfigVersion, resp.ConfigVersion)
     s.Equal(updateFailoverVersion, resp.FailoverVersion)
     s.Equal(int64(1), resp.DBVersion)
 }
 
-func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpdateActiveCluster() {
+func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateConfig_NoUpdateActiveCluster() {
     operation := replicator.DomainOperationCreate
     id := uuid.New()
     name := "some random domain test name"
@@ -280,7 +419,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpd
         FailoverVersion: common.Int64Ptr(failoverVersion),
     }
 
-    err := s.domainReplicator.HandleReceiveTask(createTask)
+    err := s.domainReplicator.HandleReceivingTask(createTask)
     s.Nil(err)
 
     // success update case
@@ -322,7 +461,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpd
         ConfigVersion:   common.Int64Ptr(updateConfigVersion),
         FailoverVersion: common.Int64Ptr(updateFailoverVersion),
     }
-    err = s.domainReplicator.HandleReceiveTask(updateTask)
+    err = s.domainReplicator.HandleReceivingTask(updateTask)
     s.Nil(err)
     resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name})
     s.Nil(err)
@@ -335,13 +474,13 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpd
     s.Equal(updateRetention, resp.Config.Retention)
     s.Equal(updateEmitMetric, resp.Config.EmitMetric)
     s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
-    s.Equal(s.domainReplicator.convertClusterReplicationConfig(updateClusters), resp.ReplicationConfig.Clusters)
+    s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(updateClusters), resp.ReplicationConfig.Clusters)
     s.Equal(updateConfigVersion, resp.ConfigVersion)
     s.Equal(failoverVersion, resp.FailoverVersion)
     s.Equal(int64(1), resp.DBVersion)
 }
 
-func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_UpdateActiveCluster() {
+func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdateConfig_UpdateActiveCluster() {
     operation := replicator.DomainOperationCreate
     id := uuid.New()
     name := "some random domain test name"
@@ -384,7 +523,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_Upd
         FailoverVersion: common.Int64Ptr(failoverVersion),
     }
 
-    err := s.domainReplicator.HandleReceiveTask(createTask)
+    err := s.domainReplicator.HandleReceivingTask(createTask)
     s.Nil(err)
 
     // success update case
@@ -426,7 +565,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_Upd
         ConfigVersion:   common.Int64Ptr(updateConfigVersion),
         FailoverVersion: common.Int64Ptr(updateFailoverVersion),
     }
-    err = s.domainReplicator.HandleReceiveTask(updateTask)
+    err = s.domainReplicator.HandleReceivingTask(updateTask)
     s.Nil(err)
     resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name})
     s.Nil(err)
@@ -439,13 +578,13 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_Upd
     s.Equal(retention, resp.Config.Retention)
     s.Equal(emitMetric, resp.Config.EmitMetric)
     s.Equal(updateClusterActive, resp.ReplicationConfig.ActiveClusterName)
-    s.Equal(s.domainReplicator.convertClusterReplicationConfig(clusters), resp.ReplicationConfig.Clusters)
+    s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
     s.Equal(configVersion, resp.ConfigVersion)
     s.Equal(updateFailoverVersion, resp.FailoverVersion)
     s.Equal(int64(1), resp.DBVersion)
 }
 
-func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoUpdateActiveCluster() {
+func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdateConfig_NoUpdateActiveCluster() {
     operation := replicator.DomainOperationCreate
     id := uuid.New()
     name := "some random domain test name"
@@ -488,7 +627,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoU
         FailoverVersion: common.Int64Ptr(failoverVersion),
     }
 
-    err := s.domainReplicator.HandleReceiveTask(createTask)
+    err := s.domainReplicator.HandleReceivingTask(createTask)
     s.Nil(err)
 
     // success update case
@@ -530,7 +669,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoU
         ConfigVersion:   common.Int64Ptr(updateConfigVersion),
         FailoverVersion: common.Int64Ptr(updateFailoverVersion),
     }
-    err = s.domainReplicator.HandleReceiveTask(updateTask)
+    err = s.domainReplicator.HandleReceivingTask(updateTask)
     s.Nil(err)
     resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name})
     s.Nil(err)
@@ -543,7 +682,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoU
     s.Equal(retention, resp.Config.Retention)
     s.Equal(emitMetric, resp.Config.EmitMetric)
     s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
-    s.Equal(s.domainReplicator.convertClusterReplicationConfig(clusters), resp.ReplicationConfig.Clusters)
+    s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
     s.Equal(configVersion, resp.ConfigVersion)
     s.Equal(failoverVersion, resp.FailoverVersion)
     s.Equal(int64(0), resp.DBVersion)
diff --git a/service/worker/processor.go b/service/worker/processor.go
index bfe51a69d10..a21d9793459 100644
--- a/service/worker/processor.go
+++ b/service/worker/processor.go
@@ -36,13 +36,17 @@ import (
     "github.com/uber/cadence/common/logging"
     "github.com/uber/cadence/common/messaging"
     "github.com/uber/cadence/common/metrics"
+    "github.com/uber/cadence/common/persistence"
 )
 
 type (
     // DomainReplicator is the interface which can replicate the domain
     DomainReplicator interface {
-        HandleReceiveTask(task *replicator.DomainTaskAttributes) error
+        HandleTransmissionTask(producer messaging.Producer, domainOperation replicator.DomainOperation,
+            info *persistence.DomainInfo, config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
+            configVersion int64, failoverVersion int64) error
+        HandleReceivingTask(task *replicator.DomainTaskAttributes) error
     }
 
     replicationTaskProcessor struct {
@@ -176,7 +180,7 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
         switch task.GetTaskType() {
         case replicator.ReplicationTaskTypeDomain:
             p.logger.Debugf("Recieved domain replication task %v.", task.DomainTaskAttributes)
-            p.domainReplicator.HandleReceiveTask(task.DomainTaskAttributes)
+            err = p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes)
         case replicator.ReplicationTaskTypeHistory:
             p.logger.Debugf("Recieved history replication task %v.", task.HistoryTaskAttributes)
         default:
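---

Reviewer note (not part of the patch): the pieces above compose as follows. The frontend builds the persistence-side domain records, and DomainReplicator.HandleTransmissionTask converts them to their Thrift shapes and publishes a ReplicationTaskTypeDomain task through the messaging.Producer. Below is a minimal sketch of that flow using only identifiers introduced or touched by this change; the helper name publishCreateExample and all literal values are hypothetical.

    package worker

    import (
        "github.com/stretchr/testify/mock"
        "github.com/uber-common/bark"

        "github.com/uber/cadence/.gen/go/replicator"
        "github.com/uber/cadence/common/mocks"
        "github.com/uber/cadence/common/persistence"
    )

    // publishCreateExample (hypothetical) mirrors what WorkflowHandler.RegisterDomain
    // does after persisting a global domain: it hands the persistence-side records
    // to the replicator, which converts them to Thrift and publishes the task.
    func publishCreateExample(metadataMgr persistence.MetadataManager, logger bark.Logger) error {
        // Stand-in for a real producer from base.GetMessagingClient().NewProducer(...);
        // this mirrors the temporary override in host/onebox.go.
        producer := &mocks.KafkaProducer{}
        producer.On("Publish", mock.Anything).Return(nil)

        domainReplicator := NewDomainReplicator(metadataMgr, logger)

        info := &persistence.DomainInfo{
            ID:          "some-domain-uuid", // hypothetical values throughout
            Name:        "example-domain",
            Status:      persistence.DomainStatusRegistered, // must map cleanly to a Thrift status
            Description: "example",
            OwnerEmail:  "owner@example.com",
        }
        config := &persistence.DomainConfig{Retention: 7, EmitMetric: true}
        replicationConfig := &persistence.DomainReplicationConfig{
            ActiveClusterName: "active",
            Clusters: []*persistence.ClusterReplicationConfig{
                {ClusterName: "active"},
                {ClusterName: "standby"},
            },
        }

        // RegisterDomain passes configVersion 0 and the domain's failover version.
        return domainReplicator.HandleTransmissionTask(producer, replicator.DomainOperationCreate,
            info, config, replicationConfig, 0, 0)
    }

One behavioral consequence visible in the diff: if Publish fails, RegisterDomain returns the error to the caller even though the domain row was already created, and UpdateDomain's transmission path behaves the same way.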