Skip to content

Commit

Permalink
Enforce persistence context timeout in application layer: Part 5 (cadence-workflow#3653)
Browse files Browse the repository at this point in the history

Enforce persistence context timeout for task processing
  • Loading branch information
yycptt authored Feb 4, 2021
1 parent a2ba536 commit 3bea963
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 207 deletions.
8 changes: 6 additions & 2 deletions service/history/task/standby_task_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package task

import (
"context"
"time"

"github.com/uber/cadence/.gen/go/shared"
Expand All @@ -32,13 +33,14 @@ import (
)

type (
standbyActionFn func(execution.Context, execution.MutableState) (interface{}, error)
standbyPostActionFn func(Info, interface{}, log.Logger) error
standbyActionFn func(context.Context, execution.Context, execution.MutableState) (interface{}, error)
standbyPostActionFn func(context.Context, Info, interface{}, log.Logger) error

standbyCurrentTimeFn func() time.Time
)

func standbyTaskPostActionNoOp(
ctx context.Context,
taskInfo Info,
postActionInfo interface{},
logger log.Logger,
Expand All @@ -53,6 +55,7 @@ func standbyTaskPostActionNoOp(
}

func standbyTransferTaskPostActionTaskDiscarded(
ctx context.Context,
taskInfo Info,
postActionInfo interface{},
logger log.Logger,
Expand All @@ -76,6 +79,7 @@ func standbyTransferTaskPostActionTaskDiscarded(
}

func standbyTimerTaskPostActionTaskDiscarded(
ctx context.Context,
taskInfo Info,
postActionInfo interface{},
logger log.Logger,
Expand Down
13 changes: 8 additions & 5 deletions service/history/task/task_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,13 @@ func verifyTaskVersion(
// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
// if still mutable state's next event ID <= task ID, will return nil, nil
func loadMutableStateForTimerTask(
ctx context.Context,
wfContext execution.Context,
timerTask *persistence.TimerTaskInfo,
metricsClient metrics.Client,
logger log.Logger,
) (execution.MutableState, error) {
msBuilder, err := wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
Expand All @@ -244,7 +245,7 @@ func loadMutableStateForTimerTask(
metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
wfContext.Clear()

msBuilder, err = wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
}
Expand All @@ -262,13 +263,14 @@ func loadMutableStateForTimerTask(
// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
// if still mutable state's next event ID <= task ID, will return nil, nil
func loadMutableStateForTransferTask(
ctx context.Context,
wfContext execution.Context,
transferTask *persistence.TransferTaskInfo,
metricsClient metrics.Client,
logger log.Logger,
) (execution.MutableState, error) {

msBuilder, err := wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
Expand All @@ -289,7 +291,7 @@ func loadMutableStateForTransferTask(
metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.StaleMutableStateCounter)
wfContext.Clear()

msBuilder, err = wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -326,6 +328,7 @@ func timeoutWorkflow(
}

func retryWorkflow(
ctx context.Context,
mutableState execution.MutableState,
eventBatchFirstEventID int64,
parentDomainName string,
Expand All @@ -343,7 +346,7 @@ func retryWorkflow(
}

_, newMutableState, err := mutableState.AddContinueAsNewEvent(
context.TODO(),
ctx,
eventBatchFirstEventID,
common.EmptyEventID,
parentDomainName,
Expand Down
79 changes: 45 additions & 34 deletions service/history/task/timer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,41 +78,45 @@ func (t *timerActiveTaskExecutor) Execute(
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()

switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
return t.executeUserTimerTimeoutTask(timerTask)
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
return t.executeActivityTimeoutTask(timerTask)
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
return t.executeDecisionTimeoutTask(timerTask)
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
return t.executeWorkflowTimeoutTask(timerTask)
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
return t.executeActivityRetryTimerTask(timerTask)
return t.executeActivityRetryTimerTask(ctx, timerTask)
case persistence.TaskTypeWorkflowBackoffTimer:
return t.executeWorkflowBackoffTimerTask(timerTask)
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
return t.executeDeleteHistoryEventTask(timerTask)
return t.executeDeleteHistoryEventTask(ctx, timerTask)
default:
return errUnknownTimerTask
}
}

func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,24 +153,25 @@ Loop:
return nil
}

return t.updateWorkflowExecution(context, mutableState, timerFired)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, timerFired)
}

func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,24 +249,25 @@ Loop:
if !updateMutableState {
return nil
}
return t.updateWorkflowExecution(context, mutableState, scheduleDecision)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}

func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -318,24 +324,25 @@ func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
scheduleDecision = true
}

return t.updateWorkflowExecution(context, mutableState, scheduleDecision)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}

func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand All @@ -355,24 +362,25 @@ func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
}

// schedule first decision task
return t.updateWorkflowExecution(context, mutableState, true)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, true)
}

func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(wfContext, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -411,7 +419,7 @@ func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
// previously, DomainID in activity info is not used, so need to get
// schedule event from DB checking whether activity to be scheduled
// belongs to this domain
scheduledEvent, err := mutableState.GetActivityScheduledEvent(context.TODO(), scheduledID)
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduledID)
if err != nil {
return err
}
Expand All @@ -434,7 +442,7 @@ func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(

release(nil) // release earlier as we don't need the lock anymore

return t.shard.GetService().GetMatchingClient().AddActivityTask(nil, &m.AddActivityTaskRequest{
return t.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &m.AddActivityTaskRequest{
DomainUUID: common.StringPtr(targetDomainID),
SourceDomainUUID: common.StringPtr(domainID),
Execution: &execution,
Expand All @@ -445,20 +453,21 @@ func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
}

func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(wfContext, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand All @@ -482,7 +491,7 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
continueAsNewInitiator := workflow.ContinueAsNewInitiatorRetryPolicy
if backoffInterval == backoff.NoBackoff {
// check if a cron backoff is needed
backoffInterval, err = mutableState.GetCronBackoffDuration(context.TODO())
backoffInterval, err = mutableState.GetCronBackoffDuration(ctx)
if err != nil {
return err
}
Expand All @@ -495,11 +504,11 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
return t.updateWorkflowExecution(wfContext, mutableState, false)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, false)
}

// workflow timeout, but a retry or cron is needed, so we do continue as new to retry or cron
startEvent, err := mutableState.GetStartEvent(context.TODO())
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return err
}
Expand All @@ -521,6 +530,7 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
SearchAttributes: startAttributes.SearchAttributes,
}
newMutableState, err := retryWorkflow(
ctx,
mutableState,
eventBatchFirstEventID,
startAttributes.GetParentWorkflowDomain(),
Expand All @@ -532,7 +542,7 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(

newExecutionInfo := newMutableState.GetExecutionInfo()
return wfContext.UpdateWorkflowExecutionWithNewAsActive(
context.TODO(),
ctx,
t.shard.GetTimeSource().Now(),
execution.NewContext(
newExecutionInfo.DomainID,
Expand All @@ -557,6 +567,7 @@ func (t *timerActiveTaskExecutor) getTimerSequence(
}

func (t *timerActiveTaskExecutor) updateWorkflowExecution(
ctx context.Context,
wfContext execution.Context,
mutableState execution.MutableState,
scheduleNewDecision bool,
Expand All @@ -572,7 +583,7 @@ func (t *timerActiveTaskExecutor) updateWorkflowExecution(
}

now := t.shard.GetTimeSource().Now()
err = wfContext.UpdateWorkflowExecutionAsActive(context.TODO(), now)
err = wfContext.UpdateWorkflowExecutionAsActive(ctx, now)
if err != nil {
// if is shard ownership error, the shard context will stop the entire history engine
// we don't need to explicitly stop the queue processor here
Expand Down
Loading

0 comments on commit 3bea963

Please sign in to comment.