Skip to content

Commit

Permalink
fix buffered events bug (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Dec 7, 2017
1 parent f7e2ff1 commit 7ad6d5b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
31 changes: 26 additions & 5 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,
e.updateRequestCancelInfos = []*persistence.RequestCancelInfo{}
e.deleteRequestCancelInfo = nil
e.continueAsNew = nil
e.clearBufferedEvents = false
if e.updateBufferedEvents != nil {
e.bufferedEvents = append(e.bufferedEvents, e.updateBufferedEvents)
e.updateBufferedEvents = nil
Expand Down Expand Up @@ -328,19 +329,39 @@ func (e *mutableStateBuilder) isStickyTaskListEnabled() bool {

func (e *mutableStateBuilder) createNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent {
eventID := e.executionInfo.NextEventID
if e.HasInFlightDecisionTask() &&
eventType != workflow.EventTypeDecisionTaskCompleted &&
eventType != workflow.EventTypeDecisionTaskFailed &&
eventType != workflow.EventTypeDecisionTaskTimedOut {
if e.shouldBufferEvent(eventType) {
eventID = bufferedEventID
} else {
// only increase NextEventID if there is no in-flight decision task
// only increase NextEventID if event is not buffered
e.executionInfo.NextEventID++
}

return e.createNewHistoryEventWithTimestamp(eventID, eventType, time.Now().UnixNano())
}

func (e *mutableStateBuilder) shouldBufferEvent(eventType workflow.EventType) bool {
if !e.HasInFlightDecisionTask() {
// do not buffer event if there is no in-flight decision
return false
}

switch eventType {
case workflow.EventTypeDecisionTaskCompleted,
workflow.EventTypeDecisionTaskFailed,
workflow.EventTypeDecisionTaskTimedOut,
workflow.EventTypeWorkflowExecutionCompleted,
workflow.EventTypeWorkflowExecutionFailed,
workflow.EventTypeWorkflowExecutionTimedOut,
workflow.EventTypeWorkflowExecutionTerminated,
workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeWorkflowExecutionCanceled:
// do not buffer event if it is any type of close decision or close workflow
return false
}

return true
}

func (e *mutableStateBuilder) createNewHistoryEventWithTimestamp(eventID int64, eventType workflow.EventType,
timestamp int64) *workflow.HistoryEvent {
ts := common.Int64Ptr(timestamp)
Expand Down
3 changes: 1 addition & 2 deletions service/history/timerQueueProcessor2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once()
s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) {
request := arguments.Get(0).(*persistence.UpdateWorkflowExecutionRequest)
s.NotNil(request.NewBufferedEvents)
// Done.
waitCh <- struct{}{}
}).Once()
Expand Down

0 comments on commit 7ad6d5b

Please sign in to comment.