Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Retry map task subtasks #236

Merged
merged 9 commits into from
Feb 3, 2022
35 changes: 17 additions & 18 deletions go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,24 +256,23 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl
func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus.ArraySummary) Phase {
totalCount := int64(0)
totalSuccesses := int64(0)
totalFailures := int64(0)
totalPermanentFailures := int64(0)
totalRetryableFailures := int64(0)
totalRunning := int64(0)
totalWaitingForResources := int64(0)
for phase, count := range summary {
totalCount += count
if phase.IsTerminal() {
if phase.IsSuccess() {
totalSuccesses += count
} else {
// TODO: Split out retryable failures to be retried without doing the entire array task.
// TODO: Other option: array tasks are only retryable as a full set and to get single task retriability
// TODO: dynamic_task must be updated to not auto-combine to array tasks. For scale reasons, it is
// TODO: preferable to auto-combine to array tasks for now.
totalFailures += count
}
} else if phase.IsWaitingForResources() {

switch phase {
case core.PhaseSuccess:
totalSuccesses += count
case core.PhasePermanentFailure:
totalPermanentFailures += count
case core.PhaseRetryableFailure:
totalRetryableFailures += count
case core.PhaseWaitingForResources:
totalWaitingForResources += count
} else {
default:
totalRunning += count
}
}
Expand All @@ -284,9 +283,9 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
}

// No chance to reach the required success numbers.
if totalRunning+totalSuccesses+totalWaitingForResources < minSuccesses {
logger.Infof(ctx, "Array failed early because total failures > minSuccesses[%v]. Snapshot totalRunning[%v] + totalSuccesses[%v] + totalWaitingForResource[%v]",
minSuccesses, totalRunning, totalSuccesses, totalWaitingForResources)
if totalRunning+totalSuccesses+totalWaitingForResources+totalRetryableFailures < minSuccesses {
logger.Infof(ctx, "Array failed early because total failures > minSuccesses[%v]. Snapshot totalRunning[%v] + totalSuccesses[%v] + totalWaitingForResource[%v] + totalRetryableFailures[%v]",
minSuccesses, totalRunning, totalSuccesses, totalWaitingForResources, totalRetryableFailures)
return PhaseWriteToDiscoveryThenFail
}

Expand All @@ -299,8 +298,8 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
return PhaseWriteToDiscovery
}

logger.Debugf(ctx, "Array is still running [Successes: %v, Failures: %v, Total: %v, MinSuccesses: %v]",
totalSuccesses, totalFailures, totalCount, minSuccesses)
logger.Debugf(ctx, "Array is still running [Successes: %v, PermanentFailures: %v, RetryableFailures: %v, Total: %v, MinSuccesses: %v]",
totalSuccesses, totalPermanentFailures, totalRetryableFailures, totalCount, minSuccesses)
return PhaseCheckingSubTaskExecutions
}

Expand Down
60 changes: 60 additions & 0 deletions go/tasks/plugins/array/core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,63 @@ func TestToArrayJob(t *testing.T) {
}))
})
}

func TestSummaryToPhase(t *testing.T) {
minSuccesses := int64(10)
tests := []struct {
name string
phase Phase
summary map[core.Phase]int64
}{
{
"FailOnTooFewTasks",
PhaseWriteToDiscoveryThenFail,
map[core.Phase]int64{},
},
{
"ContinueOnRetryableFailures",
PhaseCheckingSubTaskExecutions,
map[core.Phase]int64{
core.PhaseRetryableFailure: 1,
core.PhaseUndefined: 9,
},
},
{
"FailOnToManyPermanentFailures",
PhaseWriteToDiscoveryThenFail,
map[core.Phase]int64{
core.PhasePermanentFailure: 1,
core.PhaseUndefined: 9,
},
},
{
"CheckWaitingForResources",
PhaseWaitingForResources,
map[core.Phase]int64{
core.PhaseWaitingForResources: 1,
core.PhaseUndefined: 9,
},
},
{
"WaitForAllSubtasksToComplete",
PhaseCheckingSubTaskExecutions,
map[core.Phase]int64{
core.PhaseUndefined: 1,
core.PhaseSuccess: 9,
},
},
{
"SuccessfullyCompleted",
PhaseWriteToDiscovery,
map[core.Phase]int64{
core.PhaseSuccess: 10,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.phase, SummaryToPhase(context.TODO(), minSuccesses, tt.summary))
})
}
}
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
nextState, err = array.WriteToDiscovery(ctx, tCtx, pluginState, arrayCore.PhaseAssembleFinalOutput)

case arrayCore.PhaseAssembleFinalError:
nextState, err = array.AssembleFinalOutputs(ctx, e.errorAssembler, tCtx, arrayCore.PhaseRetryableFailure, pluginState)
nextState, err = array.AssembleFinalOutputs(ctx, e.errorAssembler, tCtx, arrayCore.PhasePermanentFailure, pluginState)

default:
nextState = pluginState
Expand Down
14 changes: 12 additions & 2 deletions go/tasks/plugins/array/k8s/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8s
import (
"context"
"fmt"
"strconv"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils"

Expand Down Expand Up @@ -33,8 +34,17 @@ var arrayJobEnvVars = []corev1.EnvVar{
},
}

func formatSubTaskName(_ context.Context, parentName, suffix string) (subTaskName string) {
return utils.ConvertToDNS1123SubdomainCompatibleString(fmt.Sprintf("%v-%v", parentName, suffix))
func formatSubTaskName(_ context.Context, parentName string, index int, retryAttempt uint64) (subTaskName string) {
indexStr := strconv.Itoa(index)

// If the retryAttempt is 0 we do not include it in the pod name. The gives us backwards
// compatibility in the ability to dynamically transition running map tasks to use subtask retries.
if retryAttempt == 0 {
return utils.ConvertToDNS1123SubdomainCompatibleString(fmt.Sprintf("%v-%v", parentName, indexStr))
}
Comment on lines +42 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool!


retryAttemptStr := strconv.FormatUint(retryAttempt, 10)
return utils.ConvertToDNS1123SubdomainCompatibleString(fmt.Sprintf("%v-%v-%v", parentName, indexStr, retryAttemptStr))
}

func ApplyPodPolicies(_ context.Context, cfg *Config, pod *corev1.Pod) *corev1.Pod {
Expand Down
23 changes: 23 additions & 0 deletions go/tasks/plugins/array/k8s/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package k8s

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -39,3 +40,25 @@ func TestApplyPodTolerations(t *testing.T) {

assert.Equal(t, pod.Spec.Tolerations, cfg.Tolerations)
}

func TestFormatSubTaskName(t *testing.T) {
ctx := context.Background()
parentName := "foo"

tests := []struct {
index int
retryAttempt uint64
want string
}{
{0, 0, fmt.Sprintf("%v-%v", parentName, 0)},
{1, 0, fmt.Sprintf("%v-%v", parentName, 1)},
{0, 1, fmt.Sprintf("%v-%v-%v", parentName, 0, 1)},
{1, 1, fmt.Sprintf("%v-%v-%v", parentName, 1, 1)},
}

for i, tt := range tests {
t.Run(fmt.Sprintf("format-subtask-name-%v", i), func(t *testing.T) {
assert.Equal(t, tt.want, formatSubTaskName(ctx, parentName, tt.index, tt.retryAttempt))
})
}
}
44 changes: 34 additions & 10 deletions go/tasks/plugins/array/k8s/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package k8s
import (
"context"
"fmt"
"strconv"
"time"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
Expand Down Expand Up @@ -73,10 +72,8 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
return currentState, logLinks, subTaskIDs, nil
}

// Currently if any subtask fails then all subtasks are retried up to MaxAttempts. Therefore, all
// subtasks have an identical RetryAttempt, namely that of the map task execution metadata. Once
// retries over individual subtasks are implemented we should revisit this logic and instead
// increment the RetryAttempt for each subtask everytime a new pod is created.
// Set subtask retryAttempts using the existing task context retry attempt. For new tasks
// this will initialize to 0, but running tasks will use the existing retry attempt.
retryAttempt := bitarray.Item(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt)
for i := 0; i < currentState.GetExecutionArraySize(); i++ {
retryAttemptsArray.SetItem(i, retryAttempt)
Expand All @@ -93,20 +90,38 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon

for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() {
existingPhase := core.Phases[existingPhaseIdx]
indexStr := strconv.Itoa(childIdx)
podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr)
originalIdx := arrayCore.CalculateOriginalIndex(childIdx, newState.GetIndexesToCache())

retryAttempt := currentState.RetryAttempts.GetItem(childIdx)
podName := formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), childIdx, retryAttempt)

if existingPhase.IsTerminal() {
// If we get here it means we have already "processed" this terminal phase since we will only persist
// the phase after all processing is done (e.g. check outputs/errors file, record events... etc.).

// Since we know we have already "processed" this terminal phase we can safely deallocate resource
err = deallocateResource(ctx, tCtx, config, childIdx)
err = deallocateResource(ctx, tCtx, config, podName)
if err != nil {
logger.Errorf(ctx, "Error releasing allocation token [%s] in LaunchAndCheckSubTasks [%s]", podName, err)
return currentState, logLinks, subTaskIDs, errors2.Wrapf(ErrCheckPodStatus, err, "Error releasing allocation token.")
}

// If a subtask is marked as a retryable failure we check if the number of retries
// exceeds the maximum attempts. If so, transition the task to a permanent failure
// so that is not attempted again. If it can be retried, increment the retry attempts
// value and transition the task to "Undefined" so that it is reevaluated.
if existingPhase == core.PhaseRetryableFailure {
if uint32(retryAttempt+1) < tCtx.TaskExecutionMetadata().GetMaxAttempts() {
newState.RetryAttempts.SetItem(childIdx, retryAttempt+1)

newArrayStatus.Summary.Inc(core.PhaseUndefined)
newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(core.PhaseUndefined))
continue
} else {
existingPhase = core.PhasePermanentFailure
}
}

newArrayStatus.Summary.Inc(existingPhase)
newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(existingPhase))

