diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index 7dff36dbea5..98e64499f79 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -1335,7 +1335,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx d.createTimerTasks(batch, request.TimerTasks, request.DeleteTimerTask, request.ExecutionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, cqlNowTimestamp) - d.updateActivityInfos(batch, request.UpsertActivityInfos, request.DeleteActivityInfo, executionInfo.DomainID, + d.updateActivityInfos(batch, request.UpsertActivityInfos, request.DeleteActivityInfos, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID) d.updateTimerInfos(batch, request.UpserTimerInfos, request.DeleteTimerInfos, executionInfo.DomainID, @@ -2229,7 +2229,7 @@ func (d *cassandraPersistence) createTimerTasks(batch *gocql.Batch, timerTasks [ } } -func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, deleteInfo *int64, +func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, deleteInfos []int64, domainID, workflowID, runID string, condition int64, rangeID int64) { for _, a := range activityInfos { @@ -2262,9 +2262,9 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI condition) } - if deleteInfo != nil { + for _, deleteInfo := range deleteInfos { batch.Query(templateDeleteActivityInfoQuery, - *deleteInfo, + deleteInfo, d.shardID, rowTypeExecution, domainID, diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index 9789258acab..e1432d22220 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -959,7 +959,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() { s.Equal(currentTime.Unix(), ai.LastHeartBeatUpdatedTime.Unix()) s.Equal(int32(1), ai.TimerTaskStatus) - err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, common.Int64Ptr(1), nil, nil) + err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, []int64{1}, nil, nil) s.Nil(err2, "No error expected.") state, err1 = s.GetWorkflowExecutionInfo(domainID, workflowExecution) diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 756ee1df01d..2c45b53b2ab 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -512,7 +512,7 @@ type ( // Mutable state UpsertActivityInfos []*ActivityInfo - DeleteActivityInfo *int64 + DeleteActivityInfos []int64 UpserTimerInfos []*TimerInfo DeleteTimerInfos []string UpsertChildExecutionInfos []*ChildExecutionInfo diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 035071747a5..2934405e98d 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -425,7 +425,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co DeleteTimerTask: nil, RangeID: s.ShardInfo.RangeID, UpsertActivityInfos: nil, - DeleteActivityInfo: nil, + DeleteActivityInfos: nil, UpserTimerInfos: nil, DeleteTimerInfos: nil, ContinueAsNew: &CreateWorkflowExecutionRequest{ @@ -454,10 +454,10 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co // UpdateWorkflowExecution is a utility method to update workflow execution func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task, - upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, + upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error { return s.UpdateWorkflowExecutionWithRangeID(updatedInfo, decisionScheduleIDs, activityScheduleIDs, - s.ShardInfo.RangeID, condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfo, + s.ShardInfo.RangeID, condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfos, upsertTimerInfos, deleteTimerInfos, nil, nil, nil, nil, nil, nil, nil, "") } @@ -474,7 +474,7 @@ func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *WorkflowExecuti DeleteTimerTask: nil, RangeID: s.ShardInfo.RangeID, UpsertActivityInfos: nil, - DeleteActivityInfo: nil, + DeleteActivityInfos: nil, UpserTimerInfos: nil, DeleteTimerInfos: nil, FinishExecution: true, @@ -563,13 +563,13 @@ func (s *TestBase) UpdateWorklowStateAndReplication(updatedInfo *WorkflowExecuti // UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task, - upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, + upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string, deleteSignalRequestedID string) error { return s.UpdateWorkflowExecutionWithReplication(updatedInfo, nil, decisionScheduleIDs, activityScheduleIDs, rangeID, - condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfo, upsertTimerInfos, deleteTimerInfos, + condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfos, upsertTimerInfos, deleteTimerInfos, upsertChildInfos, deleteChildInfo, upsertCancelInfos, deleteCancelInfo, upsertSignalInfos, deleteSignalInfo, upsertSignalRequestedIDs, deleteSignalRequestedID) } @@ -578,7 +578,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecu func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowExecutionInfo, updatedReplicationState *ReplicationState, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, txTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, - deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, + deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string, deleteSignalRequestedID string) error { @@ -620,7 +620,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowE DeleteTimerTask: deleteTimerTask, RangeID: rangeID, UpsertActivityInfos: upsertActivityInfos, - DeleteActivityInfo: deleteActivityInfo, + DeleteActivityInfos: deleteActivityInfos, UpserTimerInfos: upsertTimerInfos, DeleteTimerInfos: deleteTimerInfos, UpsertChildExecutionInfos: upsertChildInfos, diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index f284fdaf945..99b443f73f0 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -44,7 +44,7 @@ type ( pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info. pendingActivityInfoByActivityID map[string]int64 // Activity ID -> Schedule Event ID of the activity. updateActivityInfos []*persistence.ActivityInfo // Modified activities from last update. - deleteActivityInfo *int64 // Deleted activities from last update. + deleteActivityInfos []int64 // Deleted activities from last update. pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info. updateTimerInfos []*persistence.TimerInfo // Modified timers from last update. @@ -82,7 +82,7 @@ type ( mutableStateSessionUpdates struct { newEventsBuilder *historyBuilder updateActivityInfos []*persistence.ActivityInfo - deleteActivityInfo *int64 + deleteActivityInfos []int64 updateTimerInfos []*persistence.TimerInfo deleteTimerInfos []string updateChildExecutionInfos []*persistence.ChildExecutionInfo @@ -115,6 +115,7 @@ func newMutableStateBuilder(config *Config, logger bark.Logger) *mutableStateBui updateActivityInfos: []*persistence.ActivityInfo{}, pendingActivityInfoIDs: make(map[int64]*persistence.ActivityInfo), pendingActivityInfoByActivityID: make(map[string]int64), + deleteActivityInfos: []int64{}, pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo), updateTimerInfos: []*persistence.TimerInfo{}, deleteTimerInfos: []string{}, @@ -244,7 +245,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates, updates := &mutableStateSessionUpdates{ newEventsBuilder: e.hBuilder, updateActivityInfos: e.updateActivityInfos, - deleteActivityInfo: e.deleteActivityInfo, + deleteActivityInfos: e.deleteActivityInfos, updateTimerInfos: e.updateTimerInfos, deleteTimerInfos: e.deleteTimerInfos, updateChildExecutionInfos: e.updateChildExecutionInfos, @@ -263,7 +264,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates, // Clear all updates to prepare for the next session e.hBuilder = newHistoryBuilder(e, e.logger) e.updateActivityInfos = []*persistence.ActivityInfo{} - e.deleteActivityInfo = nil + e.deleteActivityInfos = []int64{} e.updateTimerInfos = []*persistence.TimerInfo{} e.deleteTimerInfos = []string{} e.updateChildExecutionInfos = []*persistence.ChildExecutionInfo{} @@ -675,7 +676,7 @@ func (e *mutableStateBuilder) DeleteActivity(scheduleEventID int64) error { } delete(e.pendingActivityInfoByActivityID, a.ActivityID) - e.deleteActivityInfo = common.Int64Ptr(scheduleEventID) + e.deleteActivityInfos = append(e.deleteActivityInfos, scheduleEventID) return nil } diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go index 5bcea7bfc6a..5ac3f151a6a 100644 --- a/service/history/timerQueueProcessor_test.go +++ b/service/history/timerQueueProcessor_test.go @@ -759,6 +759,69 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskHeartBeat_JustStarted() s.False(running) } +func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() { + domainID := testDomainActiveID + workflowExecution := workflow.WorkflowExecution{ + WorkflowId: common.StringPtr("activity-timer-same-expiry-test"), + RunId: common.StringPtr(validRunID), + } + + taskList := "activity-timer-queue" + s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{}) + + // TimeoutTypeScheduleToClose - Scheduled, started, completed. + p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl) + p.Start() + + state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err) + builder := newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger) + builder.Load(state) + condition := state.ExecutionInfo.NextEventID + + ase1, ai1 := builder.AddActivityTaskScheduledEvent(emptyEventID, + &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("testID-1"), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1), + }) + s.NotNil(ase1) + ase2, ai2 := builder.AddActivityTaskScheduledEvent(emptyEventID, + &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("testID-2"), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1), + }) + s.NotNil(ase2) + + // create a schedule to close timeout + tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, &mockTimeSource{currTime: time.Now()}) + t, err := tBuilder.AddScheduleToCloseActivityTimeout(ai1) + s.NoError(err) + s.NotNil(t) + t, err = tBuilder.AddScheduleToCloseActivityTimeout(ai2) + s.NoError(err) + s.NotNil(t) + timerTasks := []persistence.Task{t} + + s.updateHistoryAndTimers(builder, timerTasks, condition) + p.NotifyNewTimers(cluster.TestCurrentClusterName, timerTasks) + + s.waitForTimerTasksToProcess(p) + s.Equal(uint64(1), p.getTimerFiredCount(cluster.TestCurrentClusterName)) + running := s.checkTimedOutEventFor(domainID, workflowExecution, *ase1.EventId) + s.False(running) + running = s.checkTimedOutEventFor(domainID, workflowExecution, *ase2.EventId) + s.False(running) + + // assert activity infos are deleted + state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err) + builder = newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger) + builder.Load(state) + s.Equal(0, len(builder.pendingActivityInfoIDs)) +} + func (s *timerQueueProcessorSuite) TestTimerUserTimers() { domainID := testDomainActiveID workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("user-timer-test"), @@ -781,7 +844,7 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers() { s.False(running) } -func (s *timerQueueProcessorSuite) TestTimerUserTimersSameExpiry() { +func (s *timerQueueProcessorSuite) TestTimerUserTimers_SameExpiry() { domainID := testDomainActiveID workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("user-timer-same-expiry-test"), RunId: common.StringPtr(validRunID)} @@ -823,6 +886,13 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimersSameExpiry() { s.False(running) running = s.checkTimedOutEventForUserTimer(domainID, workflowExecution, ti2.TimerID) s.False(running) + + // assert user timer infos are deleted + state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err) + builder = newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger) + builder.Load(state) + s.Equal(0, len(builder.pendingTimerInfoIDs)) } func (s *timerQueueProcessorSuite) TestTimersOnClosedWorkflow() { diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index a960cbafbc9..c0a6a7c0d1c 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -226,7 +226,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe Condition: c.updateCondition, DeleteTimerTask: c.deleteTimerTask, UpsertActivityInfos: updates.updateActivityInfos, - DeleteActivityInfo: updates.deleteActivityInfo, + DeleteActivityInfos: updates.deleteActivityInfos, UpserTimerInfos: updates.updateTimerInfos, DeleteTimerInfos: updates.deleteTimerInfos, UpsertChildExecutionInfos: updates.updateChildExecutionInfos,