Skip to content

Commit

Permalink
some behavior change on worker (#847)
Browse files Browse the repository at this point in the history
* some behavior change on worker, i.e. retry forever on RetryTaskError
  • Loading branch information
wxing1292 authored Jun 13, 2018
1 parent a8958f0 commit 524538d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
17 changes: 14 additions & 3 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ var (
// ErrRetryEntityNotExists is returned to indicate workflow execution is not created yet and replicator should
// try this task again after a small delay.
ErrRetryEntityNotExists = &shared.RetryTaskError{Message: "workflow execution not found"}
// ErrRetryExecutionAlreadyStarted is returned to indicate that another workflow execution has already started.
// This error can be returned if we encounter a race condition, e.g. terminating the target workflow while
// the target workflow has already continued as new.
// The replicator should try this task again after a small delay.
ErrRetryExecutionAlreadyStarted = &shared.RetryTaskError{Message: "another workflow execution is running"}
// ErrMissingReplicationInfo is returned when replication task is missing replication information from source cluster
ErrMissingReplicationInfo = &shared.BadRequestError{Message: "replication task is missing cluster replication info"}
// ErrCorruptedReplicationInfo is returned when replication task has corrupted replication information from source cluster
Expand Down Expand Up @@ -99,9 +104,15 @@ func newHistoryReplicator(shard ShardContext, historyEngine *historyEngineImpl,

func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retError error) {
defer func() {
if _, ok := retError.(*shared.EntityNotExistsError); ok {
r.logger.Warnf("Encounter EntityNotExistsError: %v", retError)
retError = ErrRetryEntityNotExists
if retError != nil {
switch retError.(type) {
case *shared.EntityNotExistsError:
r.logger.Warnf("Encounter EntityNotExistsError: %v", retError)
retError = ErrRetryEntityNotExists
case *shared.WorkflowExecutionAlreadyStartedError:
r.logger.Warnf("Encounter WorkflowExecutionAlreadyStartedError: %v", retError)
retError = ErrRetryExecutionAlreadyStarted
}
}
}()

Expand Down
19 changes: 5 additions & 14 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {

var err error
ProcessRetryLoop:
for retryCount := 1; retryCount <= p.config.ReplicatorMaxRetryCount; {
for {
select {
case <-p.shutdownCh:
return
Expand All @@ -188,20 +188,9 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
return p.process(msg)
}
err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
if err != nil {
// Check if this is an explicit ask to retry the task by handler
if _, ok := err.(*shared.RetryTaskError); ok {
// Increment the retryCount, as we will retry the error up to ReplicatorMaxRetryCount times before moving
// the task to the DLQ.
retryCount++
time.Sleep(p.config.ReplicatorRetryDelay)
continue ProcessRetryLoop
}

if err != nil && p.isTransientRetryableError(err) {
// Keep retrying transient errors forever
if p.isTransientRetryableError(err) {
continue ProcessRetryLoop
}
continue ProcessRetryLoop
}
}

Expand Down Expand Up @@ -331,6 +320,8 @@ func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool {
return true
case *shared.InternalServiceError:
return true
case *shared.RetryTaskError:
return true
}

return false
Expand Down
7 changes: 1 addition & 6 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package worker

import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand All @@ -45,7 +43,6 @@ type (
// Replicator settings
ReplicatorConcurrency int
ReplicatorMaxRetryCount int
ReplicatorRetryDelay time.Duration
}
)

Expand All @@ -61,9 +58,7 @@ func NewService(params *service.BootstrapParams) common.Daemon {
// NewConfig builds the new Config for cadence-worker service
func NewConfig() *Config {
return &Config{
ReplicatorConcurrency: 10,
ReplicatorMaxRetryCount: 20,
ReplicatorRetryDelay: 50 * time.Millisecond,
ReplicatorConcurrency: 1000,
}
}

Expand Down

0 comments on commit 524538d

Please sign in to comment.