From 0dcfb3b7a1cb2a9d5061ff649cecda2a7598e01c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 11 Apr 2024 13:59:54 -0700 Subject: [PATCH 1/3] fix(databricks): Check responseBody before unmarshal --- .../go/tasks/plugins/webapi/databricks/plugin.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 5ebe1d0075..d3c12ed15c 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -234,10 +234,14 @@ func (p Plugin) sendRequest(method string, databricksJob map[string]interface{}, return nil, err } var data map[string]interface{} - err = json.Unmarshal(responseBody, &data) - if err != nil { - return nil, fmt.Errorf("failed to parse response with err: [%v]", err) + + if responseBody != nil && len(responseBody) != 0 { + err = json.Unmarshal(responseBody, &data) + if err != nil { + return nil, fmt.Errorf("failed to parse response with err: [%v]", err) + } } + if resp.StatusCode != http.StatusOK { message := "" if v, ok := data["message"]; ok { From d4b5b58af5f8096740f58b56fccd0b20ee028343 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 11 Apr 2024 14:20:23 -0700 Subject: [PATCH 2/3] nit Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index d3c12ed15c..55e08049f7 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -263,10 +263,16 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase taskInfo := createTaskInfo(exec.RunID, jobID, exec.DatabricksInstance) switch lifeCycleState { // Job response format. https://docs.databricks.com/en/workflows/jobs/jobs-2.0-api.html#runlifecyclestate + case "QUEUED": + return core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, message), nil case "PENDING": return core.PhaseInfoInitializing(time.Now(), core.DefaultPhaseVersion, message, taskInfo), nil case "RUNNING": fallthrough + case "BLOCKED": + fallthrough + case "WAITING_FOR_RETRY": + fallthrough case "TERMINATING": return core.PhaseInfoRunning(core.DefaultPhaseVersion, taskInfo), nil case "TERMINATED": From 109e5d83bb8b4dbdda18080fb9eb933b69470cc1 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 12 Apr 2024 14:38:58 -0700 Subject: [PATCH 3/3] lint Signed-off-by: Kevin Su --- flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 55e08049f7..70d7c7e2f9 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -235,7 +235,7 @@ func (p Plugin) sendRequest(method string, databricksJob map[string]interface{}, } var data map[string]interface{} - if responseBody != nil && len(responseBody) != 0 { + if len(responseBody) != 0 { err = json.Unmarshal(responseBody, &data) if err != nil { return nil, fmt.Errorf("failed to parse response with err: [%v]", err)