Skip to content

Commit

Permalink
Notify new tasks when persistence operation timeout (#3678)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Oct 22, 2020
1 parent f1ae5a7 commit bb7ad07
Showing 1 changed file with 70 additions and 37 deletions.
107 changes: 70 additions & 37 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,14 @@ func (c *contextImpl) CreateWorkflowExecution(

_, err := c.createWorkflowExecutionWithRetry(createRequest)
if err != nil {
if c.isPersistenceTimeoutError(err) {
c.notifyTasksFromWorkflowSnapshot(newWorkflow)
}
return err
}

c.notifyTasks(
newWorkflow.TransferTasks,
newWorkflow.ReplicationTasks,
newWorkflow.TimerTasks,
)
c.notifyTasksFromWorkflowSnapshot(newWorkflow)

return nil
}

Expand Down Expand Up @@ -524,6 +524,11 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
CurrentWorkflowMutation: currentWorkflow,
// Encoding, this is set by shard context
}); err != nil {
if c.isPersistenceTimeoutError(err) {
c.notifyTasksFromWorkflowSnapshot(resetWorkflow)
c.notifyTasksFromWorkflowSnapshot(newWorkflow)
c.notifyTasksFromWorkflowMutation(currentWorkflow)
}
return err
}

Expand All @@ -545,25 +550,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
workflowCloseState,
))

c.notifyTasks(
resetWorkflow.TransferTasks,
resetWorkflow.ReplicationTasks,
resetWorkflow.TimerTasks,
)
if newWorkflow != nil {
c.notifyTasks(
newWorkflow.TransferTasks,
newWorkflow.ReplicationTasks,
newWorkflow.TimerTasks,
)
}
if currentWorkflow != nil {
c.notifyTasks(
currentWorkflow.TransferTasks,
currentWorkflow.ReplicationTasks,
currentWorkflow.TimerTasks,
)
}
c.notifyTasksFromWorkflowSnapshot(resetWorkflow)
c.notifyTasksFromWorkflowSnapshot(newWorkflow)
c.notifyTasksFromWorkflowMutation(currentWorkflow)

return nil
}

Expand Down Expand Up @@ -717,6 +707,10 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
// Encoding, this is set by shard context
})
if err != nil {
if c.isPersistenceTimeoutError(err) {
c.notifyTasksFromWorkflowMutation(currentWorkflow)
c.notifyTasksFromWorkflowSnapshot(newWorkflow)
}
return err
}

Expand All @@ -741,20 +735,10 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
))

// notify current workflow tasks
c.notifyTasks(
currentWorkflow.TransferTasks,
currentWorkflow.ReplicationTasks,
currentWorkflow.TimerTasks,
)
c.notifyTasksFromWorkflowMutation(currentWorkflow)

// notify new workflow tasks
if newWorkflow != nil {
c.notifyTasks(
newWorkflow.TransferTasks,
newWorkflow.ReplicationTasks,
newWorkflow.TimerTasks,
)
}
c.notifyTasksFromWorkflowSnapshot(newWorkflow)

// finally emit session stats
domainName := c.GetDomainName()
Expand All @@ -781,9 +765,34 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
return nil
}

func (c *contextImpl) notifyTasksFromWorkflowSnapshot(
workflowSnapShot *persistence.WorkflowSnapshot,
) {
if workflowSnapShot == nil {
return
}

c.notifyTasks(
workflowSnapShot.TransferTasks,
workflowSnapShot.TimerTasks,
)
}

func (c *contextImpl) notifyTasksFromWorkflowMutation(
workflowMutation *persistence.WorkflowMutation,
) {
if workflowMutation == nil {
return
}

c.notifyTasks(
workflowMutation.TransferTasks,
workflowMutation.TimerTasks,
)
}

func (c *contextImpl) notifyTasks(
transferTasks []persistence.Task,
replicationTasks []persistence.Task,
timerTasks []persistence.Task,
) {
c.shard.GetEngine().NotifyNewTransferTasks(transferTasks)
Expand Down Expand Up @@ -1152,3 +1161,27 @@ func (c *contextImpl) ReapplyEvents(
},
)
}

func (c *contextImpl) isPersistenceTimeoutError(
err error,
) bool {
// TODO: ideally we only need to check if err has type *persistence.Timeout,
// but currently only cassandra will return timeout error of that type.
// so currently this method will return false positives
switch err.(type) {
case nil:
return false
case *workflow.WorkflowExecutionAlreadyStartedError,
*persistence.WorkflowExecutionAlreadyStartedError,
*persistence.CurrentWorkflowConditionFailedError,
*persistence.ConditionFailedError,
*workflow.ServiceBusyError,
*workflow.LimitExceededError,
*persistence.ShardOwnershipLostError:
return false
case *persistence.TimeoutError:
return true
default:
return err != ErrConflict
}
}

0 comments on commit bb7ad07

Please sign in to comment.