Expand All @@ -117,6 +132,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
},
originalIdx,
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt,
retryAttempt,
logPlugin)

if err != nil {
Expand Down Expand Up @@ -209,7 +225,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
return newState, logLinks, subTaskIDs, nil
}

func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8sTypes.NamespacedName, index int, retryAttempt uint32, logPlugin tasklog.Plugin) (
func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8sTypes.NamespacedName, index int, retryAttempt uint32, subtaskRetryAttempt uint64, logPlugin tasklog.Plugin) (
info core.PhaseInfo, err error) {

pod := &v1.Pod{
Expand Down Expand Up @@ -244,12 +260,20 @@ func FetchPodStatusAndLogs(ctx context.Context, client core.KubeClient, name k8s
}

if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown {
// We append the subtaskRetryAttempt to the log name only when it is > 0 to ensure backwards
// compatibility when dynamically transitioning running map tasks to use subtask retry attempts.
var logName string
if subtaskRetryAttempt == 0 {
logName = fmt.Sprintf(" #%d-%d", retryAttempt, index)
} else {
logName = fmt.Sprintf(" #%d-%d-%d", retryAttempt, index, subtaskRetryAttempt)
}

if logPlugin != nil {
o, err := logPlugin.GetTaskLogs(tasklog.Input{
PodName: pod.Name,
Namespace: pod.Namespace,
LogName: fmt.Sprintf(" #%d-%d", retryAttempt, index),
LogName: logName,
PodUnixStartTime: pod.CreationTimestamp.Unix(),
})

Expand Down
71 changes: 71 additions & 0 deletions go/tasks/plugins/array/k8s/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,77 @@ func TestCheckSubTasksState(t *testing.T) {
resourceManager.AssertNumberOfCalls(t, "AllocateResource", 5)
assert.Empty(t, subTaskIDs, "subtask ids are only populated when monitor is called for a successfully launched task")
})

t.Run("RetryableSubtaskFailure", func(t *testing.T) {
failureIndex := 2

config := Config{
MaxArrayJobSize: 100,
MaxErrorStringLength: 200,
}

detailed := arrayCore.NewPhasesCompactArray(uint(5))
detailed.SetItem(failureIndex, bitarray.Item(core.PhaseRetryableFailure))

retryAttemptsArray, err := bitarray.NewCompactArray(5, bitarray.Item(1))
assert.NoError(t, err)

cacheIndexes := bitarray.NewBitSet(5)
newState, _, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
ExecutionArraySize: 5,
OriginalArraySize: 10,
OriginalMinSuccesses: 5,
IndexesToCache: cacheIndexes,
ArrayStatus: arraystatus.ArrayStatus{
Detailed: detailed,
},
RetryAttempts: retryAttemptsArray,
})

assert.Nil(t, err)

p, _ := newState.GetPhase()
assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
assert.Equal(t, core.PhaseUndefined, core.Phases[newState.ArrayStatus.Detailed.GetItem(failureIndex)])
assert.Equal(t, uint64(1), newState.RetryAttempts.GetItem(failureIndex))
})

t.Run("PermanentSubtaskFailure", func(t *testing.T) {
failureIndex := 2

config := Config{
MaxArrayJobSize: 100,
MaxErrorStringLength: 200,
}

detailed := arrayCore.NewPhasesCompactArray(uint(5))
detailed.SetItem(failureIndex, bitarray.Item(core.PhaseRetryableFailure))

retryAttemptsArray, err := bitarray.NewCompactArray(5, bitarray.Item(1))
assert.NoError(t, err)
retryAttemptsArray.SetItem(failureIndex, bitarray.Item(1))

cacheIndexes := bitarray.NewBitSet(5)
newState, _, _, err := LaunchAndCheckSubTasksState(ctx, tCtx, &kubeClient, &config, nil, "/prefix/", "/prefix-sand/", &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
ExecutionArraySize: 5,
OriginalArraySize: 10,
OriginalMinSuccesses: 5,
IndexesToCache: cacheIndexes,
ArrayStatus: arraystatus.ArrayStatus{
Detailed: detailed,
},
RetryAttempts: retryAttemptsArray,
})

assert.Nil(t, err)

p, _ := newState.GetPhase()
assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
assert.Equal(t, core.PhasePermanentFailure, core.Phases[newState.ArrayStatus.Detailed.GetItem(failureIndex)])
assert.Equal(t, uint64(1), newState.RetryAttempts.GetItem(failureIndex))
})
}

func TestCheckSubTasksStateResourceGranted(t *testing.T) {
Expand Down
Loading