Skip to content

Commit

Permalink
Improve retry policy for operations in task processing (cadence-workf…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and yux0 committed May 4, 2021
1 parent 2e79186 commit 4550d7a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
10 changes: 5 additions & 5 deletions service/history/task/timer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
)

var (
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
taskRetryPolicy = common.CreateTaskProcessingRetryPolicy()
)

type (
Expand Down Expand Up @@ -227,7 +227,7 @@ func (t *timerTaskExecutorBase) deleteWorkflowExecution(
RunID: task.RunID,
})
}
return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)
}

func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution(
Expand All @@ -242,7 +242,7 @@ func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution(
RunID: task.RunID,
})
}
return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)
}

func (t *timerTaskExecutorBase) deleteWorkflowHistory(
Expand All @@ -262,7 +262,7 @@ func (t *timerTaskExecutorBase) deleteWorkflowHistory(
})

}
return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)
}

func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
Expand All @@ -280,5 +280,5 @@ func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
// TODO: expose GetVisibilityManager method on shardContext interface
return t.shard.GetService().GetVisibilityManager().DeleteWorkflowExecution(ctx, request) // delete from db
}
return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)
}
6 changes: 3 additions & 3 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ func (t *transferActiveTaskExecutor) requestCancelExternalExecutionWithRetry(
return thrift.FromError(err)
}

err := backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
err := backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)

if _, ok := err.(*workflow.CancellationAlreadyRequestedError); ok {
// err is CancellationAlreadyRequestedError
Expand Down Expand Up @@ -1260,7 +1260,7 @@ func (t *transferActiveTaskExecutor) signalExternalExecutionWithRetry(
return thrift.FromError(err)
}

return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
return backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)
}

func (t *transferActiveTaskExecutor) startWorkflowWithRetry(
Expand Down Expand Up @@ -1321,7 +1321,7 @@ func (t *transferActiveTaskExecutor) startWorkflowWithRetry(
return err
}

err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
err = backoff.Retry(op, taskRetryPolicy, common.IsPersistenceTransientError)
if err != nil {
return "", err
}
Expand Down

0 comments on commit 4550d7a

Please sign in to comment.