From 2e6c28dabc432237beeaa73a1639ae3909d46e29 Mon Sep 17 00:00:00 2001 From: Wenquan Xing Date: Mon, 19 Mar 2018 15:18:00 -0700 Subject: [PATCH] bugfix: when setting up new cluster, there should be a way do replicate existing domain --- .../cassandraMetadataPersistence.go | 5 +- .../cassandraMetadataPersistence_test.go | 19 ++++-- common/persistence/dataInterfaces.go | 1 + .../worker/domainReplicationTaskHandler.go | 31 ++------- .../domainReplicationTaskHandler_test.go | 63 +++++++++++++++++++ 5 files changed, 88 insertions(+), 31 deletions(-) diff --git a/common/persistence/cassandraMetadataPersistence.go b/common/persistence/cassandraMetadataPersistence.go index c8fc72ad9c5..eee10ce73fb 100644 --- a/common/persistence/cassandraMetadataPersistence.go +++ b/common/persistence/cassandraMetadataPersistence.go @@ -54,8 +54,8 @@ const ( `VALUES(?, {name: ?}) IF NOT EXISTS` templateCreateDomainByNameQuery = `INSERT INTO domains_by_name (` + - `name, domain, config, replication_config, is_global_domain, failover_version) ` + - `VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?) IF NOT EXISTS` + `name, domain, config, replication_config, is_global_domain, config_version, failover_version) ` + + `VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?, ?) IF NOT EXISTS` templateGetDomainQuery = `SELECT domain.name ` + `FROM domains ` + @@ -149,6 +149,7 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest request.ReplicationConfig.ActiveClusterName, serializeClusterConfigs(request.ReplicationConfig.Clusters), request.IsGlobalDomain, + request.ConfigVersion, request.FailoverVersion, ) diff --git a/common/persistence/cassandraMetadataPersistence_test.go b/common/persistence/cassandraMetadataPersistence_test.go index 86fdc567c7f..64a81b75f39 100644 --- a/common/persistence/cassandraMetadataPersistence_test.go +++ b/common/persistence/cassandraMetadataPersistence_test.go @@ -73,6 +73,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() { retention := int32(10) emitMetric := true isGlobalDomain := false + configVersion := int64(0) failoverVersion := int64(0) resp0, err0 := m.CreateDomain( @@ -89,6 +90,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() { }, &DomainReplicationConfig{}, isGlobalDomain, + configVersion, failoverVersion, ) @@ -111,7 +113,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() { m.Equal(testCurrentClusterName, resp1.ReplicationConfig.ActiveClusterName) m.Equal(1, len(resp1.ReplicationConfig.Clusters)) m.Equal(isGlobalDomain, resp1.IsGlobalDomain) - m.Equal(int64(0), resp1.ConfigVersion) + m.Equal(configVersion, resp1.ConfigVersion) m.Equal(failoverVersion, resp1.FailoverVersion) m.True(resp1.ReplicationConfig.Clusters[0].ClusterName == testCurrentClusterName) m.Equal(int64(0), resp1.DBVersion) @@ -130,6 +132,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() { }, &DomainReplicationConfig{}, isGlobalDomain, + configVersion, failoverVersion, ) m.NotNil(err2) @@ -148,6 +151,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() { clusterActive := "some random active cluster name" clusterStandby := "some random standby cluster name" + configVersion := int64(11) failoverVersion := int64(59) isGlobalDomain := true clusters := []*ClusterReplicationConfig{ @@ -181,6 +185,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() { Clusters: clusters, }, isGlobalDomain, + configVersion, failoverVersion, ) m.Nil(err1) @@ -203,7 +208,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() { m.Equal(clusters[index], resp2.ReplicationConfig.Clusters[index]) } m.Equal(isGlobalDomain, resp2.IsGlobalDomain) - m.Equal(int64(0), resp2.ConfigVersion) + m.Equal(configVersion, resp2.ConfigVersion) m.Equal(failoverVersion, resp2.FailoverVersion) m.Equal(int64(0), resp2.DBVersion) @@ -223,7 +228,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() { m.Equal(clusters[index], resp3.ReplicationConfig.Clusters[index]) } m.Equal(isGlobalDomain, resp2.IsGlobalDomain) - m.Equal(int64(0), resp2.ConfigVersion) + m.Equal(configVersion, resp2.ConfigVersion) m.Equal(failoverVersion, resp3.FailoverVersion) m.Equal(int64(0), resp3.DBVersion) @@ -244,6 +249,7 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() { clusterActive := "some random active cluster name" clusterStandby := "some random standby cluster name" + configVersion := int64(10) failoverVersion := int64(59) isGlobalDomain := true clusters := []*ClusterReplicationConfig{ @@ -272,6 +278,7 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() { Clusters: clusters, }, isGlobalDomain, + configVersion, failoverVersion, ) m.Nil(err1) @@ -371,6 +378,7 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() { clusterActive := "some random active cluster name" clusterStandby := "some random standby cluster name" + configVersion := int64(10) failoverVersion := int64(59) isGlobalDomain := true clusters := []*ClusterReplicationConfig{ @@ -399,6 +407,7 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() { Clusters: clusters, }, isGlobalDomain, + configVersion, failoverVersion, ) m.Nil(err1) @@ -438,6 +447,7 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() { Clusters: clusters, }, isGlobalDomain, + configVersion, failoverVersion, ) m.Nil(err6) @@ -458,12 +468,13 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() { } func (m *metadataPersistenceSuite) CreateDomain(info *DomainInfo, config *DomainConfig, - replicationConfig *DomainReplicationConfig, isGlobaldomain bool, failoverVersion int64) (*CreateDomainResponse, error) { + replicationConfig *DomainReplicationConfig, isGlobaldomain bool, configVersion int64, failoverVersion int64) (*CreateDomainResponse, error) { return m.MetadataManager.CreateDomain(&CreateDomainRequest{ Info: info, Config: config, ReplicationConfig: replicationConfig, IsGlobalDomain: isGlobaldomain, + ConfigVersion: configVersion, FailoverVersion: failoverVersion, }) } diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 8a66e3c24aa..becc578e630 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -698,6 +698,7 @@ type ( Config *DomainConfig ReplicationConfig *DomainReplicationConfig IsGlobalDomain bool + ConfigVersion int64 FailoverVersion int64 } diff --git a/service/worker/domainReplicationTaskHandler.go b/service/worker/domainReplicationTaskHandler.go index 30a7b0448a6..f1e4dd11c6b 100644 --- a/service/worker/domainReplicationTaskHandler.go +++ b/service/worker/domainReplicationTaskHandler.go @@ -26,7 +26,6 @@ import ( "github.com/uber-common/bark" "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" ) @@ -109,6 +108,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas Clusters: domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters), }, IsGlobalDomain: true, // local domain will not be replicated + ConfigVersion: task.GetConfigVersion(), FailoverVersion: task.GetFailoverVersion(), } @@ -131,6 +131,11 @@ func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask( Name: task.Info.GetName(), }) if err != nil { + if _, ok := err.(*shared.EntityNotExistsError); ok { + // this can happen if the create domain replication task is to processed. + // e.g. new cluster being lanuched + return domainReplicator.handleDomainCreationReplicationTask(task) + } return err } @@ -206,16 +211,6 @@ func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigFro return output } -func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift( - input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration { - output := []*shared.ClusterReplicationConfiguration{} - for _, cluster := range input { - clusterName := common.StringPtr(cluster.ClusterName) - output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName}) - } - return output -} - func (domainReplicator *domainReplicatorImpl) convertDomainStatusFromThrift(input *shared.DomainStatus) (int, error) { if input == nil { return 0, ErrInvalidDomainStatus @@ -230,17 +225,3 @@ func (domainReplicator *domainReplicatorImpl) convertDomainStatusFromThrift(inpu return 0, ErrInvalidDomainStatus } } - -func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) { - switch input { - case persistence.DomainStatusRegistered: - output := shared.DomainStatusRegistered - return &output, nil - case persistence.DomainStatusDeprecated: - output := shared.DomainStatusDeprecated - return &output, nil - default: - return nil, ErrInvalidDomainStatus - } - -} diff --git a/service/worker/domainReplicationTaskHandler_test.go b/service/worker/domainReplicationTaskHandler_test.go index f441cb727cf..caa78285c4a 100644 --- a/service/worker/domainReplicationTaskHandler_test.go +++ b/service/worker/domainReplicationTaskHandler_test.go @@ -133,6 +133,69 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() { s.Equal(int64(0), resp.DBVersion) } +func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_DomainNotExist() { + operation := replicator.DomainOperationUpdate + id := uuid.New() + name := "some random domain test name" + status := shared.DomainStatusRegistered + description := "some random test description" + ownerEmail := "some random test owner" + retention := int32(10) + emitMetric := true + clusterActive := "some random active cluster name" + clusterStandby := "some random standby cluster name" + configVersion := int64(12) + failoverVersion := int64(59) + clusters := []*shared.ClusterReplicationConfiguration{ + &shared.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterActive), + }, + &shared.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterStandby), + }, + } + + updateTask := &replicator.DomainTaskAttributes{ + DomainOperation: &operation, + ID: common.StringPtr(id), + Info: &shared.DomainInfo{ + Name: common.StringPtr(name), + Status: &status, + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(ownerEmail), + }, + Config: &shared.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfig: &shared.DomainReplicationConfiguration{ + ActiveClusterName: common.StringPtr(clusterActive), + Clusters: clusters, + }, + ConfigVersion: common.Int64Ptr(configVersion), + FailoverVersion: common.Int64Ptr(failoverVersion), + } + + err := s.domainReplicator.HandleReceivingTask(updateTask) + s.Nil(err) + + resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name}) + s.Nil(err) + s.NotNil(resp) + s.Equal(id, resp.Info.ID) + s.Equal(name, resp.Info.Name) + s.Equal(persistence.DomainStatusRegistered, resp.Info.Status) + s.Equal(description, resp.Info.Description) + s.Equal(ownerEmail, resp.Info.OwnerEmail) + s.Equal(retention, resp.Config.Retention) + s.Equal(emitMetric, resp.Config.EmitMetric) + s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName) + s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters) + s.Equal(configVersion, resp.ConfigVersion) + s.Equal(failoverVersion, resp.FailoverVersion) + s.Equal(int64(0), resp.DBVersion) +} + func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateConfig_UpdateActiveCluster() { operation := replicator.DomainOperationCreate id := uuid.New()