Skip to content

Commit

Permalink
Fix Presto status bug that prevents task from moving forward (flyteor…
Browse files Browse the repository at this point in the history
…g#73)

* Add more logging for Presto plugin

* logs

* fix

* int format

* cleanup
  • Loading branch information
lu4nm3 authored Apr 1, 2020
1 parent 951459d commit 71f9fdd
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (p noopPrestoClient) KillCommand(ctx context.Context, commandID string) err
}

func (p noopPrestoClient) GetCommandStatus(ctx context.Context, commandID string) (PrestoStatus, error) {
return NewPrestoStatus(ctx, "UNKNOWN"), nil
return PrestoStatusUnknown, nil
}

func NewNoopPrestoClient(cfg *config.Config) PrestoClient {
Expand Down
7 changes: 2 additions & 5 deletions flyteplugins/go/tasks/plugins/presto/client/presto_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package client

import "context"

type PrestoStatus string

// Contains information needed to execute a Presto query
type PrestoExecuteArgs struct {
RoutingGroup string `json:"routingGroup,omitempty"`
Expand All @@ -15,9 +13,8 @@ type PrestoExecuteArgs struct {

// Representation of a response after submitting a query to Presto
type PrestoExecuteResponse struct {
ID string `json:"id,omitempty"`
Status PrestoStatus `json:"status,omitempty"`
NextURI string `json:"nextUri,omitempty"`
ID string `json:"id,omitempty"`
NextURI string `json:"nextUri,omitempty"`
}

//go:generate mockery -all -case=snake
Expand Down
46 changes: 10 additions & 36 deletions flyteplugins/go/tasks/plugins/presto/client/presto_status.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,17 @@
package client

import (
"context"
"strings"

"github.com/lyft/flytestdlib/logger"
)

// This type is meant only to encapsulate the response coming from Presto as a type, it is
// not meant to be stored locally.
const (
PrestoStatusUnknown PrestoStatus = "UNKNOWN"
PrestoStatusWaiting PrestoStatus = "WAITING"
PrestoStatusRunning PrestoStatus = "RUNNING"
PrestoStatusFinished PrestoStatus = "FINISHED"
PrestoStatusFailed PrestoStatus = "FAILED"
PrestoStatusCancelled PrestoStatus = "CANCELLED"
)

var PrestoStatuses = map[PrestoStatus]struct{}{
PrestoStatusUnknown: {},
PrestoStatusWaiting: {},
PrestoStatusRunning: {},
PrestoStatusFinished: {},
PrestoStatusFailed: {},
PrestoStatusCancelled: {},
}
//go:generate enumer --type=PrestoStatus

func NewPrestoStatus(ctx context.Context, state string) PrestoStatus {
upperCased := strings.ToUpper(state)
type PrestoStatus int

// Presto has different failure modes so this maps them all to a single Failure on the
// Flyte side
if strings.Contains(upperCased, "FAILED") {
return PrestoStatusFailed
} else if _, ok := PrestoStatuses[PrestoStatus(upperCased)]; ok {
return PrestoStatus(upperCased)
} else {
logger.Warnf(ctx, "Invalid Presto Status found: %v", state)
return PrestoStatusUnknown
}
}
const (
PrestoStatusUnknown PrestoStatus = iota
PrestoStatusWaiting
PrestoStatusRunning
PrestoStatusFinished
PrestoStatusFailed
PrestoStatusCancelled
)
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/plugins/presto/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ func TestKickOffQuery(t *testing.T) {
var prestoCalled = false

prestoExecuteResponse := client.PrestoExecuteResponse{
ID: "1234567",
Status: client.PrestoStatusWaiting,
ID: "1234567",
}
mockPresto := &prestoMocks.PrestoClient{}
mockPresto.OnExecuteCommandMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything,
Expand Down

0 comments on commit 71f9fdd

Please sign in to comment.