diff --git a/go/tasks/v1/flytek8s/k8splugin_state.go b/go/tasks/v1/flytek8s/k8splugin_state.go index 28e527c5b..24ad2c070 100644 --- a/go/tasks/v1/flytek8s/k8splugin_state.go +++ b/go/tasks/v1/flytek8s/k8splugin_state.go @@ -35,15 +35,15 @@ func retrieveK8sObjectStatus(customState map[string]interface{}) (K8sObjectStatu return k8sObjectUnknown, types.TaskPhaseUnknown, nil } - status := k8sObjectUnknown - terminalTaskPhase := types.TaskPhaseUnknown + status := int(k8sObjectUnknown) + terminalTaskPhase := int(types.TaskPhaseUnknown) foundStatus := false foundPhase := false for k, v := range customState { if k == statusKey { - status, foundStatus = v.(K8sObjectStatus) + status, foundStatus = v.(int) } else if k == terminalTaskPhaseKey { - terminalTaskPhase, foundPhase = v.(types.TaskPhase) + terminalTaskPhase, foundPhase = v.(int) } } @@ -51,16 +51,17 @@ func retrieveK8sObjectStatus(customState map[string]interface{}) (K8sObjectStatu return k8sObjectUnknown, types.TaskPhaseUnknown, fmt.Errorf("invalid custom state %v", mapToString(customState)) } - return status, terminalTaskPhase, nil + return K8sObjectStatus(status), types.TaskPhase(terminalTaskPhase), nil } func storeK8sObjectStatus(status K8sObjectStatus, phase types.TaskPhase) map[string]interface{} { customState := make(map[string]interface{}) - customState[statusKey] = status - customState[terminalTaskPhaseKey] = phase + customState[statusKey] = int(status) + customState[terminalTaskPhaseKey] = int(phase) return customState } +//invalid custom state os=\"2\"\ntp=\"4\"\n","ts":"2019-10-11T00:29:24Z"} func mapToString(m map[string]interface{}) string { b := new(bytes.Buffer) for key, value := range m { diff --git a/go/tasks/v1/flytek8s/k8splugin_state_test.go b/go/tasks/v1/flytek8s/k8splugin_state_test.go new file mode 100644 index 000000000..d6acde33b --- /dev/null +++ b/go/tasks/v1/flytek8s/k8splugin_state_test.go @@ -0,0 +1,31 @@ +package flytek8s + +import ( + "testing" + "github.com/stretchr/testify/assert" + "github.com/lyft/flyteplugins/go/tasks/v1/types" +) + +func TestRetrieveK8sObjectStatus(t *testing.T) { + status := k8sObjectExists + phase := types.TaskPhaseRunning + customState := storeK8sObjectStatus(status, phase) + + retrievedStatus, retrievedPhase, err := retrieveK8sObjectStatus(customState) + assert.NoError(t, err) + assert.Equal(t, status, retrievedStatus) + assert.Equal(t, phase, retrievedPhase) +} + +func TestRetrieveK8sObjectStatus2(t *testing.T) { + status := 2 + phase := 4 + customState := make(map[string]interface{}) + customState[statusKey] = status + customState[terminalTaskPhaseKey] = phase + + retrievedStatus, retrievedPhase, err := retrieveK8sObjectStatus(customState) + assert.NoError(t, err) + assert.Equal(t, status, int(retrievedStatus)) + assert.Equal(t, phase, int(retrievedPhase)) +} \ No newline at end of file diff --git a/go/tasks/v1/flytek8s/plugin_executor_test.go b/go/tasks/v1/flytek8s/plugin_executor_test.go index a38048811..c255b4acc 100755 --- a/go/tasks/v1/flytek8s/plugin_executor_test.go +++ b/go/tasks/v1/flytek8s/plugin_executor_test.go @@ -457,7 +457,7 @@ func TestK8sTaskExecutor_CheckTaskStatus(t *testing.T) { s, err := k.CheckTaskStatus(ctx, tctx, nil) // first time after termination, we expect phase to not change but have custom state populated assert.NotNil(t, s.State) - assert.Equal(t, flytek8s.k8sObjectStatus(2), s.State["os"]) + assert.Equal(t, flytek8s.K8sObjectStatus(2), s.State["os"]) assert.Equal(t, types.TaskPhase(3), s.State["tp"]) assert.NoError(t, err) assert.Equal(t, types.TaskPhaseQueued, s.Phase)