diff --git a/host/integration_test.go b/host/integration_test.go index fc04ff46d95..80a2100ed44 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -1028,6 +1028,244 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Timeout() { s.True(workflowComplete) } +func (s *integrationSuite) TestActivityTimeouts() { + id := "integration-activity-timeout-test" + wt := "integration-activity-timeout-test-type" + tl := "integration-activity-timeout-test-tasklist" + identity := "worker1" + activityName := "timeout_activity" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + } + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + workflowComplete := false + activitiesScheduled := false + activitiesMap := map[int64]*workflow.HistoryEvent{} + failWorkflow := false + failReason := "" + var activityATimedout, activityBTimedout, activityCTimedout, activityDTimedout bool + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + if !activitiesScheduled { + activitiesScheduled = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("A"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: common.StringPtr("NoWorker")}, + Input: []byte("ScheduleToStart"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using ScheduleToStart + StartToCloseTimeoutSeconds: common.Int32Ptr(30), + HeartbeatTimeoutSeconds: common.Int32Ptr(0), + }, + }, { + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("B"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("ScheduleClose"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(7), // ActivityID B is expected to timeout using ScheduleClose + ScheduleToStartTimeoutSeconds: common.Int32Ptr(5), + StartToCloseTimeoutSeconds: common.Int32Ptr(10), + HeartbeatTimeoutSeconds: common.Int32Ptr(0), + }, + }, { + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("C"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("StartToClose"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(15), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), + StartToCloseTimeoutSeconds: common.Int32Ptr(5), // ActivityID C is expected to timeout using StartToClose + HeartbeatTimeoutSeconds: common.Int32Ptr(0), + }, + }, { + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("D"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("Heartbeat"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(20), + StartToCloseTimeoutSeconds: common.Int32Ptr(15), + HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID D is expected to timeout using Heartbeat + }, + }}, nil + } else if previousStartedEventID > 0 { + for _, event := range history.Events[previousStartedEventID:] { + if event.GetEventType() == workflow.EventTypeActivityTaskScheduled { + activitiesMap[event.GetEventId()] = event + } + + if event.GetEventType() == workflow.EventTypeActivityTaskTimedOut { + timeoutEvent := event.ActivityTaskTimedOutEventAttributes + scheduledEvent, ok := activitiesMap[timeoutEvent.GetScheduledEventId()] + if !ok { + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution), + FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr("ScheduledEvent not found."), + }, + }}, nil + } + + switch timeoutEvent.GetTimeoutType() { + case workflow.TimeoutTypeScheduleToStart: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "A" { + activityATimedout = true + } else { + failWorkflow = true + failReason = "ActivityID A is expected to timeout with ScheduleToStart" + } + case workflow.TimeoutTypeScheduleToClose: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "B" { + activityBTimedout = true + } else { + failWorkflow = true + failReason = "ActivityID B is expected to timeout with ScheduleToClose" + } + case workflow.TimeoutTypeStartToClose: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "C" { + activityCTimedout = true + } else { + failWorkflow = true + failReason = "ActivityID C is expected to timeout with StartToClose" + } + case workflow.TimeoutTypeHeartbeat: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "D" { + activityDTimedout = true + } else { + failWorkflow = true + failReason = "ActivityID D is expected to timeout with Heartbeat" + } + } + } + } + } + + if failWorkflow { + s.logger.Errorf("Failing workflow.") + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution), + FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr(failReason), + }, + }}, nil + } + + if activityATimedout && activityBTimedout && activityCTimedout && activityDTimedout { + s.logger.Info("Completing Workflow.") + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }}, nil + } + + return nil, []*workflow.Decision{}, nil + } + + atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType, + activityID string, input []byte, taskToken []byte) ([]byte, bool, error) { + s.Equal(id, *execution.WorkflowId) + s.Equal(activityName, *activityType.Name) + timeoutType := string(input) + switch timeoutType { + case "ScheduleToStart": + s.Fail("Activity A not expected to be started.") + case "ScheduleClose": + s.logger.Infof("Sleeping activityB for 6 seconds.") + time.Sleep(7 * time.Second) + case "StartToClose": + s.logger.Infof("Sleeping activityC for 6 seconds.") + time.Sleep(8 * time.Second) + case "Heartbeat": + s.logger.Info("Starting hearbeat activity.") + go func() { + for i := 0; i < 6; i++ { + s.logger.Infof("Heartbeating for activity: %s, count: %d", activityID, i) + _, err := s.engine.RecordActivityTaskHeartbeat(createContext(), &workflow.RecordActivityTaskHeartbeatRequest{ + TaskToken: taskToken, Details: []byte(string(i))}) + s.Nil(err) + time.Sleep(1 * time.Second) + } + s.logger.Info("End Heartbeating.") + }() + s.logger.Info("Sleeping hearbeat activity.") + time.Sleep(10 * time.Second) + } + + return []byte("Activity Result."), false, nil + } + + poller := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskList, + identity: identity, + decisionHandler: dtHandler, + activityHandler: atHandler, + logger: s.logger, + suite: s, + } + + _, err := poller.pollAndProcessDecisionTask(false, false) + s.True(err == nil || err == matching.ErrNoTasks) + + for i := 0; i < 3; i++ { + go func() { + err = poller.pollAndProcessActivityTask(false) + s.logger.Infof("Activity Processing Completed. Error: %v", err) + }() + } + + s.logger.Infof("Waiting for workflow to complete: RunId: %v", *we.RunId) + for i := 0; i < 10; i++ { + s.logger.Infof("Processing decision task: %v", i) + _, err := poller.pollAndProcessDecisionTask(false, false) + s.Nil(err, "Poll for decision task failed.") + + if workflowComplete { + break + } + } + + s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(we.GetRunId()), + }) + s.True(workflowComplete) +} + func (s *integrationSuite) TestSequential_UserTimers() { id := "interation-sequential-user-timers-test" wt := "interation-sequential-user-timers-test-type" diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index ed2c52af853..f88292905fe 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -82,6 +82,14 @@ var ( ErrConflict = errors.New("Conditional update failed") // ErrMaxAttemptsExceeded is exported temporarily for integration test ErrMaxAttemptsExceeded = errors.New("Maximum attempts exceeded to update history") + // ErrStaleState is the error returned during state update indicating that cached mutable state could be stale + ErrStaleState = errors.New("Cache mutable state could potentially be stale") + // ErrActivityTaskNotFound is the error to indicate activity task could be duplicate and activity already completed + ErrActivityTaskNotFound = &workflow.EntityNotExistsError{Message: "Activity task not found."} + // ErrWorkflowCompleted is the error to indicate workflow execution already completed + ErrWorkflowCompleted = &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + // ErrDeserializingToken is the error to indicate task token is invalid + ErrDeserializingToken = &workflow.BadRequestError{Message: "Error deserializing task token."} // FailedWorkflowCloseState is a set of failed workflow close states, used for start workflow policy // for start workflow execution API @@ -617,108 +625,87 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( if err != nil { return nil, err } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, *request.WorkflowExecution) - if err0 != nil { - return nil, err0 - } - defer release() - scheduleID := *request.ScheduleId - requestID := common.StringDefault(request.RequestId) - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err0 := context.loadWorkflowExecution() - if err0 != nil { - return nil, err0 - } - tBuilder := e.getTimerBuilder(&context.workflowExecution) - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) + execution := workflow.WorkflowExecution{ + WorkflowId: request.WorkflowExecution.WorkflowId, + RunId: request.WorkflowExecution.RunId, + } - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } + response := &h.RecordActivityTaskStartedResponse{} + err = e.updateWorkflowExecution(domainID, execution, false, false, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted + } - // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If - // task is not outstanding than it is most probably a duplicate and complete the task. - if !msBuilder.isWorkflowExecutionRunning() || !isRunning { - // Looks like ActivityTask already completed as a result of another call. - // It is OK to drop the task at this point. - logging.LogDuplicateTaskEvent(context.logger, persistence.TransferTaskTypeActivityTask, common.Int64Default(request.TaskId), requestID, - scheduleID, emptyEventID, isRunning) + scheduleID := request.GetScheduleId() + requestID := request.GetRequestId() + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - return nil, &workflow.EntityNotExistsError{Message: "Activity task not found."} - } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState + } - scheduledEvent, exists := msBuilder.GetActivityScheduledEvent(scheduleID) - if !exists { - return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} - } + // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If + // task is not outstanding than it is most probably a duplicate and complete the task. + if !isRunning { + // Looks like ActivityTask already completed as a result of another call. + // It is OK to drop the task at this point. + logging.LogDuplicateTaskEvent(e.logger, persistence.TransferTaskTypeActivityTask, + common.Int64Default(request.TaskId), requestID, scheduleID, emptyEventID, isRunning) - if ai.StartedID != emptyEventID { - // If activity is started as part of the current request scope then return a positive response - if ai.RequestID == requestID { - response := &h.RecordActivityTaskStartedResponse{} - startedEvent, exists := msBuilder.GetActivityStartedEvent(scheduleID) - if !exists { - return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} - } - response.ScheduledEvent = scheduledEvent - response.StartedEvent = startedEvent - return response, nil + return nil, ErrActivityTaskNotFound } - // Looks like ActivityTask already started as a result of another call. - // It is OK to drop the task at this point. - logging.LogDuplicateTaskEvent(context.logger, persistence.TransferTaskTypeActivityTask, common.Int64Default(request.TaskId), requestID, - scheduleID, ai.StartedID, isRunning) - - return nil, &h.EventAlreadyStartedError{Message: "Activity task already started."} - } + scheduledEvent, exists := msBuilder.GetActivityScheduledEvent(scheduleID) + if !exists { + return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} + } + response.ScheduledEvent = scheduledEvent + + if ai.StartedID != emptyEventID { + // If activity is started as part of the current request scope then return a positive response + if ai.RequestID == requestID { + startedEvent, exists := msBuilder.GetActivityStartedEvent(scheduleID) + if !exists { + return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} + } + response.StartedEvent = startedEvent + return nil, nil + } - startedEvent := msBuilder.AddActivityTaskStartedEvent(ai, scheduleID, requestID, request.PollRequest) - if startedEvent == nil { - // Unable to add ActivityTaskStarted event to history - return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskStarted event to history."} - } + // Looks like ActivityTask already started as a result of another call. + // It is OK to drop the task at this point. + logging.LogDuplicateTaskEvent(e.logger, persistence.TransferTaskTypeActivityTask, + common.Int64Default(request.TaskId), requestID, scheduleID, ai.StartedID, isRunning) - // Start a timer for the activity task. - timerTasks := []persistence.Task{} - if tt := tBuilder.GetActivityTimerTaskIfNeeded(msBuilder); tt != nil { - timerTasks = append(timerTasks, tt) - } + return nil, &h.EventAlreadyStartedError{Message: "Activity task already started."} + } - // Generate a transaction ID for appending events to history - transactionID, err2 := e.shard.GetNextTransferTaskID() - if err2 != nil { - return nil, err2 - } + startedEvent := msBuilder.AddActivityTaskStartedEvent(ai, scheduleID, requestID, request.PollRequest) + if startedEvent == nil { + // Unable to add ActivityTaskStarted event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskStarted event to history."} + } + response.StartedEvent = startedEvent - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operationi again. - if err3 := context.updateWorkflowExecution(nil, timerTasks, transactionID); err3 != nil { - if err3 == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + // Start a timer for the activity task. + timerTasks := []persistence.Task{} + if tt := tBuilder.GetActivityTimerTaskIfNeeded(msBuilder); tt != nil { + timerTasks = append(timerTasks, tt) } - return nil, err3 - } - defer e.timerProcessor.NotifyNewTimer(timerTasks) + return timerTasks, nil + }) - response := &h.RecordActivityTaskStartedResponse{} - response.ScheduledEvent = scheduledEvent - response.StartedEvent = startedEvent - return response, nil + if err != nil { + return nil, err } - return nil, ErrMaxAttemptsExceeded + return response, err } // RespondDecisionTaskCompleted completes a decision task @@ -730,7 +717,7 @@ func (e *historyEngineImpl) RespondDecisionTaskCompleted(ctx context.Context, re request := req.CompleteRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1203,7 +1190,7 @@ func (e *historyEngineImpl) RespondDecisionTaskFailed(req *h.RespondDecisionTask request := req.FailedRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1212,21 +1199,21 @@ func (e *historyEngineImpl) RespondDecisionTaskFailed(req *h.RespondDecisionTask } return e.updateWorkflowExecution(domainID, workflowExecution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } scheduleID := token.ScheduleID di, isRunning := msBuilder.GetPendingDecision(scheduleID) if !isRunning || di.Attempt != token.ScheduleAttempt || di.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Decision task not found."} + return nil, &workflow.EntityNotExistsError{Message: "Decision task not found."} } msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID, request.GetCause(), request.Details, request.GetIdentity()) - return nil + return nil, nil }) } @@ -1239,7 +1226,7 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(req *h.RespondActivityT request := req.CompleteRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1247,86 +1234,40 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(req *h.RespondActivityT RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return err1 - } - - scheduleID := token.ScheduleID - if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID - scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) - if err0 != nil { - return err0 + return e.updateWorkflowExecution(domainID, workflowExecution, false, true, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted } - } - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } - - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Activity task not found."} - } - - startedID := ai.StartedID - if msBuilder.AddActivityTaskCompletedEvent(scheduleID, startedID, request) == nil { - // Unable to add ActivityTaskCompleted event to history - return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."} - } - var transferTasks []persistence.Task - var timerTasks []persistence.Task - if !msBuilder.HasPendingDecisionTask() { - di := msBuilder.AddDecisionTaskScheduledEvent() - transferTasks = []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: di.Tasklist, - ScheduleID: di.ScheduleID, - }} - if msBuilder.isStickyTaskListEnabled() { - tBuilder := e.getTimerBuilder(&context.workflowExecution) - stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt, - msBuilder.executionInfo.StickyScheduleToStartTimeout) - timerTasks = []persistence.Task{stickyTaskTimeoutTimer} + scheduleID := token.ScheduleID + if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID + scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) + if err0 != nil { + return nil, err0 + } } - } + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - // Generate a transaction ID for appending events to history - transactionID, err2 := e.shard.GetNextTransferTaskID() - if err2 != nil { - return err2 - } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState + } - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. - if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + if !isRunning || ai.StartedID == emptyEventID { + return nil, ErrActivityTaskNotFound } - return err - } - e.timerProcessor.NotifyNewTimer(timerTasks) - return nil - } + startedID := ai.StartedID + if msBuilder.AddActivityTaskCompletedEvent(scheduleID, startedID, request) == nil { + // Unable to add ActivityTaskCompleted event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."} + } - return ErrMaxAttemptsExceeded + return nil, nil + }) } // RespondActivityTaskFailed completes an activity task failure. @@ -1338,7 +1279,7 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(req *h.RespondActivityTask request := req.FailedRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1346,87 +1287,40 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(req *h.RespondActivityTask RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return err1 - } - - scheduleID := token.ScheduleID - if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID - scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) - if err0 != nil { - return err0 + return e.updateWorkflowExecution(domainID, workflowExecution, false, true, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted } - } - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } - - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Activity task not found."} - } - - startedID := ai.StartedID - if msBuilder.AddActivityTaskFailedEvent(scheduleID, startedID, request) == nil { - // Unable to add ActivityTaskFailed event to history - return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskFailed event to history."} - } - var transferTasks []persistence.Task - var timerTasks []persistence.Task - if !msBuilder.HasPendingDecisionTask() { - di := msBuilder.AddDecisionTaskScheduledEvent() - transferTasks = []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: di.Tasklist, - ScheduleID: di.ScheduleID, - }} - if msBuilder.isStickyTaskListEnabled() { - tBuilder := e.getTimerBuilder(&context.workflowExecution) - stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt, - msBuilder.executionInfo.StickyScheduleToStartTimeout) - timerTasks = []persistence.Task{stickyTaskTimeoutTimer} + scheduleID := token.ScheduleID + if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID + scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) + if err0 != nil { + return nil, err0 + } } - } + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - // Generate a transaction ID for appending events to history - transactionID, err3 := e.shard.GetNextTransferTaskID() - if err3 != nil { - return err3 - } - - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. - if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState } - return err - } - e.timerProcessor.NotifyNewTimer(timerTasks) + if !isRunning || ai.StartedID == emptyEventID { + return nil, ErrActivityTaskNotFound + } - return nil - } + startedID := ai.StartedID + if msBuilder.AddActivityTaskFailedEvent(scheduleID, startedID, request) == nil { + // Unable to add ActivityTaskFailed event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskFailed event to history."} + } - return ErrMaxAttemptsExceeded + return nil, nil + }) } // RespondActivityTaskCanceled completes an activity task failure. @@ -1438,7 +1332,7 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(req *h.RespondActivityTa request := req.CancelRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1446,90 +1340,41 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(req *h.RespondActivityTa RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return err1 - } - - scheduleID := token.ScheduleID - if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID - scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) - if err0 != nil { - return err0 + return e.updateWorkflowExecution(domainID, workflowExecution, false, true, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted } - } - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } - - // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If - // task is not outstanding than it is most probably a duplicate and complete the task. - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Activity task not found."} - } - if msBuilder.AddActivityTaskCanceledEvent(scheduleID, ai.StartedID, ai.CancelRequestID, request.Details, - common.StringDefault(request.Identity)) == nil { - // Unable to add ActivityTaskCanceled event to history - return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."} - } - - var transferTasks []persistence.Task - var timerTasks []persistence.Task - if !msBuilder.HasPendingDecisionTask() { - di := msBuilder.AddDecisionTaskScheduledEvent() - transferTasks = []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: di.Tasklist, - ScheduleID: di.ScheduleID, - }} - if msBuilder.isStickyTaskListEnabled() { - tBuilder := e.getTimerBuilder(&context.workflowExecution) - stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt, - msBuilder.executionInfo.StickyScheduleToStartTimeout) - timerTasks = []persistence.Task{stickyTaskTimeoutTimer} + scheduleID := token.ScheduleID + if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID + scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) + if err0 != nil { + return nil, err0 + } } - } + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - // Generate a transaction ID for appending events to history - transactionID, err3 := e.shard.GetNextTransferTaskID() - if err3 != nil { - return err3 - } - - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. - if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState } - return err - } + if !isRunning || ai.StartedID == emptyEventID { + return nil, ErrActivityTaskNotFound + } - e.timerProcessor.NotifyNewTimer(timerTasks) + if msBuilder.AddActivityTaskCanceledEvent(scheduleID, ai.StartedID, ai.CancelRequestID, request.Details, + common.StringDefault(request.Identity)) == nil { + // Unable to add ActivityTaskCanceled event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."} + } - return nil - } + return nil, nil + }) - return ErrMaxAttemptsExceeded } // RecordActivityTaskHeartbeat records an hearbeat for a task. @@ -1545,7 +1390,7 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( request := req.HeartbeatRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return nil, &workflow.BadRequestError{Message: "Error deserializing task token."} + return nil, ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1553,66 +1398,46 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return nil, err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return nil, err1 - } - - scheduleID := token.ScheduleID - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) + var cancelRequested bool + err = e.updateWorkflowExecution(domainID, workflowExecution, false, false, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + e.logger.Errorf("Heartbeat failed ") + return nil, ErrWorkflowCompleted + } - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } + scheduleID := token.ScheduleID + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, Exist: %v", - scheduleID, ai, isRunning) - return nil, &workflow.EntityNotExistsError{Message: "Activity task not found."} - } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState + } - cancelRequested := ai.CancelRequested + if !isRunning || ai.StartedID == emptyEventID { + e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, Exist: %v", scheduleID, ai, + isRunning) + return nil, ErrActivityTaskNotFound + } - e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v", - scheduleID, ai, cancelRequested) + cancelRequested = ai.CancelRequested - // Save progress and last HB reported time. - msBuilder.updateActivityProgress(ai, request) + e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v", + scheduleID, ai, cancelRequested) - // Generate a transaction ID for appending events to history - transactionID, err2 := e.shard.GetNextTransferTaskID() - if err2 != nil { - return nil, err2 - } + // Save progress and last HB reported time. + msBuilder.updateActivityProgress(ai, request) - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. - if err := context.updateWorkflowExecution(nil, nil, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop - } + return nil, nil + }) - return nil, err - } - return &workflow.RecordActivityTaskHeartbeatResponse{CancelRequested: common.BoolPtr(cancelRequested)}, nil + if err != nil { + return &workflow.RecordActivityTaskHeartbeatResponse{}, err } - return &workflow.RecordActivityTaskHeartbeatResponse{}, ErrMaxAttemptsExceeded + return &workflow.RecordActivityTaskHeartbeatResponse{CancelRequested: common.BoolPtr(cancelRequested)}, nil } // RequestCancelWorkflowExecution records request cancellation event for workflow execution @@ -1630,9 +1455,9 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( } return e.updateWorkflowExecution(domainID, workflowExecution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } isCancelRequested, cancelRequestID := msBuilder.isCancelRequested() @@ -1641,20 +1466,20 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( if cancelRequest.RequestId != nil { requestID := *cancelRequest.RequestId if requestID != "" && cancelRequestID == requestID { - return nil + return nil, nil } } - return &workflow.CancellationAlreadyRequestedError{ + return nil, &workflow.CancellationAlreadyRequestedError{ Message: "Cancellation already requested for this workflow execution.", } } if msBuilder.AddWorkflowExecutionCancelRequestedEvent("", req) == nil { - return &workflow.InternalServiceError{Message: "Unable to cancel workflow execution."} + return nil, &workflow.InternalServiceError{Message: "Unable to cancel workflow execution."} } - return nil + return nil, nil }) } @@ -1670,24 +1495,24 @@ func (e *historyEngineImpl) SignalWorkflowExecution(signalRequest *h.SignalWorkf } return e.updateWorkflowExecution(domainID, execution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } // deduplicate by request id for signal decision if requestID := request.GetRequestId(); requestID != "" { if msBuilder.isSignalRequested(requestID) { - return nil + return nil, nil } msBuilder.addSignalRequested(requestID) } if msBuilder.AddWorkflowExecutionSignaled(request) == nil { - return &workflow.InternalServiceError{Message: "Unable to signal workflow execution."} + return nil, &workflow.InternalServiceError{Message: "Unable to signal workflow execution."} } - return nil + return nil, nil }) } @@ -1703,14 +1528,14 @@ func (e *historyEngineImpl) RemoveSignalMutableState(request *h.RemoveSignalMuta } return e.updateWorkflowExecution(domainID, execution, false, false, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } msBuilder.deleteSignalRequested(request.GetRequestId()) - return nil + return nil, nil }) } @@ -1726,16 +1551,16 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(terminateRequest *h.Termi } return e.updateWorkflowExecution(domainID, execution, true, false, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } if msBuilder.AddWorkflowExecutionTerminatedEvent(request) == nil { - return &workflow.InternalServiceError{Message: "Unable to terminate workflow execution."} + return nil, &workflow.InternalServiceError{Message: "Unable to terminate workflow execution."} } - return nil + return nil, nil }) } @@ -1751,14 +1576,14 @@ func (e *historyEngineImpl) ScheduleDecisionTask(scheduleRequest *h.ScheduleDeci } return e.updateWorkflowExecution(domainID, execution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } // Noop - return nil + return nil, nil }) } @@ -1774,9 +1599,9 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R } return e.updateWorkflowExecution(domainID, execution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } initiatedID := *completionRequest.InitiatedId @@ -1786,7 +1611,7 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R // Check mutable state to make sure child execution is in pending child executions ci, isRunning := msBuilder.GetChildExecutionInfo(initiatedID) if !isRunning || ci.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Pending child execution not found."} + return nil, &workflow.EntityNotExistsError{Message: "Pending child execution not found."} } switch *completionEvent.EventType { @@ -1807,13 +1632,13 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R msBuilder.AddChildWorkflowExecutionTimedOutEvent(initiatedID, completedExecution, attributes) } - return nil + return nil, nil }) } func (e *historyEngineImpl) updateWorkflowExecution(domainID string, execution workflow.WorkflowExecution, createDeletionTask, createDecisionTask bool, - action func(builder *mutableStateBuilder) error) error { + action func(builder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error)) error { context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, execution) if err0 != nil { @@ -1829,13 +1654,23 @@ Update_History_Loop: } tBuilder := e.getTimerBuilder(&context.workflowExecution) + var timerTasks []persistence.Task + var err error // conduct caller action - if err := action(msBuilder); err != nil { + if timerTasks, err = action(msBuilder, tBuilder); err != nil { + if err == ErrStaleState { + // Handler detected that cached workflow mutable could potentially be stale + // Reload workflow execution history + context.clear() + continue Update_History_Loop + } + + // Returned error back to the caller return err } var transferTasks []persistence.Task - var timerTasks []persistence.Task + if createDeletionTask { tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, tBuilder) if err != nil { diff --git a/service/history/timerBuilder.go b/service/history/timerBuilder.go index 8459f1ac6af..64da27dd8ea 100644 --- a/service/history/timerBuilder.go +++ b/service/history/timerBuilder.go @@ -152,11 +152,6 @@ func (tb *timerBuilder) AddScheduleToStartDecisionTimoutTask(scheduleID, schedul return timeOutTask } -func (tb *timerBuilder) AddScheduleToStartActivityTimeout( - ai *persistence.ActivityInfo) *persistence.ActivityTimeoutTask { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeScheduleToStart, ai.ScheduleToStartTimeout, nil) -} - func (tb *timerBuilder) AddScheduleToCloseActivityTimeout( ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, error) { return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeScheduleToClose, ai.ScheduleToCloseTimeout, nil), nil @@ -167,17 +162,6 @@ func (tb *timerBuilder) AddStartToCloseActivityTimeout(ai *persistence.ActivityI return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeStartToClose, ai.StartToCloseTimeout, nil), nil } -func (tb *timerBuilder) AddHeartBeatActivityTimeout(ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, - error) { - // We want to create the timer starting from the last heart beat time stamp but - // avoid creating timers before the current timer frame. - targetTime := common.AddSecondsToBaseTime(ai.LastHeartBeatUpdatedTime.UnixNano(), int64(ai.HeartbeatTimeout)) - if targetTime > tb.timeSource.Now().UnixNano() { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeHeartbeat, ai.HeartbeatTimeout, &ai.LastHeartBeatUpdatedTime), nil - } - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeHeartbeat, ai.HeartbeatTimeout, nil), nil -} - // AddActivityTimeoutTask - Adds an activity timeout task. func (tb *timerBuilder) AddActivityTimeoutTask(scheduleID int64, timeoutType w.TimeoutType, fireTimeout int32, baseTime *time.Time) *persistence.ActivityTimeoutTask { @@ -257,7 +241,7 @@ func (tb *timerBuilder) GetActivityTimerTaskIfNeeded(msBuilder *mutableStateBuil td := tb.activityTimers[0] ai := tb.pendingActivityTimers[td.ActivityID] at := timerTask.(*persistence.ActivityTimeoutTask) - ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(w.TimeoutType(at.TimeoutType)) + ai.TimerTaskStatus = ai.TimerTaskStatus | getActivityTimerStatus(w.TimeoutType(at.TimeoutType)) msBuilder.UpdateActivity(ai) tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", @@ -287,6 +271,16 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { tb.activityTimers = make(timers, 0, len(msBuilder.pendingActivityInfoIDs)) for _, v := range msBuilder.pendingActivityInfoIDs { if v.ScheduleID != emptyEventID { + scheduleToCloseExpiry := v.ScheduledTime.Add(time.Duration(v.ScheduleToCloseTimeout) * time.Second) + td := &timerDetails{ + SequenceID: SequenceID{VisibilityTimestamp: scheduleToCloseExpiry}, + ActivityID: v.ScheduleID, + EventID: v.ScheduleID, + TimeoutSec: v.ScheduleToCloseTimeout, + TimeoutType: w.TimeoutTypeScheduleToClose, + TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToClose) != 0} + tb.activityTimers = append(tb.activityTimers, td) + if v.StartedID != emptyEventID { startToCloseExpiry := v.StartedTime.Add(time.Duration(v.StartToCloseTimeout) * time.Second) td := &timerDetails{ @@ -322,15 +316,6 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { TimeoutType: w.TimeoutTypeScheduleToStart, TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToStart) != 0} tb.activityTimers = append(tb.activityTimers, td) - scheduleToCloseExpiry := v.ScheduledTime.Add(time.Duration(v.ScheduleToCloseTimeout) * time.Second) - td = &timerDetails{ - SequenceID: SequenceID{VisibilityTimestamp: scheduleToCloseExpiry}, - ActivityID: v.ScheduleID, - EventID: v.ScheduleID, - TimeoutSec: v.ScheduleToCloseTimeout, - TimeoutType: w.TimeoutTypeScheduleToClose, - TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToClose) != 0} - tb.activityTimers = append(tb.activityTimers, td) } } } diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 015903525fb..a905161ab90 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -608,8 +608,8 @@ Update_History_Loop: } var timerTasks []persistence.Task - scheduleNewDecision := false updateHistory := false + createNewTimer := false ExpireActivityTimers: for _, td := range tBuilder.GetActivityTimers(msBuilder) { @@ -650,7 +650,7 @@ Update_History_Loop: t.metricsClient.IncCounter(metrics.TimerTaskActivityTimeoutScope, metrics.HeartbeatTimeoutCounter) t.logger.Debugf("Activity Heartbeat expired: %+v", *ai) - if msBuilder.AddActivityTaskTimedOutEvent(ai.ScheduleID, ai.StartedID, timeoutType, nil) == nil { + if msBuilder.AddActivityTaskTimedOutEvent(ai.ScheduleID, ai.StartedID, timeoutType, ai.Details) == nil { return errFailedToAddTimeoutEvent } updateHistory = true @@ -675,13 +675,14 @@ Update_History_Loop: // if current one is HB task and we need to create next HB task for the same. // NOTE: When record activity HB comes in we only update last heartbeat timestamp, this is the place // where we create next timer task based on that new updated timestamp. - if !td.TaskCreated || (isHeartBeatTask && td.ActivityID == scheduleID) { + if !td.TaskCreated || (isHeartBeatTask && td.EventID == scheduleID) { nextTask := tBuilder.createNewTask(td) timerTasks = []persistence.Task{nextTask} at := nextTask.(*persistence.ActivityTimeoutTask) - ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(workflow.TimeoutType(at.TimeoutType)) + ai.TimerTaskStatus = ai.TimerTaskStatus | getActivityTimerStatus(workflow.TimeoutType(at.TimeoutType)) msBuilder.UpdateActivity(ai) + createNewTimer = true t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) @@ -692,10 +693,10 @@ Update_History_Loop: } } - if updateHistory { + if updateHistory || createNewTimer { // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload // the history and try the operation again. - scheduleNewDecision = !msBuilder.HasPendingDecisionTask() + scheduleNewDecision := updateHistory && !msBuilder.HasPendingDecisionTask() err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil) if err != nil { if err == ErrConflict {