diff --git a/service/history/historyCache_test.go b/service/history/historyCache_test.go index bf6047bc1df..e042b9f7ff2 100644 --- a/service/history/historyCache_test.go +++ b/service/history/historyCache_test.go @@ -36,9 +36,11 @@ import ( "github.com/uber-go/tally" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/service/dynamicconfig" ) @@ -48,10 +50,14 @@ type ( // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, // not merely log an error *require.Assertions - logger bark.Logger - mockExecutionMgr *mocks.ExecutionManager - mockShard *shardContextImpl - cache *historyCache + logger bark.Logger + mockExecutionMgr *mocks.ExecutionManager + mockClusterMetadata *mocks.ClusterMetadata + mockProducer *mocks.KafkaProducer + mockMessagingClient messaging.Client + mockService service.Service + mockShard *shardContextImpl + cache *historyCache } ) @@ -75,7 +81,13 @@ func (s *historyCacheSuite) SetupTest() { // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil s.Assertions = require.New(s.T()) s.mockExecutionMgr = &mocks.ExecutionManager{} + s.mockClusterMetadata = &mocks.ClusterMetadata{} + s.mockProducer = &mocks.KafkaProducer{} + s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.logger) s.mockShard = &shardContextImpl{ + service: s.mockService, shardInfo: &persistence.ShardInfo{ShardID: 0, RangeID: 1, TransferAckLevel: 0}, transferSequenceNumber: 1, executionManager: s.mockExecutionMgr, @@ -87,6 +99,8 @@ func (s *historyCacheSuite) SetupTest() { metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), } s.cache = newHistoryCache(s.mockShard, s.logger) + + s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(false) } func (s *historyCacheSuite) TearDownTest() { diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index d073671b0fb..283302e1b71 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -342,7 +342,9 @@ func (e *mutableStateBuilder) GetLastWriteVersion() int64 { } func (e *mutableStateBuilder) updateReplicationStateVersion(version int64) { - e.replicationState.CurrentVersion = version + if version > e.replicationState.CurrentVersion { + e.replicationState.CurrentVersion = version + } } // Assumption: It is expected CurrentVersion on replication state is updated at the start of transaction when diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go index 8e171c3fb35..d1cf5bd0f3c 100644 --- a/service/history/stateBuilder.go +++ b/service/history/stateBuilder.go @@ -443,8 +443,9 @@ func (b *stateBuilder) scheduleDeleteHistoryTimerTask(event *shared.HistoryEvent if _, ok := err.(*shared.EntityNotExistsError); !ok { return nil, err } + } else { + retentionInDays = domainEntry.GetConfig().Retention } - retentionInDays = domainEntry.GetConfig().Retention return b.getTimerBuilder(event).createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24), nil } diff --git a/service/history/transferQueueActiveProcessor_test.go b/service/history/transferQueueActiveProcessor_test.go index 28914d84a48..249320650c4 100644 --- a/service/history/transferQueueActiveProcessor_test.go +++ b/service/history/transferQueueActiveProcessor_test.go @@ -68,6 +68,7 @@ type ( mockTimerQueueProcessor *MockTimerQueueProcessor mockService service.Service + version int64 transferQueueActiveProcessor *transferQueueActiveProcessorImpl } ) @@ -102,6 +103,7 @@ func (s *transferQueueActiveProcessorSuite) SetupTest() { s.mockTransferQueueProcessor = &MockTransferQueueProcessor{} s.mockTimerQueueProcessor = &MockTimerQueueProcessor{} s.mockClusterMetadata = &mocks.ClusterMetadata{} + s.version = int64(4096) // ack manager will use the domain information s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(&persistence.GetDomainResponse{ Info: &persistence.DomainInfo{ID: validDomainID}, @@ -111,7 +113,8 @@ func (s *transferQueueActiveProcessorSuite) SetupTest() { ActiveClusterName: cluster.TestCurrentClusterName, // Clusters attr is not used. }, - TableVersion: persistence.DomainTableVersionV1, + FailoverVersion: s.version, + TableVersion: persistence.DomainTableVersionV1, }, nil) s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName) s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(true) @@ -179,8 +182,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessActivityTask_Success() { workflowType := "some random workflow type" taskListName := "some random task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -203,9 +205,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessActivityTask_Success() { activityID := "activity-1" activityType := "some random activity type" event, ai := addActivityTaskScheduledEvent(msBuilder, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, TargetDomainID: targetDomainID, WorkflowID: execution.GetWorkflowId(), @@ -233,8 +236,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessActivityTask_Duplication( workflowType := "some random workflow type" taskListName := "some random task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -259,7 +261,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessActivityTask_Duplication( event, ai := addActivityTaskScheduledEvent(msBuilder, event.GetEventId(), activityID, activityType, taskListName, []byte{}, 1, 1, 1) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, TargetDomainID: targetDomainID, WorkflowID: execution.GetWorkflowId(), @@ -272,7 +274,8 @@ func (s *transferQueueActiveProcessorSuite) TestProcessActivityTask_Duplication( event = addActivityTaskStartedEvent(msBuilder, event.GetEventId(), taskListName, "") ai.StartedID = event.GetEventId() - addActivityTaskCompletedEvent(msBuilder, ai.ScheduleID, ai.StartedID, nil, "") + event = addActivityTaskCompletedEvent(msBuilder, ai.ScheduleID, ai.StartedID, nil, "") + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) @@ -289,8 +292,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_FirstDecisio workflowType := "some random workflow type" taskListName := "some random task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -306,9 +308,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_FirstDecisio taskID := int64(59) di := addDecisionTaskScheduledEvent(msBuilder) + msBuilder.updateReplicationStateLastEventID("", s.version, di.ScheduleID) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -335,8 +338,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_NonFirstDeci workflowType := "some random workflow type" taskListName := "some random task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -358,9 +360,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_NonFirstDeci // make another round of decision taskID := int64(59) di = addDecisionTaskScheduledEvent(msBuilder) + msBuilder.updateReplicationStateLastEventID("", s.version, di.ScheduleID) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -388,8 +391,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_Sticky_NonFi stickyTaskListName := "some random sticky task list" stickyTaskListTimeout := int32(233) - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -414,9 +416,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_Sticky_NonFi // make another round of decision taskID := int64(59) di = addDecisionTaskScheduledEvent(msBuilder) + msBuilder.updateReplicationStateLastEventID("", s.version, di.ScheduleID) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -442,8 +445,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_Duplication( workflowType := "some random workflow type" taskListName := "some random task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -462,9 +464,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessDecisionTask_Duplication( event := addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, taskListName, uuid.New()) di.StartedID = event.GetEventId() event = addDecisionTaskCompletedEvent(msBuilder, di.ScheduleID, di.StartedID, nil, "some random identity") + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -497,8 +500,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_HasParent( RunId: common.StringPtr(uuid.New()), } - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -525,10 +527,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_HasParent( taskID := int64(59) event = addCompleteWorkflowEvent(msBuilder, event.GetEventId(), nil) - msBuilder.updateReplicationStateLastEventID("", version, event.GetEventId()) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -562,8 +564,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_NoParent() workflowType := "some random workflow type" taskListName := "some random task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -584,10 +585,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCloseExecution_NoParent() taskID := int64(59) event = addCompleteWorkflowEvent(msBuilder, event.GetEventId(), nil) - msBuilder.updateReplicationStateLastEventID("", version, event.GetEventId()) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -620,8 +621,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Success() RunId: common.StringPtr(uuid.New()), } - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -644,7 +644,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Success() event, rci := addRequestCancelInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -662,6 +662,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Success() s.mockHistoryClient.On("RequestCancelWorkflowExecution", nil, s.createRequetCancelWorkflowExecutionRequest(transferTask, rci)).Return(nil).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", s.version).Return(cluster.TestCurrentClusterName) s.mockTimerQueueProcessor.On("NotifyNewTimers", cluster.TestCurrentClusterName, mock.Anything, mock.Anything).Once() s.mockQueueAckMgr.On("completeQueueTask", taskID).Return(nil).Once() s.Nil(s.transferQueueActiveProcessor.process(transferTask)) @@ -682,8 +683,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Failure() RunId: common.StringPtr(uuid.New()), } - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -704,9 +704,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Failure() taskID := int64(59) event, rci := addRequestCancelInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -724,6 +725,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Failure() s.mockHistoryClient.On("RequestCancelWorkflowExecution", nil, s.createRequetCancelWorkflowExecutionRequest(transferTask, rci)).Return(&workflow.EntityNotExistsError{}).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", s.version).Return(cluster.TestCurrentClusterName) s.mockTimerQueueProcessor.On("NotifyNewTimers", cluster.TestCurrentClusterName, mock.Anything, mock.Anything).Once() s.mockQueueAckMgr.On("completeQueueTask", taskID).Return(nil).Once() s.Nil(s.transferQueueActiveProcessor.process(transferTask)) @@ -744,8 +746,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Duplicati RunId: common.StringPtr(uuid.New()), } - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -768,7 +769,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Duplicati event, _ = addRequestCancelInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -781,7 +782,8 @@ func (s *transferQueueActiveProcessorSuite) TestProcessCancelExecution_Duplicati ScheduleID: event.GetEventId(), } - addCancelRequestedEvent(msBuilder, event.GetEventId(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) + event = addCancelRequestedEvent(msBuilder, event.GetEventId(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) @@ -807,8 +809,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Success() signalInput := []byte("some random signal input") signalControl := []byte("some random signal control") - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -830,9 +831,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Success() taskID := int64(59) event, si := addRequestSignalInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, signalControl) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -850,6 +852,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Success() s.mockHistoryClient.On("SignalWorkflowExecution", nil, s.createSignallWorkflowExecutionRequest(transferTask, si)).Return(nil).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", s.version).Return(cluster.TestCurrentClusterName) s.mockTimerQueueProcessor.On("NotifyNewTimers", cluster.TestCurrentClusterName, mock.Anything, mock.Anything).Once() s.mockHistoryClient.On("RemoveSignalMutableState", nil, &history.RemoveSignalMutableStateRequest{ DomainUUID: common.StringPtr(transferTask.TargetDomainID), @@ -881,8 +884,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Failure() signalInput := []byte("some random signal input") signalControl := []byte("some random signal control") - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -904,9 +906,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Failure() taskID := int64(59) event, si := addRequestSignalInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, signalControl) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -924,6 +927,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Failure() s.mockHistoryClient.On("SignalWorkflowExecution", nil, s.createSignallWorkflowExecutionRequest(transferTask, si)).Return(&workflow.EntityNotExistsError{}).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", s.version).Return(cluster.TestCurrentClusterName) s.mockTimerQueueProcessor.On("NotifyNewTimers", cluster.TestCurrentClusterName, mock.Anything, mock.Anything).Once() s.mockQueueAckMgr.On("completeQueueTask", taskID).Return(nil).Once() s.Nil(s.transferQueueActiveProcessor.process(transferTask)) @@ -947,8 +951,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Duplicati signalInput := []byte("some random signal input") signalControl := []byte("some random signal control") - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -972,7 +975,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Duplicati targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, signalInput, signalControl) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -985,7 +988,8 @@ func (s *transferQueueActiveProcessorSuite) TestProcessSignalExecution_Duplicati ScheduleID: event.GetEventId(), } - addSignaledEvent(msBuilder, event.GetEventId(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), nil) + event = addSignaledEvent(msBuilder, event.GetEventId(), targetDomainID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), nil) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) persistenceMutableState := createMutableState(msBuilder) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) @@ -1010,8 +1014,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe childWorkflowType := "some random child workflow type" childTaskListName := "some random child task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -1033,9 +1036,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe taskID := int64(59) event, ci := addStartChildWorkflowExecutionInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), childDomainID, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -1048,15 +1052,13 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe ScheduleID: event.GetEventId(), } - // event = addChildWorkflowExecutionStartedEvent(msBuilder, event.GetEventId(), childDomainID, childWorkflowID, uuid.New(), childWorkflowType) - // ci.StartedID = event.GetEventId() - persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ Info: &persistence.DomainInfo{Name: domainName}, Config: &persistence.DomainConfig{}, ReplicationConfig: &persistence.DomainReplicationConfig{}, + FailoverVersion: s.version, TableVersion: persistence.DomainTableVersionV1, }, nil).Once() s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ @@ -1075,6 +1077,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe )).Return(&workflow.StartWorkflowExecutionResponse{RunId: common.StringPtr(childRunID)}, nil).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", s.version).Return(cluster.TestCurrentClusterName) s.mockHistoryClient.On("ScheduleDecisionTask", nil, &history.ScheduleDecisionTaskRequest{ DomainUUID: common.StringPtr(childDomainID), WorkflowExecution: &workflow.WorkflowExecution{ @@ -1104,8 +1107,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Failu childWorkflowType := "some random child workflow type" childTaskListName := "some random child task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -1127,9 +1129,10 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Failu taskID := int64(59) event, ci := addStartChildWorkflowExecutionInitiatedEvent(msBuilder, event.GetEventId(), uuid.New(), childDomainID, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -1148,6 +1151,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Failu Info: &persistence.DomainInfo{Name: domainName}, Config: &persistence.DomainConfig{}, ReplicationConfig: &persistence.DomainReplicationConfig{}, + FailoverVersion: s.version, TableVersion: persistence.DomainTableVersionV1, }, nil).Once() s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ @@ -1166,6 +1170,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Failu )).Return(nil, &workflow.WorkflowExecutionAlreadyStartedError{}).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + s.mockClusterMetadata.On("ClusterNameForFailoverVersion", s.version).Return(cluster.TestCurrentClusterName) s.mockTimerQueueProcessor.On("NotifyNewTimers", cluster.TestCurrentClusterName, mock.Anything, mock.Anything).Once() s.mockQueueAckMgr.On("completeQueueTask", taskID).Return(nil).Once() @@ -1189,8 +1194,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe childWorkflowType := "some random child workflow type" childTaskListName := "some random child task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -1214,7 +1218,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe childDomainID, childWorkflowID, childWorkflowType, childTaskListName, nil, 1, 1) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -1229,6 +1233,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe event = addChildWorkflowExecutionStartedEvent(msBuilder, event.GetEventId(), childDomainID, childWorkflowID, childRunID, childWorkflowType) ci.StartedID = event.GetEventId() + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil @@ -1236,6 +1241,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe Info: &persistence.DomainInfo{Name: domainName}, Config: &persistence.DomainConfig{}, ReplicationConfig: &persistence.DomainReplicationConfig{}, + FailoverVersion: s.version, TableVersion: persistence.DomainTableVersionV1, }, nil).Once() s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ @@ -1276,8 +1282,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Dupli childWorkflowType := "some random child workflow type" childTaskListName := "some random child task list" - version := int64(4096) - msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, version) + msBuilder := newMutableStateBuilderWithReplicationState(s.mockShard.GetConfig(), s.logger, s.version) msBuilder.AddWorkflowExecutionStartedEvent( execution, &history.StartWorkflowExecutionRequest{ @@ -1301,7 +1306,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Dupli childDomainID, childExecution.GetWorkflowId(), childWorkflowType, childTaskListName, nil, 1, 1) transferTask := &persistence.TransferTaskInfo{ - Version: version, + Version: s.version, DomainID: domainID, WorkflowID: execution.GetWorkflowId(), RunID: execution.GetRunId(), @@ -1316,10 +1321,11 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Dupli event = addChildWorkflowExecutionStartedEvent(msBuilder, event.GetEventId(), childDomainID, childExecution.GetWorkflowId(), childExecution.GetRunId(), childWorkflowType) ci.StartedID = event.GetEventId() - addChildWorkflowExecutionCompletedEvent(msBuilder, ci.InitiatedID, &childExecution, &workflow.WorkflowExecutionCompletedEventAttributes{ + event = addChildWorkflowExecutionCompletedEvent(msBuilder, ci.InitiatedID, &childExecution, &workflow.WorkflowExecutionCompletedEventAttributes{ Result: []byte("some random child workflow execution result"), DecisionTaskCompletedEventId: common.Int64Ptr(transferTask.ScheduleID), }) + msBuilder.updateReplicationStateLastEventID("", s.version, event.GetEventId()) persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil @@ -1327,6 +1333,7 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Dupli Info: &persistence.DomainInfo{Name: domainName}, Config: &persistence.DomainConfig{}, ReplicationConfig: &persistence.DomainReplicationConfig{}, + FailoverVersion: s.version, TableVersion: persistence.DomainTableVersionV1, }, nil).Once() s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index f56d02e88bf..a32f93077b8 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -29,6 +29,8 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/persistence" ) @@ -42,6 +44,7 @@ type ( domainID string workflowExecution workflow.WorkflowExecution shard ShardContext + clusterMetadata cluster.Metadata executionManager persistence.ExecutionManager logger bark.Logger @@ -67,6 +70,7 @@ func newWorkflowExecutionContext(domainID string, execution workflow.WorkflowExe domainID: domainID, workflowExecution: execution, shard: shard, + clusterMetadata: shard.GetService().GetClusterMetadata(), executionManager: executionManager, logger: lg, locker: common.NewMutex(), @@ -168,8 +172,16 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi // Only generate replication task if this is a global domain createReplicationTask := c.msBuilder.replicationState != nil - lastWriteVersion := c.msBuilder.GetCurrentVersion() - return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", lastWriteVersion, transactionID) + currentVersion := c.msBuilder.GetCurrentVersion() + if createReplicationTask { + activeCluster := c.clusterMetadata.ClusterNameForFailoverVersion(currentVersion) + currentCluster := c.clusterMetadata.GetCurrentClusterName() + if activeCluster != currentCluster { + return errors.NewDomainNotActiveError(c.msBuilder.executionInfo.DomainID, currentCluster, activeCluster) + } + } + + return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", currentVersion, transactionID) } func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task,