Skip to content

Commit

Permalink
Fix missing decision timeout for transient decisions
Browse files Browse the repository at this point in the history
We were using incorrect scheduleID for transient decisions on decision
started event.  This happens if a new event comes in after a transient
decision is scheduled, but before it is started.  We end up a creating a
timeout task with the wrong decision schedule id causing timer
processing to skip that timeout event.
Also added an integration test for this use case.
  • Loading branch information
samarabbas committed Jun 25, 2018
1 parent 622d1a0 commit fdebcf4
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 1 deletion.
103 changes: 103 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6146,6 +6146,109 @@ func (s *integrationSuite) TestSignalWithStartWorkflow() {
s.Equal(identity, *signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
}

func (s *integrationSuite) TestTransientDecisionTimeout() {
id := "integration-transient-decision-timeout-test"
wt := "integration-transient-decision-timeout-test-type"
tl := "integration-transient-decision-timeout-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)
s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

workflowExecution := &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
}

// decider logic
workflowComplete := false
failDecision := true
signalCount := 0
//var signalEvent *workflow.HistoryEvent
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if failDecision {
failDecision = false
return nil, nil, errors.New("Decider Panic")
}

// Count signals
for _, event := range history.Events[previousStartedEventID:] {
if event.GetEventType() == workflow.EventTypeWorkflowExecutionSignaled {
signalCount++
}
}


workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: nil,
logger: s.logger,
suite: s,
}

// First decision immediately fails and schedules a transient decision
_, err := poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Now send a signal when transient decision is scheduled
err = s.sendSignal(s.domainName, workflowExecution, "signalA", nil, identity)
s.Nil(err, "failed to send signal to execution")

// Drop decision task to cause a Decision Timeout
_, err = poller.pollAndProcessDecisionTask(true, true)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Print history after dropping decision
s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
})

// Now process signal and complete workflow execution
_, err = poller.pollAndProcessDecisionTaskWithAttempt(true, false, false, false, int64(1))
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

s.Equal(1, signalCount)
s.True(workflowComplete)
}

func (s *integrationSuite) getHistory(domain string, execution *workflow.WorkflowExecution) []*workflow.HistoryEvent {
historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ Update_History_Loop:
}

// Start a timer for the decision task.
timeOutTask := tBuilder.AddStartToCloseDecisionTimoutTask(scheduleID, di.Attempt, di.DecisionTimeout)
timeOutTask := tBuilder.AddStartToCloseDecisionTimoutTask(di.ScheduleID, di.Attempt, di.DecisionTimeout)
timerTasks := []persistence.Task{timeOutTask}
defer e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)

Expand Down

0 comments on commit fdebcf4

Please sign in to comment.