Skip to content

Commit

Permalink
Sub error classification (flyteorg#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Apr 29, 2020
1 parent 74afea0 commit aa8c9b1
Show file tree
Hide file tree
Showing 21 changed files with 654 additions and 549 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/hack/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
package tools

// Uncomment this to make code-generator work
// import _ "k8s.io/code-generator"
//import _ "k8s.io/code-generator"
1 change: 1 addition & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
)

// Wrapper around core.Execution error. Execution Error has a protobuf enum and hence needs to be wrapped by custom marshaller
type ExecutionError struct {
*core.ExecutionError
}
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type MutableBranchNodeStatus interface {
type ExecutableDynamicNodeStatus interface {
GetDynamicNodePhase() DynamicNodePhase
GetDynamicNodeReason() string
GetExecutionError() *core.ExecutionError
}

type MutableDynamicNodeStatus interface {
Expand All @@ -183,6 +184,7 @@ type MutableDynamicNodeStatus interface {

SetDynamicNodePhase(phase DynamicNodePhase)
SetDynamicNodeReason(reason string)
SetExecutionError(executionError *core.ExecutionError)
}

// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus.
Expand Down
14 changes: 13 additions & 1 deletion flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ const (
type DynamicNodeStatus struct {
MutableStruct
Phase DynamicNodePhase `json:"phase"`
Reason string `json:"reason"`
Reason string `json:"reason,omitempty"`
Error *ExecutionError `json:"error,omitempty"`
}

func (in *DynamicNodeStatus) GetDynamicNodePhase() DynamicNodePhase {
Expand All @@ -105,6 +106,13 @@ func (in *DynamicNodeStatus) GetDynamicNodeReason() string {
return in.Reason
}

func (in *DynamicNodeStatus) GetExecutionError() *core.ExecutionError {
if in.Error == nil {
return nil
}
return in.Error.ExecutionError
}

func (in *DynamicNodeStatus) SetDynamicNodeReason(reason string) {
if in.Reason != reason {
in.SetDirty()
Expand All @@ -119,6 +127,10 @@ func (in *DynamicNodeStatus) SetDynamicNodePhase(phase DynamicNodePhase) {
}
}

func (in *DynamicNodeStatus) SetExecutionError(err *core.ExecutionError) {
in.Error = &ExecutionError{ExecutionError: err}
}

func (in *DynamicNodeStatus) Equals(o *DynamicNodeStatus) bool {
if in == nil && o == nil {
return true
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions flytepropeller/pkg/controller/nodes/branch/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"reflect"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/pkg/errors"
"github.com/lyft/flytestdlib/errors"
)

type comparator func(lValue *core.Primitive, rValue *core.Primitive) bool
Expand Down Expand Up @@ -75,7 +75,7 @@ func Evaluate(lValue *core.Primitive, rValue *core.Primitive, op core.Comparison
lValueType := reflect.TypeOf(lValue.Value)
rValueType := reflect.TypeOf(rValue.Value)
if lValueType != rValueType {
return false, errors.Errorf("Comparison between different primitives types. lVal[%v]:rVal[%v]", lValueType, rValueType)
return false, errors.Errorf(ErrorCodeMalformedBranch, "Comparison between different primitives types. lVal[%v]:rVal[%v]", lValueType, rValueType)
}
comps, ok := perTypeComparators[lValueType.String()]
if !ok {
Expand All @@ -88,52 +88,52 @@ func Evaluate(lValue *core.Primitive, rValue *core.Primitive, op core.Comparison
switch op {
case core.ComparisonExpression_GT:
if isBoolean {
return false, errors.Errorf("[GT] not defined for boolean operands.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "[GT] not defined for boolean operands.")
}
return comps.gt(lValue, rValue), nil
case core.ComparisonExpression_GTE:
if isBoolean {
return false, errors.Errorf("[GTE] not defined for boolean operands.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "[GTE] not defined for boolean operands.")
}
return comps.eq(lValue, rValue) || comps.gt(lValue, rValue), nil
case core.ComparisonExpression_LT:
if isBoolean {
return false, errors.Errorf("[LT] not defined for boolean operands.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "[LT] not defined for boolean operands.")
}
return !(comps.gt(lValue, rValue) || comps.eq(lValue, rValue)), nil
case core.ComparisonExpression_LTE:
if isBoolean {
return false, errors.Errorf("[LTE] not defined for boolean operands.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "[LTE] not defined for boolean operands.")
}
return !comps.gt(lValue, rValue), nil
case core.ComparisonExpression_EQ:
return comps.eq(lValue, rValue), nil
case core.ComparisonExpression_NEQ:
return !comps.eq(lValue, rValue), nil
}
return false, errors.Errorf("Unsupported operator type in Propeller. System error.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "Unsupported operator type in Propeller. System error.")
}

func Evaluate1(lValue *core.Primitive, rValue *core.Literal, op core.ComparisonExpression_Operator) (bool, error) {
if rValue.GetScalar() == nil || rValue.GetScalar().GetPrimitive() == nil {
return false, errors.Errorf("Only primitives can be compared. RHS Variable is non primitive.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "Only primitives can be compared. RHS Variable is non primitive.")
}
return Evaluate(lValue, rValue.GetScalar().GetPrimitive(), op)
}

func Evaluate2(lValue *core.Literal, rValue *core.Primitive, op core.ComparisonExpression_Operator) (bool, error) {
if lValue.GetScalar() == nil || lValue.GetScalar().GetPrimitive() == nil {
return false, errors.Errorf("Only primitives can be compared. LHS Variable is non primitive.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "Only primitives can be compared. LHS Variable is non primitive.")
}
return Evaluate(lValue.GetScalar().GetPrimitive(), rValue, op)
}

func EvaluateLiterals(lValue *core.Literal, rValue *core.Literal, op core.ComparisonExpression_Operator) (bool, error) {
if lValue.GetScalar() == nil || lValue.GetScalar().GetPrimitive() == nil {
return false, errors.Errorf("Only primitives can be compared. LHS Variable is non primitive.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "Only primitives can be compared. LHS Variable is non primitive.")
}
if rValue.GetScalar() == nil || rValue.GetScalar().GetPrimitive() == nil {
return false, errors.Errorf("Only primitives can be compared. RHS Variable is non primitive")
return false, errors.Errorf(ErrorCodeMalformedBranch, "Only primitives can be compared. RHS Variable is non primitive")
}
return Evaluate(lValue.GetScalar().GetPrimitive(), rValue.GetScalar().GetPrimitive(), op)
}
24 changes: 13 additions & 11 deletions flytepropeller/pkg/controller/nodes/branch/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"context"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"

regErrors "github.com/pkg/errors"
)

const ErrorCodeUserProvidedError = "UserProvidedError"
const ErrorCodeMalformedBranch = "MalformedBranchUserError"
const ErrorCodeCompilerError = "CompilerError"

func EvaluateComparison(expr *core.ComparisonExpression, nodeInputs *core.LiteralMap) (bool, error) {
var lValue *core.Literal
var rValue *core.Literal
Expand All @@ -22,23 +24,23 @@ func EvaluateComparison(expr *core.ComparisonExpression, nodeInputs *core.Litera

if expr.GetLeftValue().GetPrimitive() == nil {
if nodeInputs == nil {
return false, regErrors.Errorf("Failed to find Value for Variable [%v]", expr.GetLeftValue().GetVar())
return false, errors.Errorf(ErrorCodeMalformedBranch, "Failed to find Value for Variable [%v]", expr.GetLeftValue().GetVar())
}
lValue = nodeInputs.Literals[expr.GetLeftValue().GetVar()]
if lValue == nil {
return false, regErrors.Errorf("Failed to find Value for Variable [%v]", expr.GetLeftValue().GetVar())
return false, errors.Errorf(ErrorCodeMalformedBranch, "Failed to find Value for Variable [%v]", expr.GetLeftValue().GetVar())
}
} else {
lPrim = expr.GetLeftValue().GetPrimitive()
}

if expr.GetRightValue().GetPrimitive() == nil {
if nodeInputs == nil {
return false, regErrors.Errorf("Failed to find Value for Variable [%v]", expr.GetLeftValue().GetVar())
return false, errors.Errorf(ErrorCodeMalformedBranch, "Failed to find Value for Variable [%v]", expr.GetLeftValue().GetVar())
}
rValue = nodeInputs.Literals[expr.GetRightValue().GetVar()]
if rValue == nil {
return false, regErrors.Errorf("Failed to find Value for Variable [%v]", expr.GetRightValue().GetVar())
return false, errors.Errorf(ErrorCodeMalformedBranch, "Failed to find Value for Variable [%v]", expr.GetRightValue().GetVar())
}
} else {
rPrim = expr.GetRightValue().GetPrimitive()
Expand All @@ -61,7 +63,7 @@ func EvaluateBooleanExpression(expr *core.BooleanExpression, nodeInputs *core.Li
return EvaluateComparison(expr.GetComparison(), nodeInputs)
}
if expr.GetConjunction() == nil {
return false, regErrors.Errorf("No Comparison or Conjunction found in Branch node expression.")
return false, errors.Errorf(ErrorCodeMalformedBranch, "No Comparison or Conjunction found in Branch node expression.")
}
lvalue, err := EvaluateBooleanExpression(expr.GetConjunction().GetLeftExpression(), nodeInputs)
if err != nil {
Expand Down Expand Up @@ -122,7 +124,7 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1.
skippedNodeID := *nodeIDPtr
n, ok := nl.GetNode(skippedNodeID)
if !ok {
return nil, errors.Errorf(errors.DownstreamNodeNotFoundError, nodeID, "Downstream node [%v] not found", skippedNodeID)
return nil, errors.Errorf(ErrorCodeCompilerError, "Downstream node [%v] not found", skippedNodeID)
}
nStatus := nl.GetNodeExecutionStatus(ctx, n.GetID())
logger.Infof(ctx, "Branch Setting Node[%v] status to Skipped!", skippedNodeID)
Expand All @@ -131,9 +133,9 @@ func DecideBranch(ctx context.Context, nl executors.NodeLookup, nodeID v1alpha1.

if selectedNodeID == nil {
if node.GetElseFail() != nil {
return nil, errors.Errorf(errors.UserProvidedError, nodeID, node.GetElseFail().Message)
return nil, errors.Errorf(ErrorCodeUserProvidedError, node.GetElseFail().Message)
}
return nil, errors.Errorf(errors.NoBranchTakenError, nodeID, "No branch satisfied")
return nil, errors.Errorf(ErrorCodeMalformedBranch, "No branch satisfied")
}
logger.Infof(ctx, "Branch Node[%v] selected!", *selectedNodeID)
return selectedNodeID, nil
Expand Down
18 changes: 13 additions & 5 deletions flytepropeller/pkg/controller/nodes/branch/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package branch

import (
"context"
"fmt"
"testing"

"github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytepropeller/pkg/utils"
)

Expand Down Expand Up @@ -384,7 +385,10 @@ func TestDecideBranch(t *testing.T) {
b, err := DecideBranch(ctx, w, "n1", branchNode, inputs)
assert.Error(t, err)
assert.Nil(t, b)
assert.Equal(t, errors.NoBranchTakenError, err.(*errors.NodeError).ErrCode)
e, ok := errors.GetErrorCode(err)
assert.True(t, ok)
assert.NotNil(t, e)
assert.Equal(t, ErrorCodeMalformedBranch, e)
})

t.Run("WithThenNode", func(t *testing.T) {
Expand Down Expand Up @@ -620,7 +624,9 @@ func TestDecideBranch(t *testing.T) {
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
assert.Error(t, err)
assert.Nil(t, b)
assert.Equal(t, errors.DownstreamNodeNotFoundError, err.(*errors.NodeError).ErrCode)
ec, ok := errors.GetErrorCode(err)
assert.True(t, ok)
assert.Equal(t, ErrorCodeCompilerError, ec)
})

t.Run("ElseFailCase", func(t *testing.T) {
Expand Down Expand Up @@ -677,8 +683,10 @@ func TestDecideBranch(t *testing.T) {
b, err := DecideBranch(ctx, w, "n", branchNode, inputs)
assert.Error(t, err)
assert.Nil(t, b)
assert.Equal(t, errors.UserProvidedError, err.(*errors.NodeError).ErrCode)
assert.Equal(t, userError, err.(*errors.NodeError).Message)
ec, ok := errors.GetErrorCode(err)
assert.True(t, ok)
assert.Equal(t, ErrorCodeUserProvidedError, ec)
assert.Equal(t, fmt.Sprintf("[UserProvidedError] %s", userError), err.Error())
assert.Equal(t, v1alpha1.NodePhaseSkipped, w.Status.NodeStatus[n1].GetPhase())
assert.Equal(t, v1alpha1.NodePhaseSkipped, w.Status.NodeStatus[n2].GetPhase())
})
Expand Down
14 changes: 11 additions & 3 deletions flytepropeller/pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
stdErrors "github.com/lyft/flytestdlib/errors"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"

Expand Down Expand Up @@ -41,7 +42,12 @@ func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha
}
finalNodeID, err := DecideBranch(ctx, nl, nCtx.NodeID(), branchNode, nodeInputs)
if err != nil {
// TODO @kumare differentiate branch error user vs system. We should define errors for branch only
ec, ok := stdErrors.GetErrorCode(err)
if ok {
if ec == ErrorCodeMalformedBranch || ec == ErrorCodeUserProvidedError {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_USER, ec, err.Error(), nil)), nil
}
}
errMsg := fmt.Sprintf("Branch evaluation failed. Error [%s]", err)
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.IllegalStateError, errMsg, nil)), nil
}
Expand Down Expand Up @@ -153,7 +159,8 @@ func (b *branchHandler) Abort(ctx context.Context, nCtx handler.NodeExecutionCon
if branch.GetElseFail() != nil {
errMsg = branch.GetElseFail().Message
}
return errors.Errorf(errors.UserProvidedError, nCtx.NodeID(), errMsg)
logger.Errorf(ctx, errMsg)
return nil
}

finalNodeID := branchNodeState.FinalizedNodeID
Expand Down Expand Up @@ -188,7 +195,8 @@ func (b *branchHandler) Finalize(ctx context.Context, nCtx handler.NodeExecution
if branch.GetElseFail() != nil {
errMsg = branch.GetElseFail().Message
}
return errors.Errorf(errors.UserProvidedError, nCtx.NodeID(), errMsg)
logger.Errorf(ctx, "failed to evaluate branch - user error: %s", errMsg)
return nil
}

finalNodeID := branchNodeState.FinalizedNodeID
Expand Down
4 changes: 1 addition & 3 deletions flytepropeller/pkg/controller/nodes/branch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
mocks2 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
"github.com/lyft/flytepropeller/pkg/controller/executors"
execMocks "github.com/lyft/flytepropeller/pkg/controller/executors/mocks"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler/mocks"
)
Expand Down Expand Up @@ -257,8 +256,7 @@ func TestBranchHandler_AbortNode(t *testing.T) {
nCtx, _ := createNodeContext(v1alpha1.BranchNodeError, nil, n, nil, nil)
branch := New(mockNodeExecutor, promutils.NewTestScope())
err := branch.Abort(ctx, nCtx, "")
assert.Error(t, err)
assert.True(t, errors.Matches(err, errors.UserProvidedError))
assert.NoError(t, err)
})

t.Run("BranchNodeSuccess", func(t *testing.T) {
Expand Down
Loading

0 comments on commit aa8c9b1

Please sign in to comment.