[Bug] fix ArrayNode state's TaskPhase reset #5451

Merged
merged 4 commits into from
Jul 19, 2024
12 changes: 7 additions & 5 deletions flytepropeller/pkg/controller/nodes/array/handler.go
@@ -576,18 +576,20 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
 		taskPhase = idlcore.TaskExecution_FAILED
 	}
 
-	// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise
-	// increment it if we detect any changes in subNode state.
-	if currentArrayNodePhase != arrayNodeState.Phase {
-		arrayNodeState.TaskPhaseVersion = 0
-	} else if incrementTaskPhaseVersion {
+	// increment taskPhaseVersion if we detect any changes in subNode state.
+	if incrementTaskPhaseVersion {
 		arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
 	}
 
 	if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
 		logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
 		return handler.UnknownTransition, err
 	}
+
+	// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0
+	if currentArrayNodePhase != arrayNodeState.Phase {
+		arrayNodeState.TaskPhaseVersion = 0
+	}
Comment on lines +589 to +593
Contributor

Doesn't setting this afterwards cause problems with admin requiring incremental values?

Contributor Author

The scenario of interest is one where we would lose eventing data: incrementTaskPhaseVersion = False and currentArrayNodePhase != arrayNodeState.Phase, with a previous event already emitted with the same TaskPhase and TaskPhaseVersion.

We set incrementTaskPhaseVersion = True if there's a subnode phase change. The arrayNodeState.Phase is updated in 3 different places: arrayNodeState.Phase = v1alpha1.ArrayNodePhaseFailing, arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding, and arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting.

For arrayNodeState.Phase = v1alpha1.ArrayNodePhaseFailing and arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding, there would have to be a subnode phase change, so we couldn't have a scenario where incrementTaskPhaseVersion = False and currentArrayNodePhase != arrayNodeState.Phase.

For arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting, we shouldn't have a previous event emitted with the same TaskPhase and TaskPhaseVersion as we should only be in the v1alpha1.ArrayNodePhaseNone phase for the first pass through.

Contributor Author

The subsequent loops would have a new TaskPhase as well.

Contributor, @hamersaw, Jul 3, 2024

> The scenario of interest is one where we would lose eventing data: incrementTaskPhaseVersion = False and currentArrayNodePhase != arrayNodeState.Phase, with a previous event already emitted with the same TaskPhase and TaskPhaseVersion.

Do we have a repro for this? I'm having difficulty understanding how this is possible. If incrementTaskPhaseVersion == False, then no subnode phases have been updated, so currentArrayNodePhase != arrayNodeState.Phase cannot be true, since the ArrayNode phase is determined from the subnode phases, unless this is a system failure in the handler logic?

Also, taskPhase is determined by currentArrayNodePhase, so if currentArrayNodePhase != arrayNodeState.Phase then we cannot have emitted an event with the same taskPhase and taskPhaseVersion. FlyteAdmin does not take taskPhaseVersion into account for new taskPhase values.

I'm probably missing something here, a repro would help. Or is there an issue this should be linked to?
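
As a rough illustration of the acceptance rule discussed in this comment, here is a minimal Go sketch. It is not FlyteAdmin's code: `taskEvent` and `accepts` are hypothetical names, and the rule is only what the thread states (a new task phase is accepted regardless of version; a repeated phase needs a higher version).

```go
package main

import "fmt"

// taskEvent is a hypothetical, simplified stand-in for a task execution event.
type taskEvent struct {
	Phase        string
	PhaseVersion uint32
}

// accepts models the rule as described in the thread (an assumption, not the
// actual FlyteAdmin implementation): a new phase is always accepted, while an
// event that repeats the last phase must carry a strictly higher version.
func accepts(last, next taskEvent) bool {
	if next.Phase != last.Phase {
		return true
	}
	return next.PhaseVersion > last.PhaseVersion
}

func main() {
	last := taskEvent{Phase: "RUNNING", PhaseVersion: 3}

	fmt.Println(accepts(last, taskEvent{Phase: "RUNNING", PhaseVersion: 4}))   // same phase, higher version
	fmt.Println(accepts(last, taskEvent{Phase: "RUNNING", PhaseVersion: 0}))   // same phase, version reset
	fmt.Println(accepts(last, taskEvent{Phase: "SUCCEEDED", PhaseVersion: 0})) // new phase
}
```

Under this assumed rule, a version reset is only harmless when it coincides with a new task phase.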

Contributor Author

@hamersaw I noticed this when working on mapping over launch plans but don't think I actually ran into the bug. However this should repro an issue.

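import time

from flytekit import map_task, task, workflow


@task
def hello(num: int) -> int:
    # Only the last input (10) sleeps and then fails, so it terminates after the others.
    if num > 9:
        time.sleep(10)
        raise Exception("This is a test exception")
    return num


@workflow
def map_workflow():
    map_task(hello, min_success_ratio=0.5)(num=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])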

The subnode's task phase stays stuck in "Running" even though the subnode has terminated/failed.

Emitting the event always fails because the task phase is not bumped while the task phase version is reset.
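
To make the ordering difference concrete, here is a small Go sketch that models only the version bookkeeping from the diff. `oldOrder` and `newOrder` are hypothetical helper names, and the starting version of 3 is an arbitrary example value; the real handler has more state than this.

```go
package main

import "fmt"

// oldOrder mirrors the pre-change logic: reset the version when the ArrayNode
// phase changed, otherwise increment on subnode changes. The event is emitted
// after the reset, so the emitted and stored versions are the same.
func oldOrder(phaseChanged, increment bool, version uint32) (emitted, stored uint32) {
	if phaseChanged {
		version = 0
	} else if increment {
		version++
	}
	return version, version
}

// newOrder mirrors the post-change logic: increment on subnode changes, emit
// the event, and only then reset the stored version if the ArrayNode phase
// changed.
func newOrder(phaseChanged, increment bool, version uint32) (emitted, stored uint32) {
	if increment {
		version++
	}
	emitted = version
	if phaseChanged {
		version = 0
	}
	return emitted, version
}

func main() {
	// A subnode changed and the ArrayNode phase changed in the same round.
	e, s := oldOrder(true, true, 3)
	fmt.Println("old:", e, s) // old: 0 0

	e, s = newOrder(true, true, 3)
	fmt.Println("new:", e, s) // new: 4 0
}
```

With the old ordering the event goes out with version 0; if the task phase itself did not change, that repeats a phase/version pair that was already sent. With the new ordering the event carries the incremented version and the reset only affects later rounds.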

Contributor Author, @pvditt, Jul 3, 2024

Wait, this doesn't occur when the staggered subtask succeeds. Looking back into this.

Contributor Author

Figured it out. The issue doesn't surface for:

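import time

from flytekit import map_task, task, workflow


@task
def hello(num: int) -> int:
    # Only the last input (10) sleeps; it still succeeds, just later than the others.
    if num > 9:
        time.sleep(10)
    return num


@workflow
def map_workflow():
    map_task(hello, min_success_ratio=0.5)(num=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])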

for _, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
	nodePhase := v1alpha1.NodePhase(nodePhaseUint64)
	switch nodePhase {
	case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseRecovered, v1alpha1.NodePhaseSkipped:
		successCount++
	case v1alpha1.NodePhaseFailing:
		failingCount++
	case v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseTimedOut:
		failedCount++
	default:
		runningCount++
	}
}
This loop doesn't check for NodePhaseSucceeding, so a subnode in that phase counts as a running task and we don't bump the ArrayNode phase here:
if len(arrayNodeState.SubNodePhases.GetItems())-failedCount < minSuccesses {
	// no chance to reach the minimum number of successes
	arrayNodeState.Phase = v1alpha1.ArrayNodePhaseFailing
} else if successCount >= minSuccesses && runningCount == 0 {
	// wait until all tasks have completed before declaring success
	arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding
}

Since the ArrayNode phase doesn't change in that case, we bump the task phase version instead of resetting it to 0, so the event lands in admin. For NodePhaseFailing, by contrast, we don't increment runningCount, so we update arrayNodeState.Phase, which leads the existing implementation to reset the task phase version.
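
The difference between the two repros can be reproduced with a simplified, self-contained sketch of the counting logic. The `nodePhase` constants and `nextArrayPhase` below are hypothetical stand-ins for the `v1alpha1` phases and the handler code quoted above, reduced to the cases relevant here.

```go
package main

import "fmt"

// nodePhase is a simplified stand-in for v1alpha1.NodePhase (assumption: only
// the phases relevant to this discussion are modeled).
type nodePhase int

const (
	running nodePhase = iota
	succeeding
	succeeded
	failing
	failed
)

// nextArrayPhase follows the shape of the counting logic quoted above and
// returns "Failing", "Succeeding", or "Executing" (meaning no phase change).
func nextArrayPhase(phases []nodePhase, minSuccesses int) string {
	var successCount, failedCount, runningCount int
	for _, p := range phases {
		switch p {
		case succeeded:
			successCount++
		case failing:
			// counted separately in the handler; neither running nor failed
		case failed:
			failedCount++
		default:
			// succeeding has no case of its own, so it lands here
			runningCount++
		}
	}
	if len(phases)-failedCount < minSuccesses {
		return "Failing"
	}
	if successCount >= minSuccesses && runningCount == 0 {
		return "Succeeding"
	}
	return "Executing"
}

func main() {
	nine := []nodePhase{succeeded, succeeded, succeeded, succeeded, succeeded,
		succeeded, succeeded, succeeded, succeeded}

	// Last subnode is still Succeeding: counted as running, phase unchanged.
	fmt.Println(nextArrayPhase(append(append([]nodePhase{}, nine...), succeeding), 5)) // Executing

	// Last subnode is Failing: not counted as running, phase moves on.
	fmt.Println(nextArrayPhase(append(append([]nodePhase{}, nine...), failing), 5)) // Succeeding
}
```

With ten subnodes and `min_success_ratio=0.5`, the minimum number of successes is 5, which is the value used for `minSuccesses` in the example.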

}

// update array node status