Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge f450131 into 206013a
Browse files Browse the repository at this point in the history
  • Loading branch information
hamersaw authored Jul 6, 2023
2 parents 206013a + f450131 commit 8e18bfe
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ type ExecutableDynamicNodeStatus interface {
GetDynamicNodePhase() DynamicNodePhase
GetDynamicNodeReason() string
GetExecutionError() *core.ExecutionError
GetIsFailurePermanent() bool
}

type MutableDynamicNodeStatus interface {
Expand All @@ -235,6 +236,7 @@ type MutableDynamicNodeStatus interface {
SetDynamicNodePhase(phase DynamicNodePhase)
SetDynamicNodeReason(reason string)
SetExecutionError(executionError *core.ExecutionError)
SetIsFailurePermanent(isFailurePermanent bool)
}

// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableDynamicNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ const (

type DynamicNodeStatus struct {
MutableStruct
Phase DynamicNodePhase `json:"phase,omitempty"`
Reason string `json:"reason,omitempty"`
Error *ExecutionError `json:"error,omitempty"`
Phase DynamicNodePhase `json:"phase,omitempty"`
Reason string `json:"reason,omitempty"`
Error *ExecutionError `json:"error,omitempty"`
IsFailurePermanent bool `json:"permFailure,omitempty"`
}

func (in *DynamicNodeStatus) GetDynamicNodePhase() DynamicNodePhase {
Expand All @@ -116,6 +117,10 @@ func (in *DynamicNodeStatus) GetExecutionError() *core.ExecutionError {
return in.Error.ExecutionError
}

func (in *DynamicNodeStatus) GetIsFailurePermanent() bool {
return in.IsFailurePermanent
}

func (in *DynamicNodeStatus) SetDynamicNodeReason(reason string) {
if in.Reason != reason {
in.SetDirty()
Expand All @@ -138,6 +143,13 @@ func (in *DynamicNodeStatus) SetExecutionError(err *core.ExecutionError) {
}
}

func (in *DynamicNodeStatus) SetIsFailurePermanent(isFailurePermanent bool) {
if in.IsFailurePermanent != isFailurePermanent {
in.SetDirty()
in.IsFailurePermanent = isFailurePermanent
}
}

func (in *DynamicNodeStatus) Equals(o *DynamicNodeStatus) bool {
if in == nil && o == nil {
return true
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,12 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,

// As we do not support Failure Node, we can just return failure in this case
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoDynamicRunning(nil)),
handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "Dynamic workflow failed", Error: state.Err},
nil
handler.DynamicNodeState{
Phase: v1alpha1.DynamicNodePhaseFailing,
Reason: "Dynamic workflow failed",
Error: state.Err,
IsFailurePermanent: state.HasFailed(),
}, nil
}

if state.IsComplete() {
Expand Down
13 changes: 10 additions & 3 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,18 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod
return trns, err
}

// TODO: Use Execution Error for ds.Error type to propagate the recoverable flag and determine if the error is retryable.
// if DynamicNodeStatus is noted with permanent failures we report a non-recoverable failure
phaseInfoFailureFunc := handler.PhaseInfoRetryableFailure
phaseInfoFailureFuncErr := handler.PhaseInfoRetryableFailureErr
if ds.IsFailurePermanent {
phaseInfoFailureFunc = handler.PhaseInfoFailure
phaseInfoFailureFuncErr = handler.PhaseInfoFailureErr
}

if ds.Error != nil {
trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailureErr(ds.Error, nil))
trns = handler.DoTransition(handler.TransitionTypeEphemeral, phaseInfoFailureFuncErr(ds.Error, nil))
} else {
trns = handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(core.ExecutionError_UNKNOWN, "DynamicNodeFailing", ds.Reason, nil))
trns = handler.DoTransition(handler.TransitionTypeEphemeral, phaseInfoFailureFunc(core.ExecutionError_UNKNOWN, "DynamicNodeFailing", ds.Reason, nil))
}
case v1alpha1.DynamicNodePhaseParentFinalizing:
if err := d.finalizeParentNode(ctx, nCtx); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytestdlib/storage"
)

// This is the legacy state structure that gets translated to node status
Expand All @@ -31,9 +30,10 @@ type BranchNodeState struct {
type DynamicNodePhase uint8

type DynamicNodeState struct {
Phase v1alpha1.DynamicNodePhase
Reason string
Error *core.ExecutionError
Phase v1alpha1.DynamicNodePhase
Reason string
Error *core.ExecutionError
IsFailurePermanent bool
}

type WorkflowNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (n nodeStateManager) GetDynamicNodeState() handler.DynamicNodeState {
ds.Phase = dn.GetDynamicNodePhase()
ds.Reason = dn.GetDynamicNodeReason()
ds.Error = dn.GetExecutionError()
ds.IsFailurePermanent = dn.GetIsFailurePermanent()
}

return ds
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t.SetDynamicNodePhase(n.d.Phase)
t.SetDynamicNodeReason(n.d.Reason)
t.SetExecutionError(n.d.Error)
t.SetIsFailurePermanent(n.d.IsFailurePermanent)
}

// Update branch node status
Expand Down

0 comments on commit 8e18bfe

Please sign in to comment.