use exponential backoff for task processing
do not send shard time sync if a replication task has already been sent out
Wenquan Xing committed Jul 12, 2018
1 parent 23ba2b9 commit 3f62a38
Showing 5 changed files with 93 additions and 37 deletions.
17 changes: 17 additions & 0 deletions service/history/historyTestBase.go
@@ -75,6 +75,7 @@ type (
logger bark.Logger
metricsClient metrics.Client
standbyClusterCurrentTime map[string]time.Time
lastShardSyncTimestamp time.Time
}

// TestBase wraps the base setup needed to create workflows over engine layer.
@@ -246,6 +247,22 @@ func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel t
return nil
}

// GetLastShardSyncTimestamp get the last shard sync time
func (s *TestShardContext) GetLastShardSyncTimestamp() time.Time {
s.RLock()
defer s.RUnlock()

return s.lastShardSyncTimestamp
}

// UpdateLastShardSyncTimestamp set the last shard sync time
func (s *TestShardContext) UpdateLastShardSyncTimestamp(now time.Time) {
s.Lock()
defer s.Unlock()

s.lastShardSyncTimestamp = now
}

// GetDomainNotificationVersion test implementation
func (s *TestShardContext) GetDomainNotificationVersion() int64 {
s.RLock()
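
Both this test shard context and the production shard context later in the commit guard the new timestamp with the shard's read-write lock: reads take the shared lock, the single writer takes the exclusive lock. Below is a small self-contained sketch of that accessor pattern; the syncClock type is illustrative, not one of the Cadence types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// syncClock illustrates the accessor pattern added above: readers take the
// shared (read) lock, the single writer takes the exclusive lock, so frequent
// reads of the timestamp never block each other.
type syncClock struct {
	sync.RWMutex
	lastShardSyncTimestamp time.Time
}

func (c *syncClock) GetLastShardSyncTimestamp() time.Time {
	c.RLock()
	defer c.RUnlock()
	return c.lastShardSyncTimestamp
}

func (c *syncClock) UpdateLastShardSyncTimestamp(now time.Time) {
	c.Lock()
	defer c.Unlock()
	c.lastShardSyncTimestamp = now
}

func main() {
	var c syncClock
	c.UpdateLastShardSyncTimestamp(time.Now())
	fmt.Println(c.GetLastShardSyncTimestamp())
}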
38 changes: 22 additions & 16 deletions service/history/queueProcessor.go
@@ -60,6 +60,7 @@ type (
metricsClient metrics.Client
rateLimiter common.TokenBucket // Read rate limiter
ackMgr queueAckMgr
retryPolicy backoff.RetryPolicy

// worker coroutines notification
workerNotificationChans []chan struct{}
@@ -95,6 +96,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
metricsClient: shard.GetMetricsClient(),
logger: logger,
ackMgr: queueAckMgr,
retryPolicy: common.CreatePersistanceRetryPolicy(),
lastPollTime: time.Time{},
}

@@ -248,8 +250,20 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{},
var logger bark.Logger
var err error
startTime := time.Now()

retryCount := 0
op := func() error {
err = p.processor.process(task)
if err != nil && err != ErrTaskRetry {
retryCount++
logger = p.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)
}
return err
}

ProcessRetryLoop:
for retryCount := 1; retryCount <= p.options.MaxRetryCount(); {
for retryCount < p.options.MaxRetryCount() {
select {
case <-p.shutdownCh:
return
@@ -260,25 +274,17 @@ ProcessRetryLoop:
default:
}

err = p.processor.process(task)
err = backoff.Retry(op, p.retryPolicy, func(err error) bool {
return err != ErrTaskRetry
})

if err != nil {
if err == ErrTaskRetry {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter)
<-notificationChan
} else {
logger = p.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)

// it is possible that DomainNotActiveError is thrown
// just keep try for cache.DomainCacheRefreshInterval
// and giveup
if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
backoff := time.Duration(retryCount * 100)
time.Sleep(backoff * time.Millisecond)
retryCount++
} else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
continue ProcessRetryLoop
}
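
The key change in this file is that p.processor.process(task) is now wrapped in backoff.Retry with a persistence retry policy, while ErrTaskRetry is declared non-retryable so it still falls through to the standby wait on notificationChan. The standalone sketch below approximates what such a retry helper does; the intervals and attempt cap are illustrative assumptions, not the values CreatePersistanceRetryPolicy actually configures, and retryWithBackoff is not the Cadence backoff package itself.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errTaskRetry stands in for the package's ErrTaskRetry sentinel: the task is
// waiting for standby replication to catch up, so it should not be retried
// with backoff (the caller parks on the notification channel instead).
var errTaskRetry = errors.New("task retry")

// retryWithBackoff approximates the behavior of backoff.Retry as used above:
// run op, and on a retryable error sleep an exponentially growing interval
// before trying again.
func retryWithBackoff(op func() error, isRetryable func(error) bool) error {
	interval := 50 * time.Millisecond
	const maxAttempts = 5
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil || !isRetryable(err) {
			return err
		}
		time.Sleep(interval)
		interval *= 2 // exponential growth replaces the old linear "retryCount * 100ms" sleep
	}
	return err
}

func main() {
	calls := 0
	err := retryWithBackoff(func() error {
		calls++
		if calls < 3 {
			return errors.New("transient persistence error")
		}
		return nil
	}, func(err error) bool { return err != errTaskRetry })
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}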
6 changes: 5 additions & 1 deletion service/history/replicatorQueueProcessor.go
@@ -166,7 +166,11 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi
},
}

return p.replicator.Publish(replicationTask)
err = p.replicator.Publish(replicationTask)
if err == nil {
p.shard.UpdateLastShardSyncTimestamp(common.NewRealTimeSource().Now())
}
return err
}

func (p *replicatorQueueProcessorImpl) readTasks(readLevel int64) ([]queueTaskInfo, bool, error) {
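
On a successful publish of a history replication task, the processor now stamps the shard's last-sync time, which is what the new gate in shardContext.go later consults. A minimal sketch of that publish-then-record ordering follows; the publisher and shardSyncRecorder interfaces are stand-ins for the real replicator and ShardContext types, not their actual definitions.

package sketch

import "time"

// publisher and shardSyncRecorder are illustrative stand-ins for the real
// replicator (message producer) and ShardContext types.
type publisher interface {
	Publish(task interface{}) error
}

type shardSyncRecorder interface {
	UpdateLastShardSyncTimestamp(now time.Time)
}

// publishAndRecord mirrors the change above: only a successful publish updates
// the last-shard-sync timestamp, so a failed publish leaves the shard eligible
// to send a sync-shard-status task on its next shard-info update.
func publishAndRecord(p publisher, shard shardSyncRecorder, task interface{}) error {
	err := p.Publish(task)
	if err == nil {
		shard.UpdateLastShardSyncTimestamp(time.Now())
	}
	return err
}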
31 changes: 27 additions & 4 deletions service/history/shardContext.go
@@ -60,6 +60,8 @@ type (
UpdateTimerAckLevel(ackLevel time.Time) error
GetTimerClusterAckLevel(cluster string) time.Time
UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
GetLastShardSyncTimestamp() time.Time
UpdateLastShardSyncTimestamp(time.Time)
GetDomainNotificationVersion() int64
UpdateDomainNotificationVersion(domainNotificationVersion int64) error
CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
@@ -98,7 +100,10 @@ type (
transferSequenceNumber int64
maxTransferSequenceNumber int64
transferMaxReadLevel int64

// exist only in memory
standbyClusterCurrentTime map[string]time.Time
lastShardSyncTimestamp time.Time
}
)

@@ -243,6 +248,20 @@ func (s *shardContextImpl) UpdateDomainNotificationVersion(domainNotificationVer
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetLastShardSyncTimestamp() time.Time {
s.RLock()
defer s.RUnlock()

return s.lastShardSyncTimestamp
}

func (s *shardContextImpl) UpdateLastShardSyncTimestamp(now time.Time) {
s.Lock()
defer s.Unlock()

s.lastShardSyncTimestamp = now
}

func (s *shardContextImpl) CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
*persistence.CreateWorkflowExecutionResponse, error) {
s.Lock()
@@ -588,13 +607,14 @@ func (s *shardContextImpl) updateMaxReadLevelLocked(rl int64) {
}

func (s *shardContextImpl) updateShardInfoLocked() error {
now := time.Now()
var err error
now := common.NewRealTimeSource().Now()
if s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) {
return nil
}
updatedShardInfo := copyShardInfo(s.shardInfo)

if s.messageProducer != nil {
if s.messageProducer != nil && s.lastShardSyncTimestamp.Add(s.config.ShardUpdateMinInterval()).Before(now) {
syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
@@ -604,10 +624,13 @@ func (s *shardContextImpl) updateShardInfoLocked() error {
},
}
// ignore the error
s.messageProducer.Publish(syncStatusTask)
err = s.messageProducer.Publish(syncStatusTask)
if err == nil {
s.lastShardSyncTimestamp = now
}
}

err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
err = s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo,
PreviousRangeID: s.shardInfo.RangeID,
})
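
The new gate in updateShardInfoLocked carries the behavior described in the commit message: the shard only publishes a sync-shard-status task when no replication task has been published within ShardUpdateMinInterval. A self-contained sketch of just that decision is below; the five-minute interval is an illustrative assumption, not the configured value.

package main

import (
	"fmt"
	"time"
)

// needsSyncShardTask mirrors the new condition in updateShardInfoLocked:
// publish a sync-shard-status task only if the last successful replication
// publish is older than the configured minimum shard-update interval.
func needsSyncShardTask(lastShardSync, now time.Time, minInterval time.Duration) bool {
	return lastShardSync.Add(minInterval).Before(now)
}

func main() {
	now := time.Now()
	minInterval := 5 * time.Minute // illustrative; the real value comes from config.ShardUpdateMinInterval()

	fmt.Println(needsSyncShardTask(now.Add(-1*time.Minute), now, minInterval))  // false: a replication task just went out
	fmt.Println(needsSyncShardTask(now.Add(-10*time.Minute), now, minInterval)) // true: shard has been quiet, send the sync
}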
38 changes: 22 additions & 16 deletions service/history/timerQueueProcessorBase.go
@@ -66,6 +66,7 @@ type (
timerQueueAckMgr timerQueueAckMgr
rateLimiter common.TokenBucket
startDelay dynamicconfig.DurationPropertyFn
retryPolicy backoff.RetryPolicy

// worker coroutines notification
workerNotificationChans []chan struct{}
@@ -114,6 +115,7 @@ func newTimerQueueProcessorBase(scope int, shard ShardContext, historyService *h
lastPollTime: time.Time{},
rateLimiter: common.NewTokenBucket(maxPollRPS(), common.NewRealTimeSource()),
startDelay: startDelay,
retryPolicy: common.CreatePersistanceRetryPolicy(),
}

return base
@@ -373,8 +375,20 @@ func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struc
var logger bark.Logger
var err error
startTime := time.Now()

attempt := 0
op := func() error {
err = t.timerProcessor.process(task)
if err != nil && err != ErrTaskRetry {
attempt++
logger = t.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)
}
return err
}

ProcessRetryLoop:
for attempt := 1; attempt <= t.config.TimerTaskMaxRetryCount(); {
for attempt < t.config.TimerTaskMaxRetryCount() {
select {
case <-t.shutdownCh:
return
@@ -385,25 +399,17 @@ ProcessRetryLoop:
default:
}

err = t.timerProcessor.process(task)
err = backoff.Retry(op, t.retryPolicy, func(err error) bool {
return err != ErrTaskRetry
})

if err != nil {
if err == ErrTaskRetry {
t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskStandbyRetryCounter)
<-notificationChan
} else {
logger = t.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)

// it is possible that DomainNotActiveError is thrown
// just keep try for cache.DomainCacheRefreshInterval
// and giveup
if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskNotActiveCounter)
return
}
backoff := time.Duration(attempt * 100)
time.Sleep(backoff * time.Millisecond)
attempt++
} else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskNotActiveCounter)
return
}
continue ProcessRetryLoop
}
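
The timer queue processor gets the same treatment as the transfer queue processor above. Pulling the shared shape into one place, the sketch below approximates the control flow both processWithRetry implementations now follow; the error sentinels, limits, and the retry helper parameter are stand-ins (a helper like the retryWithBackoff sketched earlier would fit), not the Cadence types.

package sketch

import (
	"errors"
	"time"
)

// Stand-ins for the real types; only the control flow mirrors the diff above.
var (
	errTaskRetry       = errors.New("task retry")        // standby task not ready yet
	errDomainNotActive = errors.New("domain not active") // stands in for *workflow.DomainNotActiveError
)

const (
	maxRetryCount        = 5                // illustrative; the real cap comes from config
	domainRefreshTimeout = 10 * time.Minute // stands in for cache.DomainCacheRefreshInterval
)

// processWithRetrySketch approximates the shared shape of both processWithRetry
// functions after this change: transient errors are retried with backoff inside
// op, ErrTaskRetry parks the worker on the notification channel, and a
// domain-not-active error only aborts the task once the domain cache has had a
// chance to refresh.
func processWithRetrySketch(
	shutdownCh <-chan struct{},
	notificationChan <-chan struct{},
	process func() error,
	retry func(op func() error, isRetryable func(error) bool) error,
) {
	startTime := time.Now()
	attempt := 0
	op := func() error {
		err := process()
		if err != nil && err != errTaskRetry {
			attempt++ // only real failures count against the retry budget
		}
		return err
	}

	for attempt < maxRetryCount {
		select {
		case <-shutdownCh:
			return
		default:
		}

		err := retry(op, func(err error) bool { return err != errTaskRetry })
		if err != nil {
			if err == errTaskRetry {
				<-notificationChan // wait for a hint that the standby task may now be ready
			} else if errors.Is(err, errDomainNotActive) && time.Since(startTime) > domainRefreshTimeout {
				return // give up; the domain stayed inactive past the refresh window
			}
			continue
		}
		return // success path; task completion handling is outside the shown diff
	}
	// retry budget exhausted; handling of this case is outside the shown diff
}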
