Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: when multiple activity got timeouted, there will be at most o… #655

Merged
merged 2 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
d.createTimerTasks(batch, request.TimerTasks, request.DeleteTimerTask, request.ExecutionInfo.DomainID,
executionInfo.WorkflowID, executionInfo.RunID, cqlNowTimestamp)

d.updateActivityInfos(batch, request.UpsertActivityInfos, request.DeleteActivityInfo, executionInfo.DomainID,
d.updateActivityInfos(batch, request.UpsertActivityInfos, request.DeleteActivityInfos, executionInfo.DomainID,
executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

d.updateTimerInfos(batch, request.UpserTimerInfos, request.DeleteTimerInfos, executionInfo.DomainID,
Expand Down Expand Up @@ -2229,7 +2229,7 @@ func (d *cassandraPersistence) createTimerTasks(batch *gocql.Batch, timerTasks [
}
}

func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, deleteInfo *int64,
func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, deleteInfos []int64,
domainID, workflowID, runID string, condition int64, rangeID int64) {

for _, a := range activityInfos {
Expand Down Expand Up @@ -2262,9 +2262,9 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
condition)
}

if deleteInfo != nil {
for _, deleteInfo := range deleteInfos {
batch.Query(templateDeleteActivityInfoQuery,
*deleteInfo,
deleteInfo,
d.shardID,
rowTypeExecution,
domainID,
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
s.Equal(currentTime.Unix(), ai.LastHeartBeatUpdatedTime.Unix())
s.Equal(int32(1), ai.TimerTaskStatus)

err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, common.Int64Ptr(1), nil, nil)
err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, nil, nil, []int64{1}, nil, nil)
s.Nil(err2, "No error expected.")

state, err1 = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ type (

// Mutable state
UpsertActivityInfos []*ActivityInfo
DeleteActivityInfo *int64
DeleteActivityInfos []int64
UpserTimerInfos []*TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*ChildExecutionInfo
Expand Down
16 changes: 8 additions & 8 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co
DeleteTimerTask: nil,
RangeID: s.ShardInfo.RangeID,
UpsertActivityInfos: nil,
DeleteActivityInfo: nil,
DeleteActivityInfos: nil,
UpserTimerInfos: nil,
DeleteTimerInfos: nil,
ContinueAsNew: &CreateWorkflowExecutionRequest{
Expand Down Expand Up @@ -454,10 +454,10 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co
// UpdateWorkflowExecution is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task,
upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64,
upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64,
upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error {
return s.UpdateWorkflowExecutionWithRangeID(updatedInfo, decisionScheduleIDs, activityScheduleIDs,
s.ShardInfo.RangeID, condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfo,
s.ShardInfo.RangeID, condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfos,
upsertTimerInfos, deleteTimerInfos, nil, nil, nil, nil,
nil, nil, nil, "")
}
Expand All @@ -474,7 +474,7 @@ func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *WorkflowExecuti
DeleteTimerTask: nil,
RangeID: s.ShardInfo.RangeID,
UpsertActivityInfos: nil,
DeleteActivityInfo: nil,
DeleteActivityInfos: nil,
UpserTimerInfos: nil,
DeleteTimerInfos: nil,
FinishExecution: true,
Expand Down Expand Up @@ -563,13 +563,13 @@ func (s *TestBase) UpdateWorklowStateAndReplication(updatedInfo *WorkflowExecuti
// UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task,
upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo,
upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo,
deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64,
upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64,
upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64,
upsertSignalRequestedIDs []string, deleteSignalRequestedID string) error {
return s.UpdateWorkflowExecutionWithReplication(updatedInfo, nil, decisionScheduleIDs, activityScheduleIDs, rangeID,
condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfo, upsertTimerInfos, deleteTimerInfos,
condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfos, upsertTimerInfos, deleteTimerInfos,
upsertChildInfos, deleteChildInfo, upsertCancelInfos, deleteCancelInfo, upsertSignalInfos, deleteSignalInfo,
upsertSignalRequestedIDs, deleteSignalRequestedID)
}
Expand All @@ -578,7 +578,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecu
func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowExecutionInfo,
updatedReplicationState *ReplicationState, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID,
condition int64, timerTasks []Task, txTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo,
deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string,
deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string,
upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo,
deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string,
deleteSignalRequestedID string) error {
Expand Down Expand Up @@ -620,7 +620,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowE
DeleteTimerTask: deleteTimerTask,
RangeID: rangeID,
UpsertActivityInfos: upsertActivityInfos,
DeleteActivityInfo: deleteActivityInfo,
DeleteActivityInfos: deleteActivityInfos,
UpserTimerInfos: upsertTimerInfos,
DeleteTimerInfos: deleteTimerInfos,
UpsertChildExecutionInfos: upsertChildInfos,
Expand Down
11 changes: 6 additions & 5 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type (
pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityInfoByActivityID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos []*persistence.ActivityInfo // Modified activities from last update.
deleteActivityInfo *int64 // Deleted activities from last update.
deleteActivityInfos []int64 // Deleted activities from last update.

pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info.
updateTimerInfos []*persistence.TimerInfo // Modified timers from last update.
Expand Down Expand Up @@ -82,7 +82,7 @@ type (
mutableStateSessionUpdates struct {
newEventsBuilder *historyBuilder
updateActivityInfos []*persistence.ActivityInfo
deleteActivityInfo *int64
deleteActivityInfos []int64
updateTimerInfos []*persistence.TimerInfo
deleteTimerInfos []string
updateChildExecutionInfos []*persistence.ChildExecutionInfo
Expand Down Expand Up @@ -115,6 +115,7 @@ func newMutableStateBuilder(config *Config, logger bark.Logger) *mutableStateBui
updateActivityInfos: []*persistence.ActivityInfo{},
pendingActivityInfoIDs: make(map[int64]*persistence.ActivityInfo),
pendingActivityInfoByActivityID: make(map[string]int64),
deleteActivityInfos: []int64{},
pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo),
updateTimerInfos: []*persistence.TimerInfo{},
deleteTimerInfos: []string{},
Expand Down Expand Up @@ -244,7 +245,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,
updates := &mutableStateSessionUpdates{
newEventsBuilder: e.hBuilder,
updateActivityInfos: e.updateActivityInfos,
deleteActivityInfo: e.deleteActivityInfo,
deleteActivityInfos: e.deleteActivityInfos,
updateTimerInfos: e.updateTimerInfos,
deleteTimerInfos: e.deleteTimerInfos,
updateChildExecutionInfos: e.updateChildExecutionInfos,
Expand All @@ -263,7 +264,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,
// Clear all updates to prepare for the next session
e.hBuilder = newHistoryBuilder(e, e.logger)
e.updateActivityInfos = []*persistence.ActivityInfo{}
e.deleteActivityInfo = nil
e.deleteActivityInfos = []int64{}
e.updateTimerInfos = []*persistence.TimerInfo{}
e.deleteTimerInfos = []string{}
e.updateChildExecutionInfos = []*persistence.ChildExecutionInfo{}
Expand Down Expand Up @@ -675,7 +676,7 @@ func (e *mutableStateBuilder) DeleteActivity(scheduleEventID int64) error {
}
delete(e.pendingActivityInfoByActivityID, a.ActivityID)

e.deleteActivityInfo = common.Int64Ptr(scheduleEventID)
e.deleteActivityInfos = append(e.deleteActivityInfos, scheduleEventID)
return nil
}

Expand Down
72 changes: 71 additions & 1 deletion service/history/timerQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,69 @@ func (s *timerQueueProcessorSuite) TestTimerActivityTaskHeartBeat_JustStarted()
s.False(running)
}

func (s *timerQueueProcessorSuite) TestTimerActivityTask_SameExpiry() {
domainID := testDomainActiveID
workflowExecution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("activity-timer-same-expiry-test"),
RunId: common.StringPtr(validRunID),
}

taskList := "activity-timer-queue"
s.createExecutionWithTimers(domainID, workflowExecution, taskList, "identity", []int32{})

// TimeoutTypeScheduleToClose - Scheduled, started, completed.
p := newTimerQueueProcessor(s.ShardContext, s.engineImpl, s.WorkflowMgr, s.logger).(*timerQueueProcessorImpl)
p.Start()

state, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
builder := newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
builder.Load(state)
condition := state.ExecutionInfo.NextEventID

ase1, ai1 := builder.AddActivityTaskScheduledEvent(emptyEventID,
&workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("testID-1"),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
})
s.NotNil(ase1)
ase2, ai2 := builder.AddActivityTaskScheduledEvent(emptyEventID,
&workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("testID-2"),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
})
s.NotNil(ase2)

// create a schedule to close timeout
tBuilder := newTimerBuilder(s.ShardContext.GetConfig(), s.logger, &mockTimeSource{currTime: time.Now()})
t, err := tBuilder.AddScheduleToCloseActivityTimeout(ai1)
s.NoError(err)
s.NotNil(t)
t, err = tBuilder.AddScheduleToCloseActivityTimeout(ai2)
s.NoError(err)
s.NotNil(t)
timerTasks := []persistence.Task{t}

s.updateHistoryAndTimers(builder, timerTasks, condition)
p.NotifyNewTimers(cluster.TestCurrentClusterName, timerTasks)

s.waitForTimerTasksToProcess(p)
s.Equal(uint64(1), p.getTimerFiredCount(cluster.TestCurrentClusterName))
running := s.checkTimedOutEventFor(domainID, workflowExecution, *ase1.EventId)
s.False(running)
running = s.checkTimedOutEventFor(domainID, workflowExecution, *ase2.EventId)
s.False(running)

// assert activity infos are deleted
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
builder = newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
builder.Load(state)
s.Equal(0, len(builder.pendingActivityInfoIDs))
}

func (s *timerQueueProcessorSuite) TestTimerUserTimers() {
domainID := testDomainActiveID
workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("user-timer-test"),
Expand All @@ -781,7 +844,7 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimers() {
s.False(running)
}

func (s *timerQueueProcessorSuite) TestTimerUserTimersSameExpiry() {
func (s *timerQueueProcessorSuite) TestTimerUserTimers_SameExpiry() {
domainID := testDomainActiveID
workflowExecution := workflow.WorkflowExecution{WorkflowId: common.StringPtr("user-timer-same-expiry-test"),
RunId: common.StringPtr(validRunID)}
Expand Down Expand Up @@ -823,6 +886,13 @@ func (s *timerQueueProcessorSuite) TestTimerUserTimersSameExpiry() {
s.False(running)
running = s.checkTimedOutEventForUserTimer(domainID, workflowExecution, ti2.TimerID)
s.False(running)

// assert user timer infos are deleted
state, err = s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err)
builder = newMutableStateBuilder(s.ShardContext.GetConfig(), s.logger)
builder.Load(state)
s.Equal(0, len(builder.pendingTimerInfoIDs))
}

func (s *timerQueueProcessorSuite) TestTimersOnClosedWorkflow() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
Condition: c.updateCondition,
DeleteTimerTask: c.deleteTimerTask,
UpsertActivityInfos: updates.updateActivityInfos,
DeleteActivityInfo: updates.deleteActivityInfo,
DeleteActivityInfos: updates.deleteActivityInfos,
UpserTimerInfos: updates.updateTimerInfos,
DeleteTimerInfos: updates.deleteTimerInfos,
UpsertChildExecutionInfos: updates.updateChildExecutionInfos,
Expand Down