Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ut state builder #808

Merged
merged 10 commits into from
Jun 11, 2018
2,116 changes: 2,116 additions & 0 deletions service/history/MockMutableState.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newConflictResolver(shard ShardContext, context *workflowExecutionContext,
}
}

func (r *conflictResolver) reset(requestID string, replayEventID int64, startTime time.Time) (*mutableStateBuilder, error) {
func (r *conflictResolver) reset(requestID string, replayEventID int64, startTime time.Time) (mutableState, error) {
domainID := r.context.domainID
execution := r.context.workflowExecution
replayNextEventID := replayEventID + 1
Expand Down Expand Up @@ -111,7 +111,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim
resetMutableStateBuilder.executionInfo.LastUpdatedTimestamp = startTime

sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(lastEvent.GetVersion())
resetMutableStateBuilder.updateReplicationStateLastEventID(sourceCluster, lastEvent.GetVersion(), replayEventID)
resetMutableStateBuilder.UpdateReplicationStateLastEventID(sourceCluster, lastEvent.GetVersion(), replayEventID)

r.logger.Infof("All events applied for execution. WorkflowID: %v, RunID: %v, NextEventID: %v",
execution.GetWorkflowId(), execution.GetRunId(), resetMutableStateBuilder.GetNextEventID())
Expand Down
14 changes: 8 additions & 6 deletions service/history/failoverCheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func verifyTimerTaskVersion(shard ShardContext, domainID string, version int64,

// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
// if still mutable state's next event ID <= task ID, will return nil, nil
func loadMutableStateForTransferTask(context *workflowExecutionContext, transferTask *persistence.TransferTaskInfo, metricsClient metrics.Client, logger bark.Logger) (*mutableStateBuilder, error) {
func loadMutableStateForTransferTask(context *workflowExecutionContext, transferTask *persistence.TransferTaskInfo, metricsClient metrics.Client, logger bark.Logger) (mutableState, error) {
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
Expand All @@ -76,13 +76,14 @@ func loadMutableStateForTransferTask(context *workflowExecutionContext, transfer
}
return nil, err
}
executionInfo := msBuilder.GetExecutionInfo()

// check to see if cache needs to be refreshed as we could potentially have stale workflow execution
// the exception is decision consistently fail
// there will be no event generated, thus making the decision schedule ID == next event ID
isDecisionRetry := transferTask.TaskType == persistence.TransferTaskTypeDecisionTask &&
msBuilder.executionInfo.DecisionScheduleID == transferTask.ScheduleID &&
msBuilder.executionInfo.DecisionAttempt > 0
executionInfo.DecisionScheduleID == transferTask.ScheduleID &&
executionInfo.DecisionAttempt > 0

if transferTask.ScheduleID >= msBuilder.GetNextEventID() && !isDecisionRetry {
metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.StaleMutableStateCounter)
Expand All @@ -104,7 +105,7 @@ func loadMutableStateForTransferTask(context *workflowExecutionContext, transfer

// load mutable state, if mutable state's next event ID <= task ID, will attempt to refresh
// if still mutable state's next event ID <= task ID, will return nil, nil
func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask *persistence.TimerTaskInfo, metricsClient metrics.Client, logger bark.Logger) (*mutableStateBuilder, error) {
func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask *persistence.TimerTaskInfo, metricsClient metrics.Client, logger bark.Logger) (mutableState, error) {
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
Expand All @@ -113,13 +114,14 @@ func loadMutableStateForTimerTask(context *workflowExecutionContext, timerTask *
}
return nil, err
}
executionInfo := msBuilder.GetExecutionInfo()

// check to see if cache needs to be refreshed as we could potentially have stale workflow execution
// the exception is decision consistently fail
// there will be no event generated, thus making the decision schedule ID == next event ID
isDecisionRetry := timerTask.TaskType == persistence.TaskTypeDecisionTimeout &&
msBuilder.executionInfo.DecisionScheduleID == timerTask.EventID &&
msBuilder.executionInfo.DecisionAttempt > 0
executionInfo.DecisionScheduleID == timerTask.EventID &&
executionInfo.DecisionAttempt > 0

if timerTask.EventID >= msBuilder.GetNextEventID() && !isDecisionRetry {
metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.StaleMutableStateCounter)
Expand Down
Loading