Skip to content

Commit

Permalink
Adding nil checks for version histories (#3724)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Nov 4, 2020
1 parent f3b9fdb commit 995c4dc
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 6 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,7 @@ const (
TaskLatencyPerDomain
TaskFailuresPerDomain
TaskDiscardedPerDomain
TaskUnsupportedPerDomain
TaskAttemptTimerPerDomain
TaskStandbyRetryCounterPerDomain
TaskPendingActiveCounterPerDomain
Expand Down Expand Up @@ -2187,6 +2188,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskAttemptTimerPerDomain: {metricName: "task_attempt_per_domain", metricRollupName: "task_attempt", metricType: Timer},
TaskFailuresPerDomain: {metricName: "task_errors_per_domain", metricRollupName: "task_errors", metricType: Counter},
TaskDiscardedPerDomain: {metricName: "task_errors_discarded_per_domain", metricRollupName: "task_errors_discarded", metricType: Counter},
TaskUnsupportedPerDomain: {metricName: "task_errors_unsupported_per_domain", metricRollupName: "task_errors_discarded", metricType: Counter},
TaskStandbyRetryCounterPerDomain: {metricName: "task_errors_standby_retry_counter_per_domain", metricRollupName: "task_errors_standby_retry_counter", metricType: Counter},
TaskPendingActiveCounterPerDomain: {metricName: "task_errors_pending_active_counter_per_domain", metricRollupName: "task_errors_pending_active_counter", metricType: Counter},
TaskNotActiveCounterPerDomain: {metricName: "task_errors_not_active_counter_per_domain", metricRollupName: "task_errors_not_active_counter", metricType: Counter},
Expand Down
2 changes: 2 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var (
ErrMissingChildWorkflowInitiatedEvent = &workflow.InternalServiceError{Message: "unable to get child workflow initiated event"}
// ErrEventsAfterWorkflowFinish is the error indicating server error trying to write events after workflow finish event
ErrEventsAfterWorkflowFinish = &workflow.InternalServiceError{Message: "error validating last event being workflow finish event"}
// ErrMissingVersionHistories is the error indicating cadence failed to process 2dc workflow type.
ErrMissingVersionHistories = &workflow.BadRequestError{Message: "versionHistories is empty, which is required for NDC feature. It's probably from deprecated 2dc workflows"}
)

type (
Expand Down
3 changes: 3 additions & 0 deletions service/history/execution/state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func (b *stateBuilderImpl) ApplyEvents(
return nil, err
}
versionHistories := b.mutableState.GetVersionHistories()
if versionHistories == nil {
return nil, ErrMissingVersionHistories
}
versionHistory, err := versionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion service/history/execution/state_rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (r *stateRebuilderImpl) Rebuild(
if err := rebuiltMutableState.SetCurrentBranchToken(targetBranchToken); err != nil {
return nil, 0, err
}
currentVersionHistory, err := rebuiltMutableState.GetVersionHistories().GetCurrentVersionHistory()
rebuildVersionHistories := rebuiltMutableState.GetVersionHistories()
if rebuildVersionHistories == nil {
return nil, 0, ErrMissingVersionHistories
}
currentVersionHistory, err := rebuildVersionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, 0, err
}
Expand Down
6 changes: 6 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2534,6 +2534,9 @@ func (e *historyEngineImpl) ResetWorkflowExecution(
resetRunID := uuid.New()
baseRebuildLastEventID := request.GetDecisionFinishEventId() - 1
baseVersionHistories := baseMutableState.GetVersionHistories()
if baseVersionHistories == nil {
return nil, execution.ErrMissingVersionHistories
}
baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
Expand Down Expand Up @@ -3162,6 +3165,9 @@ func (e *historyEngineImpl) ReapplyEvents(
}

baseVersionHistories := mutableState.GetVersionHistories()
if baseVersionHistories == nil {
return nil, execution.ErrMissingVersionHistories
}
baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
Expand Down
13 changes: 11 additions & 2 deletions service/history/ndc/branch_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (r *branchManagerImpl) prepareVersionHistory(
}

localVersionHistories := r.mutableState.GetVersionHistories()
if localVersionHistories == nil {
return false, 0, execution.ErrMissingVersionHistories
}
versionHistory, err := localVersionHistories.GetVersionHistory(versionHistoryIndex)
if err != nil {
return false, 0, err
Expand Down Expand Up @@ -151,7 +154,9 @@ func (r *branchManagerImpl) flushBufferedEvents(
) (int, *persistence.VersionHistoryItem, error) {

localVersionHistories := r.mutableState.GetVersionHistories()

if localVersionHistories == nil {
return 0, nil, execution.ErrMissingVersionHistories
}
versionHistoryIndex, lcaVersionHistoryItem, err := localVersionHistories.FindLCAVersionHistoryIndexAndItem(
incomingVersionHistory,
)
Expand Down Expand Up @@ -249,7 +254,11 @@ func (r *branchManagerImpl) createNewBranch(
if err := newVersionHistory.SetBranchToken(resp.NewBranchToken); err != nil {
return 0, err
}
branchChanged, newIndex, err := r.mutableState.GetVersionHistories().AddVersionHistory(
versionHistory := r.mutableState.GetVersionHistories()
if versionHistory == nil {
return 0, execution.ErrMissingVersionHistories
}
branchChanged, newIndex, err := versionHistory.AddVersionHistory(
newVersionHistory,
)
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions service/history/ndc/conflict_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (r *conflictResolverImpl) prepareMutableState(
) (execution.MutableState, bool, error) {

versionHistories := r.mutableState.GetVersionHistories()
if versionHistories == nil {
return nil, false, execution.ErrMissingVersionHistories
}
currentVersionHistoryIndex := versionHistories.GetCurrentVersionHistoryIndex()

// replication task to be applied to current branch
Expand Down Expand Up @@ -123,6 +126,9 @@ func (r *conflictResolverImpl) rebuild(
) (execution.MutableState, error) {

versionHistories := r.mutableState.GetVersionHistories()
if versionHistories == nil {
return nil, execution.ErrMissingVersionHistories
}
replayVersionHistory, err := versionHistories.GetVersionHistory(branchIndex)
if err != nil {
return nil, err
Expand Down Expand Up @@ -156,6 +162,9 @@ func (r *conflictResolverImpl) rebuild(

// after rebuilt verification
rebuildVersionHistories := rebuildMutableState.GetVersionHistories()
if rebuildVersionHistories == nil {
return nil, execution.ErrMissingVersionHistories
}
rebuildVersionHistory, err := rebuildVersionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
Expand Down
8 changes: 6 additions & 2 deletions service/history/ndc/history_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (r *historyReplicatorImpl) applyEvents(
case nil:
// Sanity check to make only 3DC mutable state here
if mutableState.GetVersionHistories() == nil {
return &shared.InternalServiceError{Message: "The mutable state does not support 3DC."}
return execution.ErrMissingVersionHistories
}

doContinue, branchIndex, err := r.applyNonStartEventsPrepareBranch(ctx, context, mutableState, task)
Expand Down Expand Up @@ -514,7 +514,11 @@ func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithoutCon
task.getLastEvent().GetEventId(),
task.getLastEvent().GetVersion(),
)
versionHistory, err := mutableState.GetVersionHistories().GetVersionHistory(branchIndex)
versionHistories := mutableState.GetVersionHistories()
if versionHistories == nil {
return execution.ErrMissingVersionHistories
}
versionHistory, err := versionHistories.GetVersionHistory(branchIndex)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/ndc/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ func (r *transactionManagerImpl) backfillWorkflowEventsReapply(
}

baseVersionHistories := baseMutableState.GetVersionHistories()
if baseVersionHistories == nil {
return 0, execution.TransactionPolicyActive, execution.ErrMissingVersionHistories
}
baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory()
if err != nil {
return 0, execution.TransactionPolicyActive, err
Expand Down
3 changes: 3 additions & 0 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (r *workflowResetterImpl) getBaseBranchToken(
}()

baseVersionHistories := baseWorkflow.GetMutableState().GetVersionHistories()
if baseVersionHistories == nil {
return nil, execution.ErrMissingVersionHistories
}
index, err := baseVersionHistories.FindFirstVersionHistoryIndexByItem(
persistence.NewVersionHistoryItem(baseLastEventID, baseLastEventVersion),
)
Expand Down
6 changes: 5 additions & 1 deletion service/history/task/standby_task_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ func getHistoryResendInfo(
mutableState execution.MutableState,
) (*historyResendInfo, error) {

currentBranch, err := mutableState.GetVersionHistories().GetCurrentVersionHistory()
versionHistories := mutableState.GetVersionHistories()
if versionHistories == nil {
return nil, execution.ErrMissingVersionHistories
}
currentBranch, err := versionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions service/history/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ func (t *taskBase) HandleErr(
err = nil
}

if err == execution.ErrMissingVersionHistories {
t.logger.Error("Encounter 2DC workflow during task processing.")
t.scope.IncCounter(metrics.TaskUnsupportedPerDomain)
err = nil
}

// this is a transient error
// TODO remove this error check special case
// since the new task life cycle will not give up until task processed / verified
Expand Down
3 changes: 3 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,9 @@ func (t *transferActiveTaskExecutor) resetWorkflow(
resetRunID := uuid.New()
baseRebuildLastEventID := resetPoint.GetFirstDecisionCompletedId() - 1
baseVersionHistories := baseMutableState.GetVersionHistories()
if baseVersionHistories == nil {
return execution.ErrMissingVersionHistories
}
baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory()
if err != nil {
return err
Expand Down

0 comments on commit 995c4dc

Please sign in to comment.