Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add pod pending timeout config #4590

Merged
merged 12 commits into from
Dec 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var (
ImagePullBackoffGracePeriod: config2.Duration{
Duration: time.Minute * 3,
},
PodPendingTimeout: config2.Duration{
Duration: 0,
},
GpuDeviceNodeLabel: "k8s.amazonaws.com/accelerator",
GpuPartitionSizeNodeLabel: "k8s.amazonaws.com/gpu-partition-size",
GpuResourceName: ResourceNvidiaGPU,
Expand Down Expand Up @@ -149,6 +152,11 @@ type K8sPluginConfig struct {
// one, and the corresponding task marked as failed
ImagePullBackoffGracePeriod config2.Duration `json:"image-pull-backoff-grace-period" pflag:"-,Time to wait for transient ImagePullBackoff errors to be resolved."`

// Time to wait while pod is in pending phase. If the pod is stuck in
// pending phase past this timeout, it will be inferred to be a permanent
// issue, and the corresponding task marked as failed
PodPendingTimeout config2.Duration `json:"pod-pending-timeout" pflag:"-,Time to wait while pod is stuck in pending."`

// The node label that specifies the attached GPU device.
GpuDeviceNodeLabel string `json:"gpu-device-node-label" pflag:"-,The node label that specifies the attached GPU device."`

Expand Down
55 changes: 37 additions & 18 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,38 @@ func BuildIdentityPod() *v1.Pod {
// and hence input gates. We should not allow bad requests that Request for large number of resource through.
// In the case it makes through, we will fail after timeout
func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
phaseInfo, t := demystifyPendingHelper(status)

if phaseInfo.Phase().IsTerminal() {
return phaseInfo, nil
}

podPendingTimeout := config.GetK8sPluginConfig().PodPendingTimeout.Duration
if podPendingTimeout > 0 && time.Since(t) >= podPendingTimeout {
return pluginsCore.PhaseInfoRetryableFailureWithCleanup("PodPendingTimeout", phaseInfo.Reason(), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}

if phaseInfo.Phase() != pluginsCore.PhaseUndefined {
return phaseInfo, nil
}

return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil
}

func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Time) {
// Search over the difference conditions in the status object. Note that the 'Pending' this function is
// demystifying is the 'phase' of the pod status. This is different than the PodReady condition type also used below
phaseInfo := pluginsCore.PhaseInfoUndefined
t := time.Now()
for _, c := range status.Conditions {
t = c.LastTransitionTime.Time
switch c.Type {
case v1.PodScheduled:
if c.Status == v1.ConditionFalse {
// Waiting to be scheduled. This usually refers to inability to acquire resources.
return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil
return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t
}

case v1.PodReasonUnschedulable:
Expand All @@ -592,7 +616,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// reason: Unschedulable
// status: "False"
// type: PodScheduled
return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil
return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t

case v1.PodReady:
if c.Status == v1.ConditionFalse {
Expand Down Expand Up @@ -637,7 +661,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// ErrImagePull -> Transitionary phase to ImagePullBackOff
// ContainerCreating -> Image is being downloaded
// PodInitializing -> Init containers are running
return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil
return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t

case "CreateContainerError":
// This may consist of:
Expand All @@ -659,77 +683,72 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// synced, and therefore, only provides an
// approximation of the elapsed time since the last
// transition.
t := c.LastTransitionTime.Time

gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}
return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), nil
), t

case "CreateContainerConfigError":
t := c.LastTransitionTime.Time
gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}
return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), nil
), t

case "InvalidImageName":
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t

case "ImagePullBackOff":
t := c.LastTransitionTime.Time
gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}

return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), nil
), t

default:
// Since we are not checking for all error states, we may end up perpetually
// in the queued state returned at the bottom of this function, until the Pod is reaped
// by K8s and we get elusive 'pod not found' errors
// So be default if the container is not waiting with the PodInitializing/ContainerCreating
// reasons, then we will assume a failure reason, and fail instantly
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoSystemRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}

}
}
}
}
}
}

return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil
return phaseInfo, t
}

func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string {
Expand Down
35 changes: 35 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,9 @@ func TestDemystifyPending(t *testing.T) {
ImagePullBackoffGracePeriod: config1.Duration{
Duration: time.Minute * 3,
},
PodPendingTimeout: config1.Duration{
Duration: 0,
},
}))

t.Run("PodNotScheduled", func(t *testing.T) {
Expand Down Expand Up @@ -1478,6 +1481,38 @@ func TestDemystifyPending(t *testing.T) {
})
}

func TestDemystifyPendingTimeout(t *testing.T) {
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
CreateContainerErrorGracePeriod: config1.Duration{
Duration: time.Minute * 3,
},
ImagePullBackoffGracePeriod: config1.Duration{
Duration: time.Minute * 3,
},
PodPendingTimeout: config1.Duration{
Duration: 10,
},
}))

s := v1.PodStatus{
Phase: v1.PodPending,
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
},
},
}
s.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().PodPendingTimeout.Duration)

t.Run("PodPendingExceedsTimeout", func(t *testing.T) {
taskStatus, err := DemystifyPending(s)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
assert.Equal(t, "PodPendingTimeout", taskStatus.Err().Code)
})
}

func TestDemystifySuccess(t *testing.T) {
t.Run("OOMKilled", func(t *testing.T) {
phaseInfo, err := DemystifySuccess(v1.PodStatus{
Expand Down
Loading