From 524538d7c00950649c214c73fa6e9d7b4fd9cde9 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Wed, 13 Jun 2018 15:00:08 -0700 Subject: [PATCH] some behavior change on worker (#847) * some behavior change on worker, i.e. retry forever on RetryTaskError --- service/history/historyReplicator.go | 17 ++++++++++++++--- service/worker/processor.go | 19 +++++-------------- service/worker/service.go | 7 +------ 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index f12e637dcd0..b249f0c7aa7 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -64,6 +64,11 @@ var ( // ErrRetryEntityNotExists is returned to indicate workflow execution is not created yet and replicator should // try this task again after a small delay. ErrRetryEntityNotExists = &shared.RetryTaskError{Message: "workflow execution not found"} + // ErrRetryExecutionAlreadyStarted is returned to indicate another workflow execution already started, + // this error can be return if we encounter race condition, i.e. terminating the target workflow while + // the target workflow has done continue as new. + // try this task again after a small delay. + ErrRetryExecutionAlreadyStarted = &shared.RetryTaskError{Message: "another workflow execution is running"} // ErrMissingReplicationInfo is returned when replication task is missing replication information from source cluster ErrMissingReplicationInfo = &shared.BadRequestError{Message: "replication task is missing cluster replication info"} // ErrCorruptedReplicationInfo is returned when replication task has corrupted replication information from source cluster @@ -99,9 +104,15 @@ func newHistoryReplicator(shard ShardContext, historyEngine *historyEngineImpl, func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retError error) { defer func() { - if _, ok := retError.(*shared.EntityNotExistsError); ok { - r.logger.Warnf("Encounter EntityNotExistsError: %v", retError) - retError = ErrRetryEntityNotExists + if retError != nil { + switch retError.(type) { + case *shared.EntityNotExistsError: + r.logger.Warnf("Encounter EntityNotExistsError: %v", retError) + retError = ErrRetryEntityNotExists + case *shared.WorkflowExecutionAlreadyStartedError: + r.logger.Warnf("Encounter WorkflowExecutionAlreadyStartedError: %v", retError) + retError = ErrRetryExecutionAlreadyStarted + } } }() diff --git a/service/worker/processor.go b/service/worker/processor.go index 3a7daf35ce8..7f4d78445e5 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -179,7 +179,7 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { var err error ProcessRetryLoop: - for retryCount := 1; retryCount <= p.config.ReplicatorMaxRetryCount; { + for { select { case <-p.shutdownCh: return @@ -188,20 +188,9 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) { return p.process(msg) } err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError) - if err != nil { - // Check if this is an explicit ask to retry the task by handler - if _, ok := err.(*shared.RetryTaskError); ok { - // Increment the retryCount as we will retry the error upto ReplicatorMaxRetryCount before moving - // it to DLQ - retryCount++ - time.Sleep(p.config.ReplicatorRetryDelay) - continue ProcessRetryLoop - } - + if err != nil && p.isTransientRetryableError(err) { // Keep on retrying transient errors for ever - if p.isTransientRetryableError(err) { - continue ProcessRetryLoop - } + continue ProcessRetryLoop } } @@ -331,6 +320,8 @@ func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool { return true case *shared.InternalServiceError: return true + case *shared.RetryTaskError: + return true } return false diff --git a/service/worker/service.go b/service/worker/service.go index 291277d58ab..295b40913fe 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -21,8 +21,6 @@ package worker import ( - "time" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" @@ -45,7 +43,6 @@ type ( // Replicator settings ReplicatorConcurrency int ReplicatorMaxRetryCount int - ReplicatorRetryDelay time.Duration } ) @@ -61,9 +58,7 @@ func NewService(params *service.BootstrapParams) common.Daemon { // NewConfig builds the new Config for cadence-worker service func NewConfig() *Config { return &Config{ - ReplicatorConcurrency: 10, - ReplicatorMaxRetryCount: 20, - ReplicatorRetryDelay: 50 * time.Millisecond, + ReplicatorConcurrency: 1000, } }