diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 72c4117195b..4fc638b6eab 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -771,6 +771,11 @@ func (entry *DomainCacheEntry) GetNotificationVersion() int64 { return entry.notificationVersion } +// GetFailoverEndTime return the failover end time +func (entry *DomainCacheEntry) GetFailoverEndTime() *int64 { + return entry.failoverEndTime +} + // IsDomainActive return whether the domain is active, i.e. non global domain or global domain which active cluster is the current cluster func (entry *DomainCacheEntry) IsDomainActive() bool { if !entry.isGlobalDomain { diff --git a/service/history/queue/transfer_queue_processor_base_test.go b/service/history/queue/transfer_queue_processor_base_test.go index 048e0394679..d2e87ca36ad 100644 --- a/service/history/queue/transfer_queue_processor_base_test.go +++ b/service/history/queue/transfer_queue_processor_base_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" + "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" diff --git a/service/history/task/task.go b/service/history/task/task.go index 9e574bce40e..262248e543e 100644 --- a/service/history/task/task.go +++ b/service/history/task/task.go @@ -47,6 +47,8 @@ var ( ErrTaskDiscarded = errors.New("passive task pending for too long") // ErrTaskRetry is the error indicating that the timer / transfer task should be retried. ErrTaskRetry = errors.New("passive task should retry due to condition in mutable state is not met") + // ErrTaskRedispatch is the error indicating that the task should be re-dispatch + ErrTaskRedispatch = errors.New("redispatch the task while the domain is pending-acitve") ) type ( @@ -305,6 +307,10 @@ func (t *taskBase) HandleErr( func (t *taskBase) RetryErr( err error, ) bool { + if err == ErrTaskRedispatch { + return false + } + return true } diff --git a/service/history/taskAllocator.go b/service/history/taskAllocator.go index 899eb1499ce..1f2af66456d 100644 --- a/service/history/taskAllocator.go +++ b/service/history/taskAllocator.go @@ -28,6 +28,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/service/history/shard" + htask "github.com/uber/cadence/service/history/task" ) type ( @@ -80,6 +81,15 @@ func (t *taskAllocatorImpl) verifyActiveTask(taskDomainID string, task interface t.logger.Debug("Domain is not active, skip task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task)) return false, nil } + + if err := t.checkDomainPendingActive( + domainEntry, + taskDomainID, + task, + ); err != nil { + return false, err + } + t.logger.Debug("Domain is active, process task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task)) return true, nil } @@ -88,6 +98,28 @@ func (t *taskAllocatorImpl) verifyActiveTask(taskDomainID string, task interface func (t *taskAllocatorImpl) verifyFailoverActiveTask(targetDomainIDs map[string]struct{}, taskDomainID string, task interface{}) (bool, error) { _, ok := targetDomainIDs[taskDomainID] if ok { + t.locker.RLock() + defer t.locker.RUnlock() + + domainEntry, err := t.domainCache.GetDomainByID(taskDomainID) + if err != nil { + // it is possible that the domain is deleted + // we should treat that domain as not active + if _, ok := err.(*workflow.EntityNotExistsError); !ok { + t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID)) + return false, err + } + t.logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task)) + return false, nil + } + if err := t.checkDomainPendingActive( + domainEntry, + taskDomainID, + task, + ); err != nil { + return false, err + } + t.logger.Debug("Failover Domain is active, process task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task)) return true, nil } @@ -124,6 +156,20 @@ func (t *taskAllocatorImpl) verifyStandbyTask(standbyCluster string, taskDomainI return true, nil } +func (t *taskAllocatorImpl) checkDomainPendingActive( + domainEntry *cache.DomainCacheEntry, + taskDomainID string, + task interface{}, +) error { + + if domainEntry.IsGlobalDomain() && domainEntry.GetFailoverEndTime() != nil { + // the domain is pending active, pause on processing this task + t.logger.Debug("Domain is not in pending active, skip task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task)) + return htask.ErrTaskRedispatch + } + return nil +} + // lock block all task allocation func (t *taskAllocatorImpl) lock() { t.locker.Lock() diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 8efed1eb217..122e6bffee7 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -89,6 +89,7 @@ func newTransferQueueActiveProcessor( } return taskAllocator.verifyActiveTask(task.DomainID, task) } + maxReadAckLevel := func() int64 { return shard.GetTransferMaxReadLevel() } @@ -219,6 +220,7 @@ func newTransferQueueFailoverProcessor( } return taskAllocator.verifyFailoverActiveTask(domainIDs, task.DomainID, task) } + maxReadAckLevel := func() int64 { return maxLevel // this is a const }