Skip to content

Commit

Permalink
Refactor handleWorkflowTaskCompleted retry loop (#2483)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 17, 2022
1 parent d2f41f0 commit 1d33c3e
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 257 deletions.
39 changes: 6 additions & 33 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedConflictOnUpdate() {
addActivityTaskCompletedEvent(msBuilder, activity1ScheduledEvent.EventId,
activity1StartedEvent.EventId, activity1Result, identity)
di2 := addWorkflowTaskScheduledEvent(msBuilder)
workflowTaskStartedEvent2 := addWorkflowTaskStartedEvent(msBuilder, di2.ScheduleID, tq, identity)
addWorkflowTaskStartedEvent(msBuilder, di2.ScheduleID, tq, identity)

tt := &tokenspb.Task{
ScheduleAttempt: 1,
Expand Down Expand Up @@ -1038,19 +1038,9 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedConflictOnUpdate() {
addActivityTaskCompletedEvent(msBuilder, activity2ScheduledEvent.EventId,
activity2StartedEvent.EventId, activity2Result, identity)

ms2 := workflow.TestCloneToProto(msBuilder)
gwmsResponse2 := &persistence.GetWorkflowExecutionResponse{State: ms2}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, &persistence.ConditionFailedError{})

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse2, nil)
var updatedWorkflowMutation persistence.WorkflowMutation
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).DoAndReturn(func(request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) {
updatedWorkflowMutation = request.UpdateWorkflowMutation
return tests.UpdateWorkflowExecutionResponse, nil
})

_, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tests.NamespaceID.String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
Expand All @@ -1059,26 +1049,8 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedConflictOnUpdate() {
Identity: identity,
},
})
s.NoError(err)
s.NotNil(updatedWorkflowMutation)
s.Equal(int64(16), updatedWorkflowMutation.NextEventID)
s.Equal(workflowTaskStartedEvent2.EventId, updatedWorkflowMutation.ExecutionInfo.LastWorkflowTaskStartId)

executionBuilder := s.getBuilder(tests.NamespaceID, we)
activity3Attributes := s.getActivityScheduledEvent(executionBuilder, 13).GetActivityTaskScheduledEventAttributes()
s.Equal(activity3ID, activity3Attributes.ActivityId)
s.Equal(activity3Type, activity3Attributes.ActivityType.Name)
s.Equal(int64(12), activity3Attributes.WorkflowTaskCompletedEventId)
s.Equal(tq, activity3Attributes.TaskQueue.Name)
s.Equal(activity3Input, activity3Attributes.Input)
s.Equal(100*time.Second, timestamp.DurationValue(activity3Attributes.ScheduleToCloseTimeout))
s.Equal(10*time.Second, timestamp.DurationValue(activity3Attributes.ScheduleToStartTimeout))
s.Equal(50*time.Second, timestamp.DurationValue(activity3Attributes.StartToCloseTimeout))
s.Equal(5*time.Second, timestamp.DurationValue(activity3Attributes.HeartbeatTimeout))

di, ok := executionBuilder.GetWorkflowTaskInfo(15)
s.True(ok)
s.EqualValues(int64(100), di.WorkflowTaskTimeout.Seconds())
s.Error(err)
s.ErrorIs(err, consts.ErrConflict)
}

func (s *engineSuite) TestValidateSignalRequest() {
Expand Down Expand Up @@ -1120,6 +1092,8 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedMaxAttemptsExceeded() {
RunId: we.RunId,
ScheduleId: 2,
}
tt.ScheduleId = 4 // Set it to 4 to emulate stale cache.

taskToken, _ := tt.Marshal()
identity := "testIdentity"
input := payloads.EncodeString("input")
Expand Down Expand Up @@ -1149,7 +1123,6 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedMaxAttemptsExceeded() {
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, &persistence.ConditionFailedError{})
}

_, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
Expand All @@ -1160,7 +1133,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedMaxAttemptsExceeded() {
Identity: identity,
},
})
s.NotNil(err)
s.Error(err)
s.Equal(consts.ErrMaxAttemptsExceeded, err)
}

Expand Down
Loading

0 comments on commit 1d33c3e

Please sign in to comment.