Skip to content

Commit

Permalink
Fixes various issues with ActivityTimeout Processing (#528)
Browse files Browse the repository at this point in the history
* Fixes various issues with ActivityTimeout Processing

Heartbeart timeout fixes to create next timeout if first heartbeat timer
fires and activity is heartbeating.

Always include ScheduleToClose timeout when creating next activity
timeout.

Fix logic for correctly setting TimerCreated flag on TimerDetails.

Remove redundant code for all activity APIs from history engine.
  • Loading branch information
samarabbas authored Jan 26, 2018
1 parent 6fe6949 commit 78c4b7a
Show file tree
Hide file tree
Showing 4 changed files with 484 additions and 425 deletions.
238 changes: 238 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,244 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Timeout() {
s.True(workflowComplete)
}

func (s *integrationSuite) TestActivityTimeouts() {
id := "integration-activity-timeout-test"
wt := "integration-activity-timeout-test-type"
tl := "integration-activity-timeout-test-tasklist"
identity := "worker1"
activityName := "timeout_activity"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

workflowComplete := false
activitiesScheduled := false
activitiesMap := map[int64]*workflow.HistoryEvent{}
failWorkflow := false
failReason := ""
var activityATimedout, activityBTimedout, activityCTimedout, activityDTimedout bool
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if !activitiesScheduled {
activitiesScheduled = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("A"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: common.StringPtr("NoWorker")},
Input: []byte("ScheduleToStart"),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using ScheduleToStart
StartToCloseTimeoutSeconds: common.Int32Ptr(30),
HeartbeatTimeoutSeconds: common.Int32Ptr(0),
},
}, {
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("B"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: []byte("ScheduleClose"),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(7), // ActivityID B is expected to timeout using ScheduleClose
ScheduleToStartTimeoutSeconds: common.Int32Ptr(5),
StartToCloseTimeoutSeconds: common.Int32Ptr(10),
HeartbeatTimeoutSeconds: common.Int32Ptr(0),
},
}, {
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("C"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: []byte("StartToClose"),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(15),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
StartToCloseTimeoutSeconds: common.Int32Ptr(5), // ActivityID C is expected to timeout using StartToClose
HeartbeatTimeoutSeconds: common.Int32Ptr(0),
},
}, {
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("D"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: []byte("Heartbeat"),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(20),
StartToCloseTimeoutSeconds: common.Int32Ptr(15),
HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID D is expected to timeout using Heartbeat
},
}}, nil
} else if previousStartedEventID > 0 {
for _, event := range history.Events[previousStartedEventID:] {
if event.GetEventType() == workflow.EventTypeActivityTaskScheduled {
activitiesMap[event.GetEventId()] = event
}

if event.GetEventType() == workflow.EventTypeActivityTaskTimedOut {
timeoutEvent := event.ActivityTaskTimedOutEventAttributes
scheduledEvent, ok := activitiesMap[timeoutEvent.GetScheduledEventId()]
if !ok {
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
Reason: common.StringPtr("ScheduledEvent not found."),
},
}}, nil
}

switch timeoutEvent.GetTimeoutType() {
case workflow.TimeoutTypeScheduleToStart:
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "A" {
activityATimedout = true
} else {
failWorkflow = true
failReason = "ActivityID A is expected to timeout with ScheduleToStart"
}
case workflow.TimeoutTypeScheduleToClose:
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "B" {
activityBTimedout = true
} else {
failWorkflow = true
failReason = "ActivityID B is expected to timeout with ScheduleToClose"
}
case workflow.TimeoutTypeStartToClose:
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "C" {
activityCTimedout = true
} else {
failWorkflow = true
failReason = "ActivityID C is expected to timeout with StartToClose"
}
case workflow.TimeoutTypeHeartbeat:
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "D" {
activityDTimedout = true
} else {
failWorkflow = true
failReason = "ActivityID D is expected to timeout with Heartbeat"
}
}
}
}
}

if failWorkflow {
s.logger.Errorf("Failing workflow.")
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
Reason: common.StringPtr(failReason),
},
}}, nil
}

if activityATimedout && activityBTimedout && activityCTimedout && activityDTimedout {
s.logger.Info("Completing Workflow.")
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

return nil, []*workflow.Decision{}, nil
}

atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {
s.Equal(id, *execution.WorkflowId)
s.Equal(activityName, *activityType.Name)
timeoutType := string(input)
switch timeoutType {
case "ScheduleToStart":
s.Fail("Activity A not expected to be started.")
case "ScheduleClose":
s.logger.Infof("Sleeping activityB for 6 seconds.")
time.Sleep(7 * time.Second)
case "StartToClose":
s.logger.Infof("Sleeping activityC for 6 seconds.")
time.Sleep(8 * time.Second)
case "Heartbeat":
s.logger.Info("Starting hearbeat activity.")
go func() {
for i := 0; i < 6; i++ {
s.logger.Infof("Heartbeating for activity: %s, count: %d", activityID, i)
_, err := s.engine.RecordActivityTaskHeartbeat(createContext(), &workflow.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken, Details: []byte(string(i))})
s.Nil(err)
time.Sleep(1 * time.Second)
}
s.logger.Info("End Heartbeating.")
}()
s.logger.Info("Sleeping hearbeat activity.")
time.Sleep(10 * time.Second)
}

return []byte("Activity Result."), false, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: atHandler,
logger: s.logger,
suite: s,
}

_, err := poller.pollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)

for i := 0; i < 3; i++ {
go func() {
err = poller.pollAndProcessActivityTask(false)
s.logger.Infof("Activity Processing Completed. Error: %v", err)
}()
}

s.logger.Infof("Waiting for workflow to complete: RunId: %v", *we.RunId)
for i := 0; i < 10; i++ {
s.logger.Infof("Processing decision task: %v", i)
_, err := poller.pollAndProcessDecisionTask(false, false)
s.Nil(err, "Poll for decision task failed.")

if workflowComplete {
break
}
}

s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
})
s.True(workflowComplete)
}

func (s *integrationSuite) TestSequential_UserTimers() {
id := "interation-sequential-user-timers-test"
wt := "interation-sequential-user-timers-test-type"
Expand Down
Loading

0 comments on commit 78c4b7a

Please sign in to comment.