Skip to content

Commit

Permalink
Abort subworkflow on subnode failure (flyteorg#468)
Browse files Browse the repository at this point in the history
* using 'failing' state to handle subworkflow aborts

Signed-off-by: Daniel Rammer <[email protected]>

* propogating node failure is subworkflow to subworkflow failure message in ui

Signed-off-by: Daniel Rammer <[email protected]>

* working with other failure scenarios

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>

* updated error message to match

Signed-off-by: Daniel Rammer <[email protected]>

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Sep 8, 2022
1 parent 7727851 commit 98ee0f6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (n nodeStateManager) GetWorkflowNodeState() handler.WorkflowNodeState {
ws := handler.WorkflowNodeState{}
if wn != nil {
ws.Phase = wn.GetWorkflowNodePhase()
ws.Error = wn.GetExecutionError()
}
return ws
}
Expand Down
23 changes: 17 additions & 6 deletions flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler
}

err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState)
if subworkflow.GetOnFailureNode() != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(state.Err, nil)), err
if err != nil {
logger.Warnf(ctx, "failed to store failing subworkflow state with err: [%v]", err)
return handler.UnknownTransition, err
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(state.Err, nil)), err
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil
}

if state.IsComplete() {
Expand Down Expand Up @@ -190,14 +191,24 @@ func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, err.Error(), nil)), nil
}

status := nCtx.NodeStatus()
status.GetWorkflowNodeStatus()
if err := s.HandleAbort(ctx, nCtx, "subworkflow failed"); err != nil {
logger.Warnf(ctx, "failed to abort failing subworkflow with err: [%v]", err)
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
}

if subWorkflow.GetOnFailureNode() == nil {
logger.Infof(ctx, "Subworkflow has no failure nodes, failing immediately.")
state := nCtx.NodeStateReader().GetWorkflowNodeState()
if state.Error != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailureErr(state.Error, nil)), nil
}

return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailureErr(nCtx.NodeStateReader().GetWorkflowNodeState().Error, nil)), err
handler.PhaseInfoFailure(core.ExecutionError_UNKNOWN, "SubworkflowNodeFailing", "", nil)), nil
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
}
Expand Down

0 comments on commit 98ee0f6

Please sign in to comment.