Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce maptask transitions between WaitingForResources and CheckingSubtaskExecutions #4790

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flyteplugins/go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@
case arrayCore.PhasePreLaunch:
nextState = pluginState.SetPhase(arrayCore.PhaseLaunch, version+1).SetReason("Nothing to do in PreLaunch phase.")

case arrayCore.PhaseWaitingForResources:
fallthrough

case arrayCore.PhaseLaunch:
// In order to maintain backwards compatibility with the state transitions
// in the aws batch plugin. Forward to PhaseCheckingSubTasksExecutions where the launching
// is actually occurring.
nextState = pluginState.SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, version+1).SetReason("Nothing to do in Launch phase.")
err = nil

case arrayCore.PhaseWaitingForResources:
fallthrough

Check warning on line 114 in flyteplugins/go/tasks/plugins/array/k8s/executor.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/executor.go#L113-L114

Added lines #L113 - L114 were not covered by tests

case arrayCore.PhaseCheckingSubTaskExecutions:
nextState, externalResources, err = LaunchAndCheckSubTasksState(ctx, tCtx, e.kubeClient, pluginConfig,
tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState)
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
}

_, version := currentState.GetPhase()
if phase == arrayCore.PhaseCheckingSubTaskExecutions {
if phase == arrayCore.PhaseCheckingSubTaskExecutions || phase == arrayCore.PhaseWaitingForResources {
newSubTaskPhaseHash, err := newState.GetArrayStatus().HashCode()
if err != nil {
return currentState, externalResources, err
Expand All @@ -316,7 +316,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
version++
}

newState = newState.SetPhase(phase, version).SetReason("Task is still running")
newState = newState.SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, version).SetReason("Task is still running")
} else {
newState = newState.SetPhase(phase, version+1)
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestCheckSubTasksState(t *testing.T) {
// validate results
assert.Nil(t, err)
p, _ := newState.GetPhase()
assert.Equal(t, arrayCore.PhaseWaitingForResources.String(), p.String())
assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)
for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
assert.Equal(t, core.PhaseWaitingForResources, core.Phases[subtaskPhaseIndex])
Expand Down
Loading