From a602c751e6107dd2010db0511919f9a84abe75c3 Mon Sep 17 00:00:00 2001 From: Wenquan Xing Date: Tue, 12 Jun 2018 15:13:50 -0700 Subject: [PATCH] compact duplicate code, add more logging --- common/persistence/dataInterfaces.go | 16 +++++ service/history/failoverCheck.go | 70 ++++++++++++++++--- service/history/timerQueueActiveProcessor.go | 36 +++------- service/history/timerQueueProcessorBase.go | 2 +- service/history/timerQueueStandbyProcessor.go | 17 +---- .../history/transferQueueActiveProcessor.go | 32 +++------ .../history/transferQueueStandbyProcessor.go | 25 ++----- 7 files changed, 104 insertions(+), 94 deletions(-) diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 54ed7d8ba58..495ea7b8be1 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -1371,6 +1371,14 @@ func (t *TransferTaskInfo) GetTaskType() int { return t.TaskType } +// String returns string +func (t *TransferTaskInfo) String() string { + return fmt.Sprintf( + "{DomainID: %v, WorkflowID: %v, RunID: %v, TaskID: %v, TargetDomainID: %v, TargetWorkflowID %v, TargetRunID: %v, TargetChildWorkflowOnly: %v, TaskList: %v, TaskType: %v, ScheduleID: %v, Version: %v.}", + t.DomainID, t.WorkflowID, t.RunID, t.TaskID, t.TargetDomainID, t.TargetWorkflowID, t.TargetRunID, t.TargetChildWorkflowOnly, t.TaskList, t.TaskType, t.ScheduleID, t.Version, + ) +} + // GetTaskID returns the task ID for replication task func (t *ReplicationTaskInfo) GetTaskID() int64 { return t.TaskID @@ -1401,6 +1409,14 @@ func (t *TimerTaskInfo) GetTaskType() int { return t.TaskType } +// GetTaskType returns the task type for timer task +func (t *TimerTaskInfo) String() string { + return fmt.Sprintf( + "{DomainID: %v, WorkflowID: %v, RunID: %v, VisibilityTimestamp: %v, TaskID: %v, TaskType: %v, TimeoutType: %v, EventID: %v, ScheduleAttempt: %v, Version: %v.}", + t.DomainID, t.WorkflowID, t.RunID, t.VisibilityTimestamp, t.TaskID, t.TaskType, t.TimeoutType, t.EventID, t.ScheduleAttempt, t.Version, + ) +} + // NewHistoryEventBatch returns a new instance of HistoryEventBatch func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch { return &HistoryEventBatch{ diff --git a/service/history/failoverCheck.go b/service/history/failoverCheck.go index aa023dcd2c3..1aee9bd6b0d 100644 --- a/service/history/failoverCheck.go +++ b/service/history/failoverCheck.go @@ -27,27 +27,67 @@ import ( "github.com/uber/cadence/common/persistence" ) -// verifyTimerTaskVersion, when not encounter, will return false if true if failover version check is successful -func verifyTransferTaskVersion(shard ShardContext, domainID string, version int64, task *persistence.TransferTaskInfo) (bool, error) { - if !shard.GetService().GetClusterMetadata().IsGlobalDomainEnabled() { +// verifyActiveTask, will return true if task activeness check is successful +func verifyActiveTask(shard ShardContext, logger bark.Logger, taskDomainID string, task interface{}) (bool, error) { + currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() + domainEntry, err := shard.GetDomainCache().GetDomainByID(taskDomainID) + if err != nil { + // it is possible that the domain is deleted + // we should treat that domain as active + if _, ok := err.(*workflow.EntityNotExistsError); !ok { + logger.Warnf("Cannot find domainID: %v, err: %v.", taskDomainID) + return false, err + } + logger.Warnf("Cannot find domainID: %v, default to process task: %v.", taskDomainID, task) return true, nil } + if domainEntry.IsGlobalDomain() && currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName { + // timer task does not belong to cluster name + logger.Debugf("DomainID: %v is not active, skip task: %v.", taskDomainID, task) + return false, nil + } + logger.Debugf("DomainID: %v is active, process task: %v.", taskDomainID, task) + return true, nil +} - // the first return value is whether this task is valid for further processing - domainEntry, err := shard.GetDomainCache().GetDomainByID(domainID) +// verifyFailoverActiveTask, will return true if task activeness check is successful +func verifyFailoverActiveTask(logger bark.Logger, targetDomainID string, taskDomainID string, task interface{}) (bool, error) { + if targetDomainID == taskDomainID { + logger.Debugf("Failover DomainID: %v is active, process task: %v.", taskDomainID, task) + return true, nil + } + logger.Debugf("Failover DomainID: %v is not active, skip task: %v.", taskDomainID, task) + return false, nil +} + +// verifyStandbyTask, will return true if task standbyness check is successful +func verifyStandbyTask(shard ShardContext, logger bark.Logger, standbyCluster string, taskDomainID string, task interface{}) (bool, error) { + domainEntry, err := shard.GetDomainCache().GetDomainByID(taskDomainID) if err != nil { - return false, err + // it is possible that the domain is deleted + // we should treat that domain as not active + if _, ok := err.(*workflow.EntityNotExistsError); !ok { + logger.Warnf("Cannot find domainID: %v, err: %v.", taskDomainID) + return false, err + } + logger.Warnf("Cannot find domainID: %v, default to not process task: %v.", taskDomainID, task) + return false, nil } if !domainEntry.IsGlobalDomain() { - return true, nil - } else if version != task.GetVersion() { + // non global domain, timer task does not belong here + logger.Debugf("DomainID: %v is not global, skip task: %v.", taskDomainID, task) + return false, nil + } else if domainEntry.IsGlobalDomain() && domainEntry.GetReplicationConfig().ActiveClusterName != standbyCluster { + // timer task does not belong here + logger.Debugf("DomainID: %v is not standby, skip task: %v.", taskDomainID, task) return false, nil } + logger.Debugf("DomainID: %v is standby, process task: %v.", taskDomainID, task) return true, nil } -// verifyTimerTaskVersion, when not encounter, will return false if true if failover version check is successful -func verifyTimerTaskVersion(shard ShardContext, domainID string, version int64, task *persistence.TimerTaskInfo) (bool, error) { +// verifyTaskVersion, will return true if failover version check is successful +func verifyTaskVersion(shard ShardContext, logger bark.Logger, domainID string, version int64, taskVersion int64, task interface{}) (bool, error) { if !shard.GetService().GetClusterMetadata().IsGlobalDomainEnabled() { return true, nil } @@ -55,13 +95,17 @@ func verifyTimerTaskVersion(shard ShardContext, domainID string, version int64, // the first return value is whether this task is valid for further processing domainEntry, err := shard.GetDomainCache().GetDomainByID(domainID) if err != nil { + logger.Debugf("Cannot find domainID: %v, err: %v.", domainID, task) return false, err } if !domainEntry.IsGlobalDomain() { + logger.Debugf("DomainID: %v is not active, task: %v version check pass", domainID, task) return true, nil - } else if version != task.GetVersion() { + } else if version != taskVersion { + logger.Debugf("DomainID: %v is active, task: %v version != target version: %v.", domainID, task, version) return false, nil } + logger.Debugf("DomainID: %v is active, task: %v version == target version: %v.", domainID, task, version) return true, nil } @@ -72,6 +116,8 @@ func loadMutableStateForTransferTask(context *workflowExecutionContext, transfer if err != nil { if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, and the execution has already completed. + logger.Debugf("Cannot find execution: domainID: %v, workflowID: %v, runID: %v when processing transfer taskID: %v, eventID: %v", + context.domainID, context.workflowExecution.GetWorkflowId(), context.workflowExecution.GetRunId(), transferTask.TaskID, transferTask.ScheduleID) return nil, nil } return nil, err @@ -110,6 +156,8 @@ func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask * if err != nil { if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, and the execution has already completed. + logger.Debugf("Cannot find execution: domainID: %v, workflowID: %v, runID: %v when processing timer timestamp %v, taskID: %v, eventID: %v", + context.domainID, context.workflowExecution.GetWorkflowId(), context.workflowExecution.GetRunId(), timerTask.VisibilityTimestamp, timerTask.TaskID, timerTask.EventID) return nil, nil } return nil, err diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 8fb1395e2db..00bdc0a2ac7 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -51,34 +51,21 @@ type ( ) func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEngineImpl, matchingClient matching.Client, logger bark.Logger) *timerQueueActiveProcessorImpl { - clusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() + currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() timeNow := func() time.Time { - return shard.GetCurrentTime(clusterName) + return shard.GetCurrentTime(currentClusterName) } logger = logger.WithFields(bark.Fields{ - logging.TagWorkflowCluster: clusterName, + logging.TagWorkflowCluster: currentClusterName, }) timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) { - domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID) - if err != nil { - // it is possible that domain is deleted, - // we should treat that domain being active - if _, ok := err.(*workflow.EntityNotExistsError); !ok { - return false, err - } - return true, nil - } - if domainEntry.IsGlobalDomain() && clusterName != domainEntry.GetReplicationConfig().ActiveClusterName { - // timer task does not belong to cluster name - return false, nil - } - return true, nil + return verifyActiveTask(shard, logger, timer.DomainID, timer) } timerGate := NewLocalTimerGate() // this will trigger a timer gate fire event immediately timerGate.Update(time.Time{}) - timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, clusterName, logger) + timerQueueAckMgr := newTimerQueueAckMgr(shard, historyService.metricsClient, currentClusterName, logger) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, @@ -87,7 +74,7 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng logger: logger, matchingClient: matchingClient, metricsClient: historyService.metricsClient, - currentClusterName: clusterName, + currentClusterName: currentClusterName, timerGate: timerGate, timerQueueProcessorBase: newTimerQueueProcessorBase(shard, historyService, timerQueueAckMgr, timeNow, logger), timerQueueAckMgr: timerQueueAckMgr, @@ -109,10 +96,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE logging.TagFailover: "from: " + standbyClusterName, }) timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) { - if timer.DomainID == domainID { - return true, nil - } - return false, nil + return verifyFailoverActiveTask(logger, domainID, timer.DomainID, timer) } timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, minLevel, maxLevel, logger) @@ -465,7 +449,7 @@ Update_History_Loop: logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, scheduleID) return nil } - ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, di.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, di.Version, task.Version, task) if err != nil { return err } else if !ok { @@ -539,7 +523,7 @@ func (t *timerQueueActiveProcessorImpl) processRetryTimer(task *persistence.Time if !running || task.ScheduleAttempt < int64(ai.Attempt) { return nil } - ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, ai.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, ai.Version, task.Version, task) if err != nil { return err } else if !ok { @@ -612,7 +596,7 @@ Update_History_Loop: // do version check for global domain task if msBuilder.GetReplicationState() != nil { - ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetReplicationState().StartVersion, task) + ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, msBuilder.GetReplicationState().StartVersion, task.Version, task) if err != nil { return err } else if !ok { diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 0c7ba2e9df4..045e308a545 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -386,7 +386,7 @@ func (t *timerQueueProcessorBase) processDeleteHistoryEvent(task *persistence.Ti } else if msBuilder == nil { return nil } - ok, err := verifyTimerTaskVersion(t.shard, task.DomainID, msBuilder.GetLastWriteVersion(), task) + ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, msBuilder.GetLastWriteVersion(), task.Version, task) if err != nil { return err } else if !ok { diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go index c1cf005e301..91dff32bfde 100644 --- a/service/history/timerQueueStandbyProcessor.go +++ b/service/history/timerQueueStandbyProcessor.go @@ -55,18 +55,7 @@ func newTimerQueueStandbyProcessor(shard ShardContext, historyService *historyEn logging.TagWorkflowCluster: clusterName, }) timerTaskFilter := func(timer *persistence.TimerTaskInfo) (bool, error) { - domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID) - if err != nil { - return false, err - } - if !domainEntry.IsGlobalDomain() { - // non global domain, timer task does not belong here - return false, nil - } else if domainEntry.IsGlobalDomain() && domainEntry.GetReplicationConfig().ActiveClusterName != clusterName { - // timer task does not belong here - return false, nil - } - return true, nil + return verifyStandbyTask(shard, logger, clusterName, timer.DomainID, timer) } timerGate := NewRemoteTimerGate() @@ -272,7 +261,7 @@ func (t *timerQueueStandbyProcessorImpl) processDecisionTimeout(timerTask *persi return nil } - ok, err := verifyTimerTaskVersion(t.shard, timerTask.DomainID, di.Version, timerTask) + ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, di.Version, timerTask.Version, timerTask) if err != nil { return err } else if !ok { @@ -298,7 +287,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowTimeout(timerTask *persi // we do not need to notity new timer to base, since if there is no new event being replicated // checking again if the timer can be completed is meaningless - ok, err := verifyTimerTaskVersion(t.shard, timerTask.DomainID, msBuilder.GetStartVersion(), timerTask) + ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, msBuilder.GetStartVersion(), timerTask.Version, timerTask) if err != nil { return err } else if !ok { diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 21225018dcf..015b224ab64 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -84,20 +84,7 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history logging.TagWorkflowCluster: currentClusterName, }) transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) { - domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID) - if err != nil { - // it is possible that the domain is deleted - // we should treat that domain as active - if _, ok := err.(*workflow.EntityNotExistsError); !ok { - return false, err - } - return true, nil - } - if domainEntry.IsGlobalDomain() && currentClusterName != domainEntry.GetReplicationConfig().ActiveClusterName { - // timer task does not belong to cluster name - return false, nil - } - return true, nil + return verifyActiveTask(shard, logger, task.DomainID, task) } maxReadAckLevel := func() int64 { return shard.GetTransferMaxReadLevel() @@ -150,10 +137,7 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo logging.TagFailover: "from: " + standbyClusterName, }) transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) { - if task.DomainID == domainID { - return true, nil - } - return false, nil + return verifyFailoverActiveTask(logger, domainID, task.DomainID, task) } maxReadAckLevel := func() int64 { return maxLevel // this is a const @@ -282,7 +266,7 @@ func (t *transferQueueActiveProcessorImpl) processActivityTask(task *persistence logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeActivityTask, task.TaskID, task.ScheduleID) return } - ok, err := verifyTransferTaskVersion(t.shard, domainID, ai.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ai.Version, task.Version, task) if err != nil { return err } else if !ok { @@ -340,7 +324,7 @@ func (t *transferQueueActiveProcessorImpl) processDecisionTask(task *persistence logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TaskTypeDecisionTimeout, task.TaskID, task.ScheduleID) return nil } - ok, err := verifyTransferTaskVersion(t.shard, domainID, di.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, domainID, di.Version, task.Version, task) if err != nil { return err } else if !ok { @@ -411,7 +395,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten return nil } - ok, err := verifyTransferTaskVersion(t.shard, domainID, msBuilder.GetLastWriteVersion(), task) + ok, err := verifyTaskVersion(t.shard, t.logger, domainID, msBuilder.GetLastWriteVersion(), task.Version, task) if err != nil { return err } else if !ok { @@ -525,7 +509,7 @@ func (t *transferQueueActiveProcessorImpl) processCancelExecution(task *persiste logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeCancelExecution, task.TaskID, task.ScheduleID) return nil } - ok, err := verifyTransferTaskVersion(t.shard, domainID, ri.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ri.Version, task.Version, task) if err != nil { return err } else if !ok { @@ -648,7 +632,7 @@ func (t *transferQueueActiveProcessorImpl) processSignalExecution(task *persiste logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeCancelExecution, task.TaskID, task.ScheduleID) return nil } - ok, err := verifyTransferTaskVersion(t.shard, domainID, si.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, domainID, si.Version, task.Version, task) if err != nil { return err } else if !ok { @@ -802,7 +786,7 @@ func (t *transferQueueActiveProcessorImpl) processStartChildExecution(task *pers logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeStartChildExecution, task.TaskID, task.ScheduleID) return nil } - ok, err := verifyTransferTaskVersion(t.shard, domainID, ci.Version, task) + ok, err := verifyTaskVersion(t.shard, t.logger, domainID, ci.Version, task.Version, task) if err != nil { return err } else if !ok { diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index 78811f81a39..932a13b2dc7 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -65,18 +65,7 @@ func newTransferQueueStandbyProcessor(clusterName string, shard ShardContext, hi }) transferTaskFilter := func(task *persistence.TransferTaskInfo) (bool, error) { - domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID) - if err != nil { - return false, err - } - if !domainEntry.IsGlobalDomain() { - // non global domain, timer task does not belong here - return false, nil - } else if domainEntry.IsGlobalDomain() && domainEntry.GetReplicationConfig().ActiveClusterName != clusterName { - // timer task does not belong here - return false, nil - } - return true, nil + return verifyStandbyTask(shard, logger, clusterName, task.DomainID, task) } maxReadAckLevel := func() int64 { return shard.GetTransferMaxReadLevel() @@ -176,7 +165,7 @@ func (t *transferQueueStandbyProcessorImpl) processActivityTask(transferTask *pe if !isPending { return nil } - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, activityInfo.Version, transferTask) + ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, activityInfo.Version, transferTask.Version, transferTask) if err != nil { return err } else if !ok { @@ -205,7 +194,7 @@ func (t *transferQueueStandbyProcessorImpl) processDecisionTask(transferTask *pe } return nil } - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, decisionInfo.Version, transferTask) + ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, decisionInfo.Version, transferTask.Version, transferTask) if err != nil { return err } else if !ok { @@ -232,7 +221,7 @@ func (t *transferQueueStandbyProcessorImpl) processCloseExecution(transferTask * processTaskIfClosed := true return t.processTransfer(processTaskIfClosed, transferTask, func(msBuilder mutableState) error { - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, msBuilder.GetLastWriteVersion(), transferTask) + ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, msBuilder.GetLastWriteVersion(), transferTask.Version, transferTask) if err != nil { return err } else if !ok { @@ -271,7 +260,7 @@ func (t *transferQueueStandbyProcessorImpl) processCancelExecution(transferTask if !isPending { return nil } - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, requestCancelInfo.Version, transferTask) + ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, requestCancelInfo.Version, transferTask.Version, transferTask) if err != nil { return err } else if !ok { @@ -294,7 +283,7 @@ func (t *transferQueueStandbyProcessorImpl) processSignalExecution(transferTask if !isPending { return nil } - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, signalInfo.Version, transferTask) + ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, signalInfo.Version, transferTask.Version, transferTask) if err != nil { return err } else if !ok { @@ -317,7 +306,7 @@ func (t *transferQueueStandbyProcessorImpl) processStartChildExecution(transferT if !isPending { return nil } - ok, err := verifyTransferTaskVersion(t.shard, transferTask.DomainID, childWorkflowInfo.Version, transferTask) + ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, childWorkflowInfo.Version, transferTask.Version, transferTask) if err != nil { return err } else if !ok {