From 64819b01ca1986a43f989fcffc13df86ac5ee5d9 Mon Sep 17 00:00:00 2001 From: bstadlbauer <11799671+bstadlbauer@users.noreply.github.com> Date: Thu, 8 Jun 2023 21:11:40 +0200 Subject: [PATCH] Fix initial dask job state (#357) * Fix initial dask state Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> * Fix linting Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --------- Signed-off-by: Bernhard Stadlbauer <11799671+bstadlbauer@users.noreply.github.com> --- flyteplugins/go/tasks/plugins/k8s/dask/dask.go | 7 ++++++- flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go | 9 ++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/dask/dask.go b/flyteplugins/go/tasks/plugins/k8s/dask/dask.go index cbf61c09e..b9cbc3460 100755 --- a/flyteplugins/go/tasks/plugins/k8s/dask/dask.go +++ b/flyteplugins/go/tasks/plugins/k8s/dask/dask.go @@ -320,7 +320,10 @@ func (p daskResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s OccurredAt: &occurredAt, } - isQueued := status == daskAPI.DaskJobCreated || + // There is a short period between the `DaskJob` resource being created and `Status.JobStatus` being set by the `dask-operator`. + // In that period, the `JobStatus` will be an empty string. We're treating this as Initializing/Queuing. + isQueued := status == "" || + status == daskAPI.DaskJobCreated || status == daskAPI.DaskJobClusterCreated if !isQueued { @@ -337,6 +340,8 @@ func (p daskResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s } switch status { + case "": + return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "unknown", &info), nil case daskAPI.DaskJobCreated: return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "job created", &info), nil case daskAPI.DaskJobClusterCreated: diff --git a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go index a04292fef..e59f0bf5e 100644 --- a/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/dask/dask_test.go @@ -491,7 +491,14 @@ func TestGetTaskPhaseDask(t *testing.T) { daskResourceHandler := daskResourceHandler{} ctx := context.TODO() - taskPhase, err := daskResourceHandler.GetTaskPhase(ctx, nil, dummyDaskJob(daskAPI.DaskJobCreated)) + taskPhase, err := daskResourceHandler.GetTaskPhase(ctx, nil, dummyDaskJob("")) + assert.NoError(t, err) + assert.Equal(t, taskPhase.Phase(), pluginsCore.PhaseInitializing) + assert.NotNil(t, taskPhase.Info()) + assert.Nil(t, taskPhase.Info().Logs) + assert.Nil(t, err) + + taskPhase, err = daskResourceHandler.GetTaskPhase(ctx, nil, dummyDaskJob(daskAPI.DaskJobCreated)) assert.NoError(t, err) assert.Equal(t, taskPhase.Phase(), pluginsCore.PhaseInitializing) assert.NotNil(t, taskPhase.Info())