Skip to content

Commit

Permalink
Handle service deployment causing buffer events not flushed (#940)
Browse files Browse the repository at this point in the history
* handle service deployment causing buffer events not flushed
  • Loading branch information
wxing1292 authored Jul 7, 2018
1 parent fcc9b0a commit 3c9a409
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 33 deletions.
98 changes: 66 additions & 32 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE
return r.ApplyOtherEventsMissingMutableState(domainID, request.WorkflowExecution.GetWorkflowId(), firstEvent.GetVersion(), logger)
}

err = r.FlushBuffer(context, msBuilder, logger)
if err != nil {
r.logError(logger, "Fail to pre-flush buffer.", err)
return err
}
msBuilder, err = r.ApplyOtherEventsVersionChecking(context, msBuilder, request, logger)
if err != nil || msBuilder == nil {
return err
Expand All @@ -194,18 +199,20 @@ func (r *historyReplicator) ApplyStartEvent(context *workflowExecutionContext, r
msBuilder := r.getNewMutableState(request.GetVersion(), logger)
err := r.ApplyReplicationTask(context, msBuilder, request, logger)
if err != nil {
logger.Debugf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(),
request.GetFirstEventId(), err)
r.logError(logger, "Fail to Apply Replication task.", err)
}
return err
}

func (r *historyReplicator) ApplyOtherEventsMissingMutableState(domainID string, workflowID string, incomingVersion int64, logger bark.Logger) error {
// we need to check the current workflow execution
currentRunID, currentLastWriteVersion, _, err := r.getCurrentWorkflowInfo(domainID, workflowID)
_, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(domainID, workflowID)
if err != nil {
return err
}
currentRunID := currentMutableState.GetExecutionInfo().RunID
currentLastWriteVersion := currentMutableState.GetLastWriteVersion()
currentRelease(nil)

// we can also use the start version
if currentLastWriteVersion > incomingVersion {
Expand All @@ -217,6 +224,12 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(domainID string,
// currentLastWriteVersion <= incomingVersion
logger.Debugf("Retrying replication task. Current RunID: %v, Current LastWriteVersion: %v, Incoming Version: %v.",
currentRunID, currentLastWriteVersion, incomingVersion)

// try flush the current workflow buffer
err = r.flushCurrentWorkflowBuffer(domainID, workflowID, logger)
if err != nil {
return err
}
return ErrRetryEntityNotExists
}

Expand Down Expand Up @@ -312,7 +325,7 @@ func (r *historyReplicator) ApplyOtherEvents(context *workflowExecutionContext,
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.BufferedReplicationTaskCounter)
err = msBuilder.BufferReplicationTask(request)
if err != nil {
logger.Errorf("Failed to buffer out of order replication task. Err: %v", err)
r.logError(logger, "Failed to buffer out of order replication task.", err)
return errors.New("failed to add buffered replication task")
}

Expand All @@ -329,28 +342,17 @@ func (r *historyReplicator) ApplyOtherEvents(context *workflowExecutionContext,
return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID)
}

// apply the events normally
// First check if there are events which needs to be flushed before applying the update
err = r.FlushBuffer(context, msBuilder, request, logger)
if err != nil {
logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v",
msBuilder.GetNextEventID(), firstEventID, err)
return err
}

// Apply the replication task
err = r.ApplyReplicationTask(context, msBuilder, request, logger)
if err != nil {
logger.Errorf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v",
msBuilder.GetNextEventID(), firstEventID, err)
r.logError(logger, "Fail to Apply Replication task.", err)
return err
}

// Flush buffered replication tasks after applying the update
err = r.FlushBuffer(context, msBuilder, request, logger)
err = r.FlushBuffer(context, msBuilder, logger)
if err != nil {
logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v",
msBuilder.GetNextEventID(), firstEventID, err)
r.logError(logger, "Fail to flush buffer.", err)
}

return err
Expand Down Expand Up @@ -412,8 +414,12 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte
return err
}

func (r *historyReplicator) FlushBuffer(context *workflowExecutionContext, msBuilder mutableState,
request *h.ReplicateEventsRequest, logger bark.Logger) error {
func (r *historyReplicator) FlushBuffer(context *workflowExecutionContext, msBuilder mutableState, logger bark.Logger) error {
domainID := msBuilder.GetExecutionInfo().DomainID
execution := shared.WorkflowExecution{
WorkflowId: common.StringPtr(msBuilder.GetExecutionInfo().WorkflowID),
RunId: common.StringPtr(msBuilder.GetExecutionInfo().RunID),
}

// Keep applying buffered replication tasks in a loop
for msBuilder.HasBufferedReplicationTasks() {
Expand All @@ -431,8 +437,8 @@ func (r *historyReplicator) FlushBuffer(context *workflowExecutionContext, msBui
sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(bt.Version)
req := &h.ReplicateEventsRequest{
SourceCluster: common.StringPtr(sourceCluster),
DomainUUID: request.DomainUUID,
WorkflowExecution: request.WorkflowExecution,
DomainUUID: common.StringPtr(domainID),
WorkflowExecution: &execution,
FirstEventId: common.Int64Ptr(bt.FirstEventID),
NextEventId: common.Int64Ptr(bt.NextEventID),
Version: common.Int64Ptr(bt.Version),
Expand Down Expand Up @@ -602,6 +608,10 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC
return nil
}
if currentStartVersion == incomingVersion {
err = r.flushCurrentWorkflowBuffer(domainID, execution.GetWorkflowId(), logger)
if err != nil {
return err
}
return ErrRetryExistingWorkflow
}

Expand All @@ -626,6 +636,23 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC
return createWorkflow(isBrandNew, currentRunID)
}

// flushCurrentWorkflowBuffer loads the current run of the given workflow via
// getCurrentWorkflowMutableState and flushes its buffered replication tasks
// through FlushBuffer.
//
// The release function returned alongside the mutable state is always invoked
// with the outcome of the flush. A flush failure is logged and returned to the
// caller; a failure to load the current workflow is returned as-is.
func (r *historyReplicator) flushCurrentWorkflowBuffer(domainID string, workflowID string, logger bark.Logger) error {
	wfContext, wfState, release, err := r.getCurrentWorkflowMutableState(domainID, workflowID)
	if err != nil {
		return err
	}

	// The incoming workflow cannot make progress while the existing workflow
	// is still open, so try flushing the existing workflow's buffer to see
	// whether that lets it move forward.
	flushErr := r.FlushBuffer(wfContext, wfState, logger)
	release(flushErr)
	if flushErr == nil {
		return nil
	}

	r.logError(logger, "Fail to flush buffer for current workflow.", flushErr)
	return flushErr
}

func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(msBuilder mutableState) (retError error) {
// this function aims to solve the edge case when this workflow, when going through
// reset, has already started a next generation (continue as new-ed workflow)
Expand All @@ -647,11 +674,14 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(msBuilder m
// and encounter a deadlock
domainID := msBuilder.GetExecutionInfo().DomainID
workflowID := msBuilder.GetExecutionInfo().WorkflowID
currentRunID, _, closeStatus, err := r.getCurrentWorkflowInfo(domainID, workflowID)
_, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(domainID, workflowID)
if err != nil {
return err
}
if closeStatus != persistence.WorkflowCloseStatusNone {
currentRunID := currentMutableState.GetExecutionInfo().RunID
currentCloseStatus := currentMutableState.GetExecutionInfo().CloseStatus
currentRelease(nil)
if currentCloseStatus != persistence.WorkflowCloseStatusNone {
// current workflow finished
// note, it is impossible for a current workflow to end with continue-as-new as its close status
return nil
Expand Down Expand Up @@ -729,27 +759,25 @@ func (r *historyReplicator) Serialize(history *shared.History) (*persistence.Ser
return h, nil
}

func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) {
// func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) {
func (r *historyReplicator) getCurrentWorkflowMutableState(domainID string, workflowID string) (*workflowExecutionContext, mutableState, releaseWorkflowExecutionFunc, error) {
// we need to check the current workflow execution
context, release, err := r.historyCache.getOrCreateWorkflowExecution(
domainID,
// only use the workflow ID, to get the current running one
shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)},
)
if err != nil {
return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err
return nil, nil, nil, err
}
defer func() { release(retError) }()

msBuilder, err := context.loadWorkflowExecution()
if err != nil {
// no matter what error happen, we need to retry
return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err
release(err)
return nil, nil, nil, err
}
lastWriteVersion = msBuilder.GetLastWriteVersion()
runID = msBuilder.GetExecutionInfo().RunID
closeStatus = msBuilder.GetExecutionInfo().CloseStatus
return
return context, msBuilder, release, nil
}

func (r *historyReplicator) terminateWorkflow(domainID string, workflowID string, runID string) error {
Expand Down Expand Up @@ -779,3 +807,9 @@ func (r *historyReplicator) notify(clusterName string, now time.Time, transferTa
r.historyEngine.txProcessor.NotifyNewTask(clusterName, now, transferTasks)
r.historyEngine.timerProcessor.NotifyNewTimers(clusterName, now, timerTasks)
}

// logError emits msg at error level on the supplied logger, attaching err as
// a structured field under logging.TagErr.
func (r *historyReplicator) logError(logger bark.Logger, msg string, err error) {
	errFields := bark.Fields{logging.TagErr: err}
	logger.WithFields(errFields).Error(msg)
}
20 changes: 20 additions & 0 deletions service/history/historyReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,26 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc
// the test above already asserts the create workflow request, so here just use anything
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, errRet).Once()

currentContext := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(currentRunID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
currentMsBuilder := &mockMutableState{}
currentMsBuilder.On("HasBufferedReplicationTasks").Return(false)
// return empty since not actually used
currentMsBuilder.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{})
// return nil to bypass updating the version, since this test does not test that
currentMsBuilder.On("GetReplicationState").Return(nil)
currentContext.msBuilder = currentMsBuilder
s.historyReplicator.historyCache.PutIfNotExist(currentRunID, currentContext)
s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{
DomainID: domainID,
WorkflowID: workflowID,
}).Return(&persistence.GetCurrentExecutionResponse{
RunID: currentRunID,
// other attributes are not used
}, nil)

err := s.historyReplicator.replicateWorkflowStarted(context, msBuilder, di, sourceCluster, history, sBuilder, s.logger)
s.Equal(ErrRetryExistingWorkflow, err)
s.Equal(1, len(transferTasks))
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (t *timerQueueProcessorBase) internalProcessor() error {
}

func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerTaskInfo, error) {
if !t.rateLimiter.Consume(1, t.shard.GetConfig().TransferProcessorMaxPollInterval()) {
if !t.rateLimiter.Consume(1, t.shard.GetConfig().TimerProcessorMaxPollInterval()) {
t.notifyNewTimer(time.Time{}) // re-enqueue the event
return nil, nil
}
Expand Down

0 comments on commit 3c9a409

Please sign in to comment.