
[fix] #4198: Introduce PhaseInfoFailureWithCleanup for Handling Non-Recoverable Pod States #4297

Closed · wants to merge 2 commits

191 changes: 116 additions & 75 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
@@ -21,6 +21,8 @@
"github.com/flyteorg/flyte/flytestdlib/logger"
)



const PodKind = "pod"
const OOMKilled = "OOMKilled"
const Interrupted = "Interrupted"
@@ -554,6 +556,31 @@
}
}

type PhaseInfoFailureWithCleanup struct {
	PhaseVersion pluginsCore.PhaseVersion
	Reason       string
	Message      string
	TaskInfo     *pluginsCore.TaskInfo
	CleanupInfo  *CleanupInfo
}

Check failure on line 560 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go (GitHub Actions / compile): undefined: core.PhaseVersion

// CleanupInfo captures the reason and message describing the cleanup a failed pod requires.
type CleanupInfo struct {
	CleanupReason  string
	CleanupMessage string
}

// NewPhaseInfoFailureWithCleanup creates a new PhaseInfoFailureWithCleanup instance.
func NewPhaseInfoFailureWithCleanup(reason, message string, taskInfo *pluginsCore.TaskInfo, cleanupInfo *CleanupInfo) *PhaseInfoFailureWithCleanup {
	return &PhaseInfoFailureWithCleanup{
		PhaseVersion: pluginsCore.DefaultPhaseVersion,
		Reason:       reason,
		Message:      message,
		TaskInfo:     taskInfo,
		CleanupInfo:  cleanupInfo,
	}
}
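For illustration, a hedged usage sketch of the constructor above: it uses only the types this diff defines, but the helper name, the caller, and the messages are hypothetical, and (per the compile failure noted above) the resulting value does not yet satisfy pluginsCore.PhaseInfo on its own.

// buildCleanupFailure is a hypothetical helper, not part of this PR, showing how a
// plugin might construct the cleanup-aware failure value for a non-recoverable pod.
func buildCleanupFailure(taskInfo *pluginsCore.TaskInfo) *PhaseInfoFailureWithCleanup {
	cleanup := &CleanupInfo{
		CleanupReason:  "CleanupRequired",
		CleanupMessage: "stale pod resources must be removed before retrying",
	}
	return NewPhaseInfoFailureWithCleanup(
		"CleanupRequired",                       // reason surfaced to FlytePropeller
		"Pod failed and left resources behind.", // human-readable message
		taskInfo,
		cleanup,
	)
}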

// DemystifyPending is one of the core functions that helps FlytePropeller determine if a pending pod is indeed pending,
// or if it is actually stuck in an un-repairable state. In such a case the pod should be marked as dead and the task should
// be retried. This sadly has to be handled, as K8s is still largely designed for long-running services that should
@@ -753,83 +780,97 @@
// DemystifyFailure resolves the various Kubernetes pod failure modes to determine
// the most appropriate course of action
func DemystifyFailure(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
code := "UnknownError"
message := "Pod failed. No message received from kubernetes."
if len(status.Reason) > 0 {
code = status.Reason
}

if len(status.Message) > 0 {
message = status.Message
}

//
// Handle known pod statuses
//
// This is useful for handling node interruption events
// which can be different between providers and versions of Kubernetes. Given that
// we don't have a consistent way of detecting interruption events, we will be
// documenting all possibilities as follows. We will also be handling these as
// system retryable failures that do not count towards user-specified task retries,
// for now. This is required for FlytePropeller to correctly transition
// interruptible nodes to non-interruptible ones after the
// `interruptible-failure-threshold` is exceeded. See:
// https://github.com/flyteorg/flytepropeller/blob/a3c6e91f19c19601a957b29891437112868845de/pkg/controller/nodes/node_exec_context.go#L213

// GKE (>= v1.20) Kubelet graceful node shutdown
// See: https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms#graceful-shutdown
// Cloud audit log for patch of Pod object during graceful node shutdown:
// request: {
// @type: "k8s.io/Patch"
// status: {
// conditions: null
// message: "Pod Node is in progress of shutting down, not admitting any new pods"
// phase: "Failed"
// qosClass: null
// reason: "Shutdown"
// startTime: "2022-01-30T14:24:07Z"
// }
// }
//
// In some versions of GKE the reason can also be "Terminated"
if code == "Shutdown" || code == "Terminated" {
return pluginsCore.PhaseInfoSystemRetryableFailure(Interrupted, message, &info), nil
}

//
// Handle known container statuses
//
for _, c := range append(
append(status.InitContainerStatuses, status.ContainerStatuses...), status.EphemeralContainerStatuses...) {
var containerState v1.ContainerState
if c.LastTerminationState.Terminated != nil {
containerState = c.LastTerminationState
} else if c.State.Terminated != nil {
containerState = c.State
}
if containerState.Terminated != nil {
if strings.Contains(containerState.Terminated.Reason, OOMKilled) {
code = OOMKilled
} else if containerState.Terminated.ExitCode == SIGKILL {
// in some setups, node termination sends SIGKILL to all the containers running on that node. Capturing and
// tagging that correctly.
code = Interrupted
}

if containerState.Terminated.ExitCode == 0 {
message += fmt.Sprintf("\r\n[%v] terminated with ExitCode 0.", c.Name)
} else {
message += fmt.Sprintf("\r\n[%v] terminated with exit code (%v). Reason [%v]. Message: \n%v.",
c.Name,
containerState.Terminated.ExitCode,
containerState.Terminated.Reason,
containerState.Terminated.Message)
}
}
}
return pluginsCore.PhaseInfoRetryableFailure(code, message, &info), nil
code := "UnknownError"
message := "Pod failed. No message received from Kubernetes."
if len(status.Reason) > 0 {
code = status.Reason
}

if len(status.Message) > 0 {
message = status.Message
}

//
// Handle known pod statuses
//
// This is useful for handling node interruption events
// which can be different between providers and versions of Kubernetes. Given that
// we don't have a consistent way of detecting interruption events, we will be
// documenting all possibilities as follows. We will also be handling these as
// system retryable failures that do not count towards user-specified task retries,
// for now. This is required for FlytePropeller to correctly transition
// interruptible nodes to non-interruptible ones after the
// `interruptible-failure-threshold` is exceeded. See:
// https://github.com/flyteorg/flytepropeller/blob/a3c6e91f19c19601a957b29891437112868845de/pkg/controller/nodes/node_exec_context.go#L213

// GKE (>= v1.20) Kubelet graceful node shutdown
// See: https://cloud.google.com/kubernetes-engine/docs/how-to/preemptible-vms#graceful-shutdown
// Cloud audit log for patch of Pod object during graceful node shutdown:
// request: {
// @type: "k8s.io/Patch"
// status: {
// conditions: null
// message: "Pod Node is in progress of shutting down, not admitting any new pods"
// phase: "Failed"
// qosClass: null
// reason: "Shutdown"
// startTime: "2022-01-30T14:24:07Z"
// }
// }
//
// In some versions of GKE, the reason can also be "Terminated"
if code == "Shutdown" || code == "Terminated" {
return pluginsCore.PhaseInfoSystemRetryableFailure(Interrupted, message, &info), nil
}

//
// Handle known container statuses
//
for _, c := range append(
append(status.InitContainerStatuses, status.ContainerStatuses...), status.EphemeralContainerStatuses...) {
var containerState v1.ContainerState
if c.LastTerminationState.Terminated != nil {
containerState = c.LastTerminationState
} else if c.State.Terminated != nil {
containerState = c.State
}
if containerState.Terminated != nil {
if strings.Contains(containerState.Terminated.Reason, OOMKilled) {
code = OOMKilled
} else if containerState.Terminated.ExitCode == SIGKILL {
// In some setups, node termination sends SIGKILL to all the containers running on that node. Capturing and
// tagging that correctly.
code = Interrupted
}

if containerState.Terminated.ExitCode == 0 {
message += fmt.Sprintf("\r\n[%v] terminated with ExitCode 0.", c.Name)
} else {
message += fmt.Sprintf("\r\n[%v] terminated with exit code (%v). Reason [%v]. Message: \n%v.",
c.Name,
containerState.Terminated.ExitCode,
containerState.Terminated.Reason,
containerState.Terminated.Message)
}
}
}

// Add a new condition to recognize when cleanup is required
if code == "CleanupRequired" {
cleanupInfo := &CleanupInfo{
CleanupReason: code,
CleanupMessage: message,
}
return NewPhaseInfoFailureWithCleanup(code, message, &info, cleanupInfo), nil

}

Check failure on line 864 in flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go (GitHub Actions / compile): cannot use NewPhaseInfoFailureWithCleanup(code, message, &info, cleanupInfo) (value of type *PhaseInfoFailureWithCleanup) as type "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core".PhaseInfo in return statement

// Continue handling other known failure conditions

// Handle unknown failure conditions by default
return pluginsCore.PhaseInfoRetryableFailure(code, message, &info), nil
}
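As an aside, the compile failure above comes from returning *PhaseInfoFailureWithCleanup where DemystifyFailure's signature promises a pluginsCore.PhaseInfo. A minimal, hypothetical adapter, not part of this PR, could keep the cleanup branch compiling by falling back to PhaseInfoRetryableFailure (which the rest of the function already uses) and folding the cleanup details into the message; how the cleanup metadata should ultimately be propagated is left open.

// phaseInfoForCleanup is a hypothetical adapter: it satisfies the pluginsCore.PhaseInfo
// return type by reusing the existing retryable-failure constructor and surfacing the
// cleanup details in the failure message.
func phaseInfoForCleanup(code, message string, info *pluginsCore.TaskInfo, cleanup *CleanupInfo) pluginsCore.PhaseInfo {
	if cleanup != nil {
		message = fmt.Sprintf("%s\r\nCleanup required: [%s] %s", message, cleanup.CleanupReason, cleanup.CleanupMessage)
	}
	return pluginsCore.PhaseInfoRetryableFailure(code, message, info)
}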


func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {
var lastTransitionTime metav1.Time
containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...)
14 changes: 14 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
@@ -804,6 +804,20 @@ func TestUpdatePodWithDefaultAffinityAndInterruptibleNodeSelectorRequirement(t *
}
}

func TestPhaseInfoFailureWithCleanup(t *testing.T) {
	t.Run("cleanup-required", func(t *testing.T) {
		cleanupInfo := &CleanupInfo{
			CleanupReason:  "CleanupRequired",
			CleanupMessage: "Pod cleanup is required",
		}
		phaseInfo := NewPhaseInfoFailureWithCleanup("CleanupRequired", "Pod cleanup is required", &pluginsCore.TaskInfo{}, cleanupInfo)
		assert.Equal(t, "CleanupRequired", phaseInfo.Reason)
		assert.Equal(t, "Pod cleanup is required", phaseInfo.Message)
		// Check that the cleanup info is populated and carried through unchanged
		assert.NotNil(t, phaseInfo.CleanupInfo)
		assert.Equal(t, "CleanupRequired", phaseInfo.CleanupInfo.CleanupReason)
		assert.Equal(t, "Pod cleanup is required", phaseInfo.CleanupInfo.CleanupMessage)
	})
}
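Separately, a hedged test sketch, not included in this PR, for the graceful-node-shutdown path that DemystifyFailure already handles. It assumes the test file's existing imports (v1 for k8s.io/api/core/v1, pluginsCore, and assert) and that a "Shutdown" pod reason maps to a retryable failure tagged with the Interrupted code, as the comments in pod_helper.go describe.

func TestDemystifyFailureGracefulNodeShutdown(t *testing.T) {
	// A pod evicted by kubelet graceful node shutdown reports reason "Shutdown".
	status := v1.PodStatus{
		Phase:   v1.PodFailed,
		Reason:  "Shutdown",
		Message: "Pod Node is in progress of shutting down, not admitting any new pods",
	}
	phaseInfo, err := DemystifyFailure(status, pluginsCore.TaskInfo{})
	assert.NoError(t, err)
	// The shutdown is treated as an interruption, i.e. a system retryable failure.
	assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
	assert.Equal(t, Interrupted, phaseInfo.Err().Code)
}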

func toK8sPodInterruptible(t *testing.T) {
ctx := context.TODO()
