Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Adding cleanupOnFailure to PhaseInfo #333

Merged
merged 2 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ type PhaseInfo struct {
err *core.ExecutionError
// reason why the current phase exists.
reason string
// cleanupOnFailure indicates that this task should be cleaned up even though the phase indicates a failure. This
// applies to situations where a task is marked a failure but is still running, for example an ImagePullBackoff in
// a k8s Pod where the image does not exist will continually reattempt the pull even though it will never succeed.
cleanupOnFailure bool
}

func (p PhaseInfo) Phase() Phase {
Expand All @@ -139,6 +143,10 @@ func (p PhaseInfo) Err() *core.ExecutionError {
return p.err
}

func (p PhaseInfo) CleanupOnFailure() bool {
return p.cleanupOnFailure
}

func (p PhaseInfo) WithVersion(version uint32) PhaseInfo {
return PhaseInfo{
phase: p.phase,
Expand All @@ -159,7 +167,7 @@ func (p PhaseInfo) String() string {
// PhaseInfoUndefined should be used when the Phase is unknown usually associated with an error
var PhaseInfoUndefined = PhaseInfo{phase: PhaseUndefined}

func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo {
if info == nil {
info = &TaskInfo{}
}
Expand All @@ -168,69 +176,74 @@ func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) Phas
info.OccurredAt = &t
}
return PhaseInfo{
phase: p,
version: v,
info: info,
err: err,
phase: p,
version: v,
info: info,
err: err,
cleanupOnFailure: cleanupOnFailure,
}
}

// Return in the case the plugin is not ready to start
func PhaseInfoNotReady(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t})
pi := phaseInfo(PhaseNotReady, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
return pi
}

// Deprecated: Please use PhaseInfoWaitingForResourcesInfo instead
func PhaseInfoWaitingForResources(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t})
pi := phaseInfo(PhaseWaitingForResources, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
return pi
}

// Return in the case the plugin is not ready to start
func PhaseInfoWaitingForResourcesInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseWaitingForResources, version, nil, info)
pi := phaseInfo(PhaseWaitingForResources, version, nil, info, false)
pi.reason = reason
return pi
}

func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t})
pi := phaseInfo(PhaseQueued, version, nil, &TaskInfo{OccurredAt: &t}, false)
pi.reason = reason
return pi
}

func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseQueued, version, nil, info)
pi := phaseInfo(PhaseQueued, version, nil, info, false)
pi.reason = reason
return pi
}

func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {

pi := phaseInfo(PhaseInitializing, version, nil, info)
pi := phaseInfo(PhaseInitializing, version, nil, info, false)
pi.reason = reason
return pi
}

func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
func phaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo {
if err == nil {
err = &core.ExecutionError{
Code: "Unknown",
Message: "Unknown error message",
}
}
return phaseInfo(p, DefaultPhaseVersion, err, info)
return phaseInfo(p, DefaultPhaseVersion, err, info, cleanupOnFailure)
}

func PhaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
return phaseInfo(p, DefaultPhaseVersion, err, info, false)
}

func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseRunning, version, nil, info)
return phaseInfo(PhaseRunning, version, nil, info, false)
}

func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info)
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info, false)
}

func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo {
Expand All @@ -245,11 +258,15 @@ func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info)
}

func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo {
return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, true)
}

func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info)
}

// Creates a new PhaseInfo with phase set to PhaseWaitingForCache
func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseWaitingForCache, version, nil, info)
return phaseInfo(PhaseWaitingForCache, version, nil, info, false)
}
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {

case "ImagePullBackOff":
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
default:
Expand Down