diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 4e777ee154..55f9cfa68c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -55,6 +55,9 @@ var ( ImagePullBackoffGracePeriod: config2.Duration{ Duration: time.Minute * 3, }, + PodPendingTimeout: config2.Duration{ + Duration: 0, + }, GpuDeviceNodeLabel: "k8s.amazonaws.com/accelerator", GpuPartitionSizeNodeLabel: "k8s.amazonaws.com/gpu-partition-size", GpuResourceName: ResourceNvidiaGPU, @@ -149,6 +152,11 @@ type K8sPluginConfig struct { // one, and the corresponding task marked as failed ImagePullBackoffGracePeriod config2.Duration `json:"image-pull-backoff-grace-period" pflag:"-,Time to wait for transient ImagePullBackoff errors to be resolved."` + // Time to wait while pod is in pending phase. If the pod is stuck in + // pending phase past this timeout, it will be inferred to be a permanent + // issue, and the corresponding task marked as failed + PodPendingTimeout config2.Duration `json:"pod-pending-timeout" pflag:"-,Time to wait while pod is stuck in pending."` + // The node label that specifies the attached GPU device. GpuDeviceNodeLabel string `json:"gpu-device-node-label" pflag:"-,The node label that specifies the attached GPU device."` diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go index d8cc4dcc7f..2f3447ad0e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -572,14 +572,38 @@ func BuildIdentityPod() *v1.Pod { // and hence input gates. We should not allow bad requests that Request for large number of resource through. // In the case it makes through, we will fail after timeout func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { + phaseInfo, t := demystifyPendingHelper(status) + + if phaseInfo.Phase().IsTerminal() { + return phaseInfo, nil + } + + podPendingTimeout := config.GetK8sPluginConfig().PodPendingTimeout.Duration + if podPendingTimeout > 0 && time.Since(t) >= podPendingTimeout { + return pluginsCore.PhaseInfoRetryableFailureWithCleanup("PodPendingTimeout", phaseInfo.Reason(), &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), nil + } + + if phaseInfo.Phase() != pluginsCore.PhaseUndefined { + return phaseInfo, nil + } + + return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil +} + +func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Time) { // Search over the difference conditions in the status object. Note that the 'Pending' this function is // demystifying is the 'phase' of the pod status. This is different than the PodReady condition type also used below + phaseInfo := pluginsCore.PhaseInfoUndefined + t := time.Now() for _, c := range status.Conditions { + t = c.LastTransitionTime.Time switch c.Type { case v1.PodScheduled: if c.Status == v1.ConditionFalse { // Waiting to be scheduled. This usually refers to inability to acquire resources. - return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil + return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t } case v1.PodReasonUnschedulable: @@ -592,7 +616,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // reason: Unschedulable // status: "False" // type: PodScheduled - return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil + return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t case v1.PodReady: if c.Status == v1.ConditionFalse { @@ -637,7 +661,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // ErrImagePull -> Transitionary phase to ImagePullBackOff // ContainerCreating -> Image is being downloaded // PodInitializing -> Init containers are running - return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil + return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t case "CreateContainerError": // This may consist of: @@ -659,48 +683,45 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // synced, and therefore, only provides an // approximation of the elapsed time since the last // transition. - t := c.LastTransitionTime.Time + gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration if time.Since(t) >= gracePeriod { return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } return pluginsCore.PhaseInfoInitializing( t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}, - ), nil + ), t case "CreateContainerConfigError": - t := c.LastTransitionTime.Time gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration if time.Since(t) >= gracePeriod { return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } return pluginsCore.PhaseInfoInitializing( t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}, - ), nil + ), t case "InvalidImageName": - t := c.LastTransitionTime.Time return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t case "ImagePullBackOff": - t := c.LastTransitionTime.Time gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration if time.Since(t) >= gracePeriod { return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } return pluginsCore.PhaseInfoInitializing( @@ -708,7 +729,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}, - ), nil + ), t default: // Since we are not checking for all error states, we may end up perpetually @@ -716,12 +737,10 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // by K8s and we get elusive 'pod not found' errors // So be default if the container is not waiting with the PodInitializing/ContainerCreating // reasons, then we will assume a failure reason, and fail instantly - t := c.LastTransitionTime.Time return pluginsCore.PhaseInfoSystemRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, - }), nil + }), t } - } } } @@ -729,7 +748,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { } } - return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil + return phaseInfo, t } func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string { diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index a98bfe6b4f..925cb00186 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -1181,6 +1181,9 @@ func TestDemystifyPending(t *testing.T) { ImagePullBackoffGracePeriod: config1.Duration{ Duration: time.Minute * 3, }, + PodPendingTimeout: config1.Duration{ + Duration: 0, + }, })) t.Run("PodNotScheduled", func(t *testing.T) { @@ -1478,6 +1481,38 @@ func TestDemystifyPending(t *testing.T) { }) } +func TestDemystifyPendingTimeout(t *testing.T) { + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + CreateContainerErrorGracePeriod: config1.Duration{ + Duration: time.Minute * 3, + }, + ImagePullBackoffGracePeriod: config1.Duration{ + Duration: time.Minute * 3, + }, + PodPendingTimeout: config1.Duration{ + Duration: 10, + }, + })) + + s := v1.PodStatus{ + Phase: v1.PodPending, + Conditions: []v1.PodCondition{ + { + Type: v1.PodScheduled, + Status: v1.ConditionFalse, + }, + }, + } + s.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().PodPendingTimeout.Duration) + + t.Run("PodPendingExceedsTimeout", func(t *testing.T) { + taskStatus, err := DemystifyPending(s) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) + assert.Equal(t, "PodPendingTimeout", taskStatus.Err().Code) + }) +} + func TestDemystifySuccess(t *testing.T) { t.Run("OOMKilled", func(t *testing.T) { phaseInfo, err := DemystifySuccess(v1.PodStatus{