Skip to content

Commit

Permalink
qubole validation for older versions of the sdk (flyteorg#34)
Browse files Browse the repository at this point in the history
Prior versions of the SDK did not fill in this value. When this code was written, it was assumed that all prior versions of the SDK would've been deprecated, which was a bad assumption.
  • Loading branch information
wild-endeavor authored Dec 3, 2019
1 parent fbc2ee3 commit caa1219
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
12 changes: 12 additions & 0 deletions flyteplugins/go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ func GetAllocationToken(ctx context.Context, resourceNamespace core.ResourceName
return newState, nil
}

func validateQuboleHiveJob(hiveJob plugins.QuboleHiveJob) error {
if hiveJob.Query == nil {
return errors.Errorf(errors.BadTaskSpecification,
"Query could not be found. Please ensure that you are at least on Flytekit version 0.3.0 or later.")
}
return nil
}

// This function is the link between the output written by the SDK, and the execution side. It extracts the query
// out of the task template.
func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (
Expand All @@ -182,6 +190,10 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) (
return "", "", []string{}, 0, err
}

if err := validateQuboleHiveJob(hiveJob); err != nil {
return "", "", []string{}, 0, err
}

query = hiveJob.Query.GetQuery()
cluster = hiveJob.ClusterLabel
tags = hiveJob.Tags
Expand Down
11 changes: 11 additions & 0 deletions flyteplugins/go/tasks/plugins/hive/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hive

import (
"context"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins"
"net/url"
"testing"

Expand Down Expand Up @@ -77,6 +78,16 @@ func TestGetQueryInfo(t *testing.T) {
assert.Equal(t, 500, int(timeout))
}

func TestValidateQuboleHiveJob(t *testing.T) {
hiveJob := plugins.QuboleHiveJob{
ClusterLabel: "default",
Tags: []string{"flyte_plugin_test"},
Query: nil,
}
err := validateQuboleHiveJob(hiveJob)
assert.Error(t, err)
}

func TestConstructTaskLog(t *testing.T) {
expected := "https://wellness.qubole.com/v2/analyze?command_id=123"
u, err := url.Parse(expected)
Expand Down

0 comments on commit caa1219

Please sign in to comment.