From 84030bd81d5434efdbb50667c7fb29dda8a8317a Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 20 Apr 2023 15:20:50 -0500 Subject: [PATCH] adding cleanupOnFailure to PhaseInfo (#333) Signed-off-by: Daniel Rammer --- go/tasks/pluginmachinery/core/phase.go | 49 +++++++++++++------ .../pluginmachinery/flytek8s/pod_helper.go | 2 +- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 5b9a7e110..51d3e4e81 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -117,6 +117,10 @@ type PhaseInfo struct { err *core.ExecutionError // reason why the current phase exists. reason string + // cleanupOnFailure indicates that this task should be cleaned up even though the phase indicates a failure. This + // applies to situations where a task is marked a failure but is still running, for example an ImagePullBackoff in + // a k8s Pod where the image does not exist will continually reattempt the pull even though it will never succeed. + cleanupOnFailure bool } func (p PhaseInfo) Phase() Phase { @@ -139,6 +143,10 @@ func (p PhaseInfo) Err() *core.ExecutionError { return p.err } +func (p PhaseInfo) CleanupOnFailure() bool { + return p.cleanupOnFailure +} + func (p PhaseInfo) WithVersion(version uint32) PhaseInfo { return PhaseInfo{ phase: p.phase, @@ -159,7 +167,7 @@ func (p PhaseInfo) String() string { // PhaseInfoUndefined should be used when the Phase is unknown usually associated with an error var PhaseInfoUndefined = PhaseInfo{phase: PhaseUndefined} -func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) PhaseInfo { +func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo { if info == nil { info = &TaskInfo{} } @@ -168,69 +176,74 @@ func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) Phas info.OccurredAt = &t } return PhaseInfo{ - phase: p, - version: v, - info: info, - err: err, + phase: p, + version: v, + info: info, + err: err, + cleanupOnFailure: cleanupOnFailure, } } // Return in the case the plugin is not ready to start func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo { - pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}) + pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}, false) pi.reason = reason return pi } // Deprecated: Please use PhaseInfoWaitingForResourcesInfo instead func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) PhaseInfo { - pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t}) + pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t}, false) pi.reason = reason return pi } // Return in the case the plugin is not ready to start func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo { - pi := phaseInfo(PhaseWaitingForResources, version, nil, info) + pi := phaseInfo(PhaseWaitingForResources, version, nil, info, false) pi.reason = reason return pi } func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo { - pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t}) + pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t}, false) pi.reason = reason return pi } func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) PhaseInfo { - pi := phaseInfo(PhaseQueued, version, nil, info) + pi := phaseInfo(PhaseQueued, version, nil, info, false) pi.reason = reason return pi } func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo { - pi := phaseInfo(PhaseInitializing, version, nil, info) + pi := phaseInfo(PhaseInitializing, version, nil, info, false) pi.reason = reason return pi } -func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo { +func phaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo { if err == nil { err = &core.ExecutionError{ Code: "Unknown", Message: "Unknown error message", } } - return phaseInfo(p, DefaultPhaseVersion, err, info) + return phaseInfo(p, DefaultPhaseVersion, err, info, cleanupOnFailure) +} + +func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo { + return phaseInfo(p, DefaultPhaseVersion, err, info, false) } func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo { - return phaseInfo(PhaseRunning, version, nil, info) + return phaseInfo(PhaseRunning, version, nil, info, false) } func PhaseInfoSuccess(info *TaskInfo) PhaseInfo { - return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info) + return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info, false) } func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo { @@ -245,11 +258,15 @@ func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo { return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info) } +func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo { + return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, true) +} + func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo { return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info) } // Creates a new PhaseInfo with phase set to PhaseWaitingForCache func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo { - return phaseInfo(PhaseWaitingForCache, version, nil, info) + return phaseInfo(PhaseWaitingForCache, version, nil, info, false) } diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 2e4493ccc..df75b92a9 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -521,7 +521,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { case "ImagePullBackOff": t := c.LastTransitionTime.Time - return pluginsCore.PhaseInfoRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ + return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, }), nil default: