From d40b096c7c3ae67aed87b7b6bf7ed3fe0e6cb696 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 17 Nov 2020 13:38:08 -0800 Subject: [PATCH] Improve retry policy for operations in task processing --- service/history/task/timer_task_executor_base.go | 10 +++++----- service/history/task/transfer_active_task_executor.go | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/service/history/task/timer_task_executor_base.go b/service/history/task/timer_task_executor_base.go index 0eaf56d8407..fbf240a676b 100644 --- a/service/history/task/timer_task_executor_base.go +++ b/service/history/task/timer_task_executor_base.go @@ -37,7 +37,7 @@ import ( ) var ( - persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy() + taskRetryPolicy = common.CreateTaskProcessingRetryPolicy() ) type ( @@ -227,7 +227,7 @@ func (t *timerTaskExecutorBase) deleteWorkflowExecution( RunID: task.RunID, }) } - return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) } func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution( @@ -242,7 +242,7 @@ func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution( RunID: task.RunID, }) } - return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) } func (t *timerTaskExecutorBase) deleteWorkflowHistory( @@ -262,7 +262,7 @@ func (t *timerTaskExecutorBase) deleteWorkflowHistory( }) } - return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) } func (t *timerTaskExecutorBase) deleteWorkflowVisibility( @@ -280,5 +280,5 @@ func (t *timerTaskExecutorBase) deleteWorkflowVisibility( // TODO: expose GetVisibilityManager method on shardContext interface return t.shard.GetService().GetVisibilityManager().DeleteWorkflowExecution(ctx, request) // delete from db } - return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) } diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 3087741e8fd..019027ea35f 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -1213,7 +1213,7 @@ func (t *transferActiveTaskExecutor) requestCancelExternalExecutionWithRetry( return thrift.FromError(err) } - err := backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + err := backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) if _, ok := err.(*workflow.CancellationAlreadyRequestedError); ok { // err is CancellationAlreadyRequestedError @@ -1260,7 +1260,7 @@ func (t *transferActiveTaskExecutor) signalExternalExecutionWithRetry( return thrift.FromError(err) } - return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) } func (t *transferActiveTaskExecutor) startWorkflowWithRetry( @@ -1321,7 +1321,7 @@ func (t *transferActiveTaskExecutor) startWorkflowWithRetry( return err } - err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) + err = backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError) if err != nil { return "", err }