Skip to content

Commit

Permalink
Add filter for pending active task to redispatch (cadence-workflow#3279)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored and mkolodezny committed May 29, 2020
1 parent 751ac69 commit 5ba14af
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 0 deletions.
5 changes: 5 additions & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,11 @@ func (entry *DomainCacheEntry) GetNotificationVersion() int64 {
return entry.notificationVersion
}

// GetFailoverEndTime return the failover end time
func (entry *DomainCacheEntry) GetFailoverEndTime() *int64 {
return entry.failoverEndTime
}

// IsDomainActive return whether the domain is active, i.e. non global domain or global domain which active cluster is the current cluster
func (entry *DomainCacheEntry) IsDomainActive() bool {
if !entry.isGlobalDomain {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/loggerimpl"
Expand Down
6 changes: 6 additions & 0 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ var (
ErrTaskDiscarded = errors.New("passive task pending for too long")
// ErrTaskRetry is the error indicating that the timer / transfer task should be retried.
ErrTaskRetry = errors.New("passive task should retry due to condition in mutable state is not met")
// ErrTaskRedispatch is the error indicating that the task should be re-dispatch
ErrTaskRedispatch = errors.New("redispatch the task while the domain is pending-acitve")
)

type (
Expand Down Expand Up @@ -305,6 +307,10 @@ func (t *taskBase) HandleErr(
func (t *taskBase) RetryErr(
err error,
) bool {
if err == ErrTaskRedispatch {
return false
}

return true
}

Expand Down
46 changes: 46 additions & 0 deletions service/history/taskAllocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/service/history/shard"
htask "github.com/uber/cadence/service/history/task"
)

type (
Expand Down Expand Up @@ -80,6 +81,15 @@ func (t *taskAllocatorImpl) verifyActiveTask(taskDomainID string, task interface
t.logger.Debug("Domain is not active, skip task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task))
return false, nil
}

if err := t.checkDomainPendingActive(
domainEntry,
taskDomainID,
task,
); err != nil {
return false, err
}

t.logger.Debug("Domain is active, process task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task))
return true, nil
}
Expand All @@ -88,6 +98,28 @@ func (t *taskAllocatorImpl) verifyActiveTask(taskDomainID string, task interface
func (t *taskAllocatorImpl) verifyFailoverActiveTask(targetDomainIDs map[string]struct{}, taskDomainID string, task interface{}) (bool, error) {
_, ok := targetDomainIDs[taskDomainID]
if ok {
t.locker.RLock()
defer t.locker.RUnlock()

domainEntry, err := t.domainCache.GetDomainByID(taskDomainID)
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as not active
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID))
return false, err
}
t.logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task))
return false, nil
}
if err := t.checkDomainPendingActive(
domainEntry,
taskDomainID,
task,
); err != nil {
return false, err
}

t.logger.Debug("Failover Domain is active, process task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task))
return true, nil
}
Expand Down Expand Up @@ -124,6 +156,20 @@ func (t *taskAllocatorImpl) verifyStandbyTask(standbyCluster string, taskDomainI
return true, nil
}

func (t *taskAllocatorImpl) checkDomainPendingActive(
domainEntry *cache.DomainCacheEntry,
taskDomainID string,
task interface{},
) error {

if domainEntry.IsGlobalDomain() && domainEntry.GetFailoverEndTime() != nil {
// the domain is pending active, pause on processing this task
t.logger.Debug("Domain is not in pending active, skip task.", tag.WorkflowDomainID(taskDomainID), tag.Value(task))
return htask.ErrTaskRedispatch
}
return nil
}

// lock block all task allocation
func (t *taskAllocatorImpl) lock() {
t.locker.Lock()
Expand Down
2 changes: 2 additions & 0 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func newTransferQueueActiveProcessor(
}
return taskAllocator.verifyActiveTask(task.DomainID, task)
}

maxReadAckLevel := func() int64 {
return shard.GetTransferMaxReadLevel()
}
Expand Down Expand Up @@ -219,6 +220,7 @@ func newTransferQueueFailoverProcessor(
}
return taskAllocator.verifyFailoverActiveTask(domainIDs, task.DomainID, task)
}

maxReadAckLevel := func() int64 {
return maxLevel // this is a const
}
Expand Down

0 comments on commit 5ba14af

Please sign in to comment.