diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 60110c59f2d..91cd1fd91a3 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -182,6 +182,11 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE return r.ApplyOtherEventsMissingMutableState(domainID, request.WorkflowExecution.GetWorkflowId(), firstEvent.GetVersion(), logger) } + err = r.FlushBuffer(context, msBuilder, logger) + if err != nil { + r.logError(logger, "Fail to pre-flush buffer.", err) + return err + } msBuilder, err = r.ApplyOtherEventsVersionChecking(context, msBuilder, request, logger) if err != nil || msBuilder == nil { return err @@ -194,18 +199,20 @@ func (r *historyReplicator) ApplyStartEvent(context *workflowExecutionContext, r msBuilder := r.getNewMutableState(request.GetVersion(), logger) err := r.ApplyReplicationTask(context, msBuilder, request, logger) if err != nil { - logger.Debugf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(), - request.GetFirstEventId(), err) + r.logError(logger, "Fail to Apply Replication task.", err) } return err } func (r *historyReplicator) ApplyOtherEventsMissingMutableState(domainID string, workflowID string, incomingVersion int64, logger bark.Logger) error { // we need to check the current workflow execution - currentRunID, currentLastWriteVersion, _, err := r.getCurrentWorkflowInfo(domainID, workflowID) + _, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(domainID, workflowID) if err != nil { return err } + currentRunID := currentMutableState.GetExecutionInfo().RunID + currentLastWriteVersion := currentMutableState.GetLastWriteVersion() + currentRelease(nil) // we can also use the start version if currentLastWriteVersion > incomingVersion { @@ -217,6 +224,12 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(domainID string, // currentLastWriteVersion <= incomingVersion logger.Debugf("Retrying replication task. Current RunID: %v, Current LastWriteVersion: %v, Incoming Version: %v.", currentRunID, currentLastWriteVersion, incomingVersion) + + // try flush the current workflow buffer + err = r.flushCurrentWorkflowBuffer(domainID, workflowID, logger) + if err != nil { + return err + } return ErrRetryEntityNotExists } @@ -312,7 +325,7 @@ func (r *historyReplicator) ApplyOtherEvents(context *workflowExecutionContext, r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.BufferedReplicationTaskCounter) err = msBuilder.BufferReplicationTask(request) if err != nil { - logger.Errorf("Failed to buffer out of order replication task. Err: %v", err) + r.logError(logger, "Failed to buffer out of order replication task.", err) return errors.New("failed to add buffered replication task") } @@ -329,28 +342,17 @@ func (r *historyReplicator) ApplyOtherEvents(context *workflowExecutionContext, return context.updateHelper(nil, nil, nil, false, sourceCluster, lastWriteVersion, transactionID) } - // apply the events normally - // First check if there are events which needs to be flushed before applying the update - err = r.FlushBuffer(context, msBuilder, request, logger) - if err != nil { - logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v", - msBuilder.GetNextEventID(), firstEventID, err) - return err - } - // Apply the replication task err = r.ApplyReplicationTask(context, msBuilder, request, logger) if err != nil { - logger.Errorf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", - msBuilder.GetNextEventID(), firstEventID, err) + r.logError(logger, "Fail to Apply Replication task.", err) return err } // Flush buffered replication tasks after applying the update - err = r.FlushBuffer(context, msBuilder, request, logger) + err = r.FlushBuffer(context, msBuilder, logger) if err != nil { - logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v", - msBuilder.GetNextEventID(), firstEventID, err) + r.logError(logger, "Fail to flush buffer.", err) } return err @@ -412,8 +414,12 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte return err } -func (r *historyReplicator) FlushBuffer(context *workflowExecutionContext, msBuilder mutableState, - request *h.ReplicateEventsRequest, logger bark.Logger) error { +func (r *historyReplicator) FlushBuffer(context *workflowExecutionContext, msBuilder mutableState, logger bark.Logger) error { + domainID := msBuilder.GetExecutionInfo().DomainID + execution := shared.WorkflowExecution{ + WorkflowId: common.StringPtr(msBuilder.GetExecutionInfo().WorkflowID), + RunId: common.StringPtr(msBuilder.GetExecutionInfo().RunID), + } // Keep on applying on applying buffered replication tasks in a loop for msBuilder.HasBufferedReplicationTasks() { @@ -431,8 +437,8 @@ func (r *historyReplicator) FlushBuffer(context *workflowExecutionContext, msBui sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(bt.Version) req := &h.ReplicateEventsRequest{ SourceCluster: common.StringPtr(sourceCluster), - DomainUUID: request.DomainUUID, - WorkflowExecution: request.WorkflowExecution, + DomainUUID: common.StringPtr(domainID), + WorkflowExecution: &execution, FirstEventId: common.Int64Ptr(bt.FirstEventID), NextEventId: common.Int64Ptr(bt.NextEventID), Version: common.Int64Ptr(bt.Version), @@ -602,6 +608,10 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC return nil } if currentStartVersion == incomingVersion { + err = r.flushCurrentWorkflowBuffer(domainID, execution.GetWorkflowId(), logger) + if err != nil { + return err + } return ErrRetryExistingWorkflow } @@ -626,6 +636,23 @@ func (r *historyReplicator) replicateWorkflowStarted(context *workflowExecutionC return createWorkflow(isBrandNew, currentRunID) } +func (r *historyReplicator) flushCurrentWorkflowBuffer(domainID string, workflowID string, logger bark.Logger) error { + currentContext, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(domainID, workflowID) + if err != nil { + return err + } + // since this new workflow cannnot make progress due to existing workflow being open + // try flush the existing workflow's buffer see if we can make it move forward + // First check if there are events which needs to be flushed before applying the update + err = r.FlushBuffer(currentContext, currentMutableState, logger) + currentRelease(err) + if err != nil { + r.logError(logger, "Fail to flush buffer for current workflow.", err) + return err + } + return nil +} + func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(msBuilder mutableState) (retError error) { // this function aims to solve the edge case when this workflow, when going through // reset, has already started a next generation (continue as new-ed workflow) @@ -647,11 +674,14 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(msBuilder m // and enounter a dead lock domainID := msBuilder.GetExecutionInfo().DomainID workflowID := msBuilder.GetExecutionInfo().WorkflowID - currentRunID, _, closeStatus, err := r.getCurrentWorkflowInfo(domainID, workflowID) + _, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(domainID, workflowID) if err != nil { return err } - if closeStatus != persistence.WorkflowCloseStatusNone { + currentRunID := currentMutableState.GetExecutionInfo().RunID + currentCloseStatus := currentMutableState.GetExecutionInfo().CloseStatus + currentRelease(nil) + if currentCloseStatus != persistence.WorkflowCloseStatusNone { // current workflow finished // note, it is impassoble that a current workflow ends with continue as new as close status return nil @@ -729,7 +759,8 @@ func (r *historyReplicator) Serialize(history *shared.History) (*persistence.Ser return h, nil } -func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) { +// func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) { +func (r *historyReplicator) getCurrentWorkflowMutableState(domainID string, workflowID string) (*workflowExecutionContext, mutableState, releaseWorkflowExecutionFunc, error) { // we need to check the current workflow execution context, release, err := r.historyCache.getOrCreateWorkflowExecution( domainID, @@ -737,19 +768,16 @@ func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID s shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)}, ) if err != nil { - return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err + return nil, nil, nil, err } - defer func() { release(retError) }() msBuilder, err := context.loadWorkflowExecution() if err != nil { // no matter what error happen, we need to retry - return "", common.EmptyVersion, persistence.WorkflowCloseStatusNone, err + release(err) + return nil, nil, nil, err } - lastWriteVersion = msBuilder.GetLastWriteVersion() - runID = msBuilder.GetExecutionInfo().RunID - closeStatus = msBuilder.GetExecutionInfo().CloseStatus - return + return context, msBuilder, release, nil } func (r *historyReplicator) terminateWorkflow(domainID string, workflowID string, runID string) error { @@ -779,3 +807,9 @@ func (r *historyReplicator) notify(clusterName string, now time.Time, transferTa r.historyEngine.txProcessor.NotifyNewTask(clusterName, now, transferTasks) r.historyEngine.timerProcessor.NotifyNewTimers(clusterName, now, timerTasks) } + +func (r *historyReplicator) logError(logger bark.Logger, msg string, err error) { + logger.WithFields(bark.Fields{ + logging.TagErr: err, + }).Error(msg) +} diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go index cb616cbe150..cbe3a36ce26 100644 --- a/service/history/historyReplicator_test.go +++ b/service/history/historyReplicator_test.go @@ -1422,6 +1422,26 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc // the test above already assert the create workflow request, so here jsut use anyting s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, errRet).Once() + currentContext := newWorkflowExecutionContext(domainID, shared.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(currentRunID), + }, s.mockShard, s.mockExecutionMgr, s.logger) + currentMsBuilder := &mockMutableState{} + currentMsBuilder.On("HasBufferedReplicationTasks").Return(false) + // return empty since not actually used + currentMsBuilder.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{}) + // return nil to bypass updating the version, since this test does not test that + currentMsBuilder.On("GetReplicationState").Return(nil) + currentContext.msBuilder = currentMsBuilder + s.historyReplicator.historyCache.PutIfNotExist(currentRunID, currentContext) + s.mockExecutionMgr.On("GetCurrentExecution", &persistence.GetCurrentExecutionRequest{ + DomainID: domainID, + WorkflowID: workflowID, + }).Return(&persistence.GetCurrentExecutionResponse{ + RunID: currentRunID, + // other attributes are not used + }, nil) + err := s.historyReplicator.replicateWorkflowStarted(context, msBuilder, di, sourceCluster, history, sBuilder, s.logger) s.Equal(ErrRetryExistingWorkflow, err) s.Equal(1, len(transferTasks)) diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index ba504bf85ec..64810a427fc 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -392,7 +392,7 @@ func (t *timerQueueProcessorBase) internalProcessor() error { } func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*persistence.TimerTaskInfo, error) { - if !t.rateLimiter.Consume(1, t.shard.GetConfig().TransferProcessorMaxPollInterval()) { + if !t.rateLimiter.Consume(1, t.shard.GetConfig().TimerProcessorMaxPollInterval()) { t.notifyNewTimer(time.Time{}) // re-enqueue the event return nil, nil }