From 7ad6d5bc0454179285ff5543e77757020b901d68 Mon Sep 17 00:00:00 2001 From: yiminc Date: Thu, 7 Dec 2017 09:54:00 -0800 Subject: [PATCH] fix buffered events bug (#449) --- service/history/mutableStateBuilder.go | 31 ++++++++++++++++---- service/history/timerQueueProcessor2_test.go | 3 +- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index a6df450006a..4e6369c682b 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -235,6 +235,7 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates, e.updateRequestCancelInfos = []*persistence.RequestCancelInfo{} e.deleteRequestCancelInfo = nil e.continueAsNew = nil + e.clearBufferedEvents = false if e.updateBufferedEvents != nil { e.bufferedEvents = append(e.bufferedEvents, e.updateBufferedEvents) e.updateBufferedEvents = nil @@ -328,19 +329,39 @@ func (e *mutableStateBuilder) isStickyTaskListEnabled() bool { func (e *mutableStateBuilder) createNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent { eventID := e.executionInfo.NextEventID - if e.HasInFlightDecisionTask() && - eventType != workflow.EventTypeDecisionTaskCompleted && - eventType != workflow.EventTypeDecisionTaskFailed && - eventType != workflow.EventTypeDecisionTaskTimedOut { + if e.shouldBufferEvent(eventType) { eventID = bufferedEventID } else { - // only increase NextEventID if there is no in-flight decision task + // only increase NextEventID if event is not buffered e.executionInfo.NextEventID++ } return e.createNewHistoryEventWithTimestamp(eventID, eventType, time.Now().UnixNano()) } +func (e *mutableStateBuilder) shouldBufferEvent(eventType workflow.EventType) bool { + if !e.HasInFlightDecisionTask() { + // do not buffer event if there is no in-flight decision + return false + } + + switch eventType { + case workflow.EventTypeDecisionTaskCompleted, + workflow.EventTypeDecisionTaskFailed, + workflow.EventTypeDecisionTaskTimedOut, + workflow.EventTypeWorkflowExecutionCompleted, + workflow.EventTypeWorkflowExecutionFailed, + workflow.EventTypeWorkflowExecutionTimedOut, + workflow.EventTypeWorkflowExecutionTerminated, + workflow.EventTypeWorkflowExecutionContinuedAsNew, + workflow.EventTypeWorkflowExecutionCanceled: + // do not buffer event if it is any type of close decision or close workflow + return false + } + + return true +} + func (e *mutableStateBuilder) createNewHistoryEventWithTimestamp(eventID int64, eventType workflow.EventType, timestamp int64) *workflow.HistoryEvent { ts := common.Int64Ptr(timestamp) diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index afe8f20a495..d1e0a11de7b 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -227,9 +227,8 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() { s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once() s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once() + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) { - request := arguments.Get(0).(*persistence.UpdateWorkflowExecutionRequest) - s.NotNil(request.NewBufferedEvents) // Done. waitCh <- struct{}{} }).Once()