Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce persistence context timeout in application layer: Part 5 #3653

Merged
merged 13 commits into from
Oct 15, 2020
8 changes: 6 additions & 2 deletions service/history/task/standby_task_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package task

import (
"context"
"time"

"github.com/uber/cadence/.gen/go/shared"
Expand All @@ -32,13 +33,14 @@ import (
)

type (
standbyActionFn func(execution.Context, execution.MutableState) (interface{}, error)
standbyPostActionFn func(Info, interface{}, log.Logger) error
standbyActionFn func(context.Context, execution.Context, execution.MutableState) (interface{}, error)
standbyPostActionFn func(context.Context, Info, interface{}, log.Logger) error

standbyCurrentTimeFn func() time.Time
)

func standbyTaskPostActionNoOp(
ctx context.Context,
taskInfo Info,
postActionInfo interface{},
logger log.Logger,
Expand All @@ -53,6 +55,7 @@ func standbyTaskPostActionNoOp(
}

func standbyTransferTaskPostActionTaskDiscarded(
ctx context.Context,
taskInfo Info,
postActionInfo interface{},
logger log.Logger,
Expand All @@ -76,6 +79,7 @@ func standbyTransferTaskPostActionTaskDiscarded(
}

func standbyTimerTaskPostActionTaskDiscarded(
ctx context.Context,
taskInfo Info,
postActionInfo interface{},
logger log.Logger,
Expand Down
13 changes: 8 additions & 5 deletions service/history/task/task_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,13 @@ func verifyTaskVersion(
// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
// if still mutable state's next event ID <= task ID, will return nil, nil
func loadMutableStateForTimerTask(
ctx context.Context,
wfContext execution.Context,
timerTask *persistence.TimerTaskInfo,
metricsClient metrics.Client,
logger log.Logger,
) (execution.MutableState, error) {
msBuilder, err := wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
Expand All @@ -244,7 +245,7 @@ func loadMutableStateForTimerTask(
metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
wfContext.Clear()

msBuilder, err = wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
}
Expand All @@ -262,13 +263,14 @@ func loadMutableStateForTimerTask(
// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
// if still mutable state's next event ID <= task ID, will return nil, nil
func loadMutableStateForTransferTask(
ctx context.Context,
wfContext execution.Context,
transferTask *persistence.TransferTaskInfo,
metricsClient metrics.Client,
logger log.Logger,
) (execution.MutableState, error) {

msBuilder, err := wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
Expand All @@ -289,7 +291,7 @@ func loadMutableStateForTransferTask(
metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.StaleMutableStateCounter)
wfContext.Clear()

msBuilder, err = wfContext.LoadWorkflowExecution(context.TODO())
msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -326,6 +328,7 @@ func timeoutWorkflow(
}

func retryWorkflow(
ctx context.Context,
mutableState execution.MutableState,
eventBatchFirstEventID int64,
parentDomainName string,
Expand All @@ -343,7 +346,7 @@ func retryWorkflow(
}

_, newMutableState, err := mutableState.AddContinueAsNewEvent(
context.TODO(),
ctx,
eventBatchFirstEventID,
common.EmptyEventID,
parentDomainName,
Expand Down
79 changes: 45 additions & 34 deletions service/history/task/timer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,41 +78,45 @@ func (t *timerActiveTaskExecutor) Execute(
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()

switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
return t.executeUserTimerTimeoutTask(timerTask)
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
return t.executeActivityTimeoutTask(timerTask)
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
return t.executeDecisionTimeoutTask(timerTask)
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
return t.executeWorkflowTimeoutTask(timerTask)
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
return t.executeActivityRetryTimerTask(timerTask)
return t.executeActivityRetryTimerTask(ctx, timerTask)
case persistence.TaskTypeWorkflowBackoffTimer:
return t.executeWorkflowBackoffTimerTask(timerTask)
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
return t.executeDeleteHistoryEventTask(timerTask)
return t.executeDeleteHistoryEventTask(ctx, timerTask)
default:
return errUnknownTimerTask
}
}

func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,24 +153,25 @@ Loop:
return nil
}

return t.updateWorkflowExecution(context, mutableState, timerFired)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, timerFired)
}

func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,24 +249,25 @@ Loop:
if !updateMutableState {
return nil
}
return t.updateWorkflowExecution(context, mutableState, scheduleDecision)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}

func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -318,24 +324,25 @@ func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
scheduleDecision = true
}

return t.updateWorkflowExecution(context, mutableState, scheduleDecision)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}

func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(context, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand All @@ -355,24 +362,25 @@ func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
}

// schedule first decision task
return t.updateWorkflowExecution(context, mutableState, true)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, true)
}

func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(wfContext, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -411,7 +419,7 @@ func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
// previously, DomainID in activity info is not used, so need to get
// schedule event from DB checking whether activity to be scheduled
// belongs to this domain
scheduledEvent, err := mutableState.GetActivityScheduledEvent(context.TODO(), scheduledID)
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduledID)
if err != nil {
return err
}
Expand All @@ -434,7 +442,7 @@ func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(

release(nil) // release earlier as we don't need the lock anymore

return t.shard.GetService().GetMatchingClient().AddActivityTask(nil, &m.AddActivityTaskRequest{
return t.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &m.AddActivityTaskRequest{
DomainUUID: common.StringPtr(targetDomainID),
SourceDomainUUID: common.StringPtr(domainID),
Execution: &execution,
Expand All @@ -445,20 +453,21 @@ func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
}

func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {

wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskDefaultTimeout,
taskGetExecutionContextTimeout,
)
if err != nil {
return err
}
defer func() { release(retError) }()

mutableState, err := loadMutableStateForTimerTask(wfContext, task, t.metricsClient, t.logger)
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
Expand All @@ -482,7 +491,7 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
continueAsNewInitiator := workflow.ContinueAsNewInitiatorRetryPolicy
if backoffInterval == backoff.NoBackoff {
// check if a cron backoff is needed
backoffInterval, err = mutableState.GetCronBackoffDuration(context.TODO())
backoffInterval, err = mutableState.GetCronBackoffDuration(ctx)
if err != nil {
return err
}
Expand All @@ -495,11 +504,11 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
return t.updateWorkflowExecution(wfContext, mutableState, false)
return t.updateWorkflowExecution(ctx, wfContext, mutableState, false)
}

// workflow timeout, but a retry or cron is needed, so we do continue as new to retry or cron
startEvent, err := mutableState.GetStartEvent(context.TODO())
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return err
}
Expand All @@ -521,6 +530,7 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
SearchAttributes: startAttributes.SearchAttributes,
}
newMutableState, err := retryWorkflow(
ctx,
mutableState,
eventBatchFirstEventID,
startAttributes.GetParentWorkflowDomain(),
Expand All @@ -532,7 +542,7 @@ func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(

newExecutionInfo := newMutableState.GetExecutionInfo()
return wfContext.UpdateWorkflowExecutionWithNewAsActive(
context.TODO(),
ctx,
t.shard.GetTimeSource().Now(),
execution.NewContext(
newExecutionInfo.DomainID,
Expand All @@ -557,6 +567,7 @@ func (t *timerActiveTaskExecutor) getTimerSequence(
}

func (t *timerActiveTaskExecutor) updateWorkflowExecution(
ctx context.Context,
wfContext execution.Context,
mutableState execution.MutableState,
scheduleNewDecision bool,
Expand All @@ -572,7 +583,7 @@ func (t *timerActiveTaskExecutor) updateWorkflowExecution(
}

now := t.shard.GetTimeSource().Now()
err = wfContext.UpdateWorkflowExecutionAsActive(context.TODO(), now)
err = wfContext.UpdateWorkflowExecutionAsActive(ctx, now)
if err != nil {
// if is shard ownership error, the shard context will stop the entire history engine
// we don't need to explicitly stop the queue processor here
Expand Down
Loading