Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Job plugin -> Pod plugin (#25)
Browse files Browse the repository at this point in the history
- Pods are way more scalable
- Error detection is way better
- [x] Unit tests
- [ ] End to end test
  • Loading branch information
Ketan Umare authored and lyft-buildnotify-16 committed Dec 27, 2018
1 parent d89bc2e commit 01a95bf
Show file tree
Hide file tree
Showing 40 changed files with 2,136 additions and 1,319 deletions.
16 changes: 14 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ ignored = ["k8s.io/spark-on-k8s-operator",
[[override]]
name = "github.com/prometheus/client_golang"
version = "^0.8.0"

[[override]]
name = "github.com/stretchr/objx"
version = "0.1.1"
89 changes: 31 additions & 58 deletions go/tasks/v1/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,29 @@ package errors

import (
"fmt"
)

type ErrorType int

const (
RetryableError ErrorType = iota
NonRetryableError
"github.com/pkg/errors"
)

func (e ErrorType) String() string {
switch e {
case RetryableError:
return "RetryableError"
case NonRetryableError:
return "NonRetryableError"
}
return "UnknownErrorType"
}

type ErrorCode = string

const (
TaskFailedWithError ErrorCode = "TaskFailedWithError"
DownstreamSystemError ErrorCode = "DownstreamSystemError"
TaskFailedUnknownError ErrorCode = "TaskFailedUnknownError"
BadTaskSpecification ErrorCode = "BadTaskSpecification"
TaskFailedWithError ErrorCode = "TaskFailedWithError"
DownstreamSystemError ErrorCode = "DownstreamSystemError"
TaskFailedUnknownError ErrorCode = "TaskFailedUnknownError"
BadTaskSpecification ErrorCode = "BadTaskSpecification"
TaskEventRecordingFailed ErrorCode = "TaskEventRecordingFailed"
MetadataAccessFailed ErrorCode = "MetadataAccessFailed"
MetadataTooLarge ErrorCode = "MetadataTooLarge"
)

type TaskError struct {
Type ErrorType
ErrorCode string
Message string
Code string
Message string
}

func (e *TaskError) Error() string {
return fmt.Sprintf("Task Failure [%v]. ErrorCode [%v] Message [%v]", e.Type.String(), e.ErrorCode, e.Message)
return fmt.Sprintf("task failed, %v: %v", e.Code, e.Message)
}

type TaskErrorWithCause struct {
Expand All @@ -46,55 +33,41 @@ type TaskErrorWithCause struct {
}

func (e *TaskErrorWithCause) Error() string {
return fmt.Sprintf("Task Failure [%v]. ErrorCode [%v] Message [%v]. Cause [%v]", e.Type.String(), e.ErrorCode, e.Message, e.cause)
return fmt.Sprintf("%v, caused by: %v", e.TaskError.Error(), errors.Cause(e))
}

func (e *TaskErrorWithCause) Cause() error {
return e.cause
}

func errorf(eType ErrorType, errorCode string, msgFmt string, args ...interface{}) *TaskError {
func Errorf(errorCode ErrorCode, msgFmt string, args ...interface{}) *TaskError {
return &TaskError{
Type: eType,
ErrorCode: errorCode,
Message: fmt.Sprintf(msgFmt, args...),
Code: errorCode,
Message: fmt.Sprintf(msgFmt, args...),
}
}

func RetryableErrorf(errorCode string, msgFmt string, args ...interface{}) error {
return errorf(RetryableError, errorCode, msgFmt, args...)
}

func NonRetryableErrorf(errorCode string, msgFmt string, args ...interface{}) error {
return errorf(NonRetryableError, errorCode, msgFmt, args...)
}

func WrapRetryableErrorf(causer error, errorCode string, msgFmt string, args ...interface{}) error {
func Wrapf(errorCode ErrorCode, err error, msgFmt string, args ...interface{}) *TaskErrorWithCause {
return &TaskErrorWithCause{
TaskError: errorf(RetryableError, errorCode, msgFmt, args...),
cause: causer,
TaskError: Errorf(errorCode, msgFmt, args...),
cause: err,
}
}

func WrapNonRetryableErrorf(causer error, errorCode string, msgFmt string, args ...interface{}) error {
return &TaskErrorWithCause{
TaskError: errorf(NonRetryableError, errorCode, msgFmt, args...),
cause: causer,
func GetErrorCode(err error) (code ErrorCode, isTaskError bool) {
isTaskError = false
e, ok := err.(*TaskError)
if ok {
code = e.Code
isTaskError = true
return
}
}

func IsRetryableError(err error) bool {
if actualErr, ok := err.(*TaskError); !ok {
if wrapperErr, ok := err.(*TaskErrorWithCause); ok {
return wrapperErr.Type == RetryableError
}
} else {
return actualErr.Type == RetryableError
e2, ok := err.(*TaskError)
if ok {
code = e2.Code
isTaskError = true
return
}

return false
}

func IsNonRetryableError(err error) bool {
return !IsRetryableError(err)
return
}
100 changes: 100 additions & 0 deletions go/tasks/v1/events/event_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package events

import (
"time"

"github.com/golang/protobuf/ptypes/struct"
"github.com/lyft/flyteplugins/go/tasks/v1/errors"
"github.com/lyft/flyteplugins/go/tasks/v1/types"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
)

// Additional info that should be sent to the front end. The Information is sent to the front-end if it meets certain
// criterion, for example currently, it is sent only if an event was not already sent for
type TaskEventInfo struct {
// log information for the task execution
Logs []*core.TaskLog
// Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current
// time at the time of publishing the event.
OccurredAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
}

// Convert all TaskStatus to an ExecutionPhase that is common to Flyte and Admin understands
// NOTE: if we add a TaskStatus entry, we should add it here too
func convertTaskPhaseToExecutionStatus(status types.TaskPhase) core.TaskExecutionPhase {
switch status {
case types.TaskPhaseRunning:
return core.TaskExecutionPhase_TASK_PHASE_RUNNING
case types.TaskPhaseSucceeded:
return core.TaskExecutionPhase_TASK_PHASE_SUCCEEDED
case types.TaskPhaseRetryableFailure, types.TaskPhasePermanentFailure:
return core.TaskExecutionPhase_TASK_PHASE_FAILED
case types.TaskPhaseQueued:
return core.TaskExecutionPhase_TASK_PHASE_QUEUED
default:
return core.TaskExecutionPhase_TASK_PHASE_UNDEFINED
}
}

func CreateEvent(taskCtx types.TaskContext, taskStatus types.TaskStatus, info *TaskEventInfo) *event.TaskExecutionEvent {

newTaskExecutionPhase := convertTaskPhaseToExecutionStatus(taskStatus.Phase)
taskExecutionID := taskCtx.GetTaskExecutionID().GetID()

occurredAt := ptypes.TimestampNow()
logs := make([]*core.TaskLog, 0)
var customInfo *structpb.Struct

if info != nil {
customInfo = info.CustomInfo
if info.OccurredAt != nil {
t, err := ptypes.TimestampProto(*info.OccurredAt)
if err != nil {
occurredAt = t
}
}
for _, l := range info.Logs {
logs = append(logs, l)
}
}

taskEvent := &event.TaskExecutionEvent{
TaskId: taskExecutionID.TaskId,
ParentNodeExecutionId: taskExecutionID.NodeExecutionId,
Phase: newTaskExecutionPhase,
RetryAttempt: taskCtx.GetTaskExecutionID().GetID().RetryAttempt,
InputUri: taskCtx.GetInputsFile().String(),
OccurredAt: occurredAt,
Logs: logs,
CustomInfo: customInfo,
}

if newTaskExecutionPhase == core.TaskExecutionPhase_TASK_PHASE_FAILED {
errorCode := "UnknownTaskError"
message := "unknown reason"
if taskStatus.Err != nil {
ec, ok := errors.GetErrorCode(taskStatus.Err)
if ok {
errorCode = ec
}
message = taskStatus.Err.Error()
}
taskEvent.OutputResult = &event.TaskExecutionEvent_Error{
Error: &core.ExecutionError{
Code: errorCode,
Message: message,
ErrorUri: taskCtx.GetErrorFile().String(),
},
}
} else if newTaskExecutionPhase == core.TaskExecutionPhase_TASK_PHASE_SUCCEEDED {
taskEvent.OutputResult = &event.TaskExecutionEvent_OutputUri{
OutputUri: taskCtx.GetOutputsFile().String(),
}
}
return taskEvent
}
Loading

0 comments on commit 01a95bf

Please sign in to comment.