From 8b8e5a8a3db1bdc7a2ad0e66ba78015e6120e97b Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 8 Sep 2022 08:44:19 -0500 Subject: [PATCH] Abort subworkflow on subnode failure (#468) * using 'failing' state to handle subworkflow aborts Signed-off-by: Daniel Rammer * propogating node failure is subworkflow to subworkflow failure message in ui Signed-off-by: Daniel Rammer * working with other failure scenarios Signed-off-by: Daniel Rammer * fixed lint issue Signed-off-by: Daniel Rammer * updated error message to match Signed-off-by: Daniel Rammer Signed-off-by: Daniel Rammer --- pkg/controller/nodes/node_state_manager.go | 1 + .../nodes/subworkflow/subworkflow.go | 23 ++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/controller/nodes/node_state_manager.go b/pkg/controller/nodes/node_state_manager.go index 871fecd36..bf4d270a2 100644 --- a/pkg/controller/nodes/node_state_manager.go +++ b/pkg/controller/nodes/node_state_manager.go @@ -79,6 +79,7 @@ func (n nodeStateManager) GetWorkflowNodeState() handler.WorkflowNodeState { ws := handler.WorkflowNodeState{} if wn != nil { ws.Phase = wn.GetWorkflowNodePhase() + ws.Error = wn.GetExecutionError() } return ws } diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index 3e688e60d..24d74473f 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -83,11 +83,12 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler } err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) - if subworkflow.GetOnFailureNode() != nil { - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(state.Err, nil)), err + if err != nil { + logger.Warnf(ctx, "failed to store failing subworkflow state with err: [%v]", err) + return handler.UnknownTransition, err } - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(state.Err, nil)), err + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } if state.IsComplete() { @@ -190,14 +191,24 @@ func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, err.Error(), nil)), nil } - status := nCtx.NodeStatus() - status.GetWorkflowNodeStatus() + if err := s.HandleAbort(ctx, nCtx, "subworkflow failed"); err != nil { + logger.Warnf(ctx, "failed to abort failing subworkflow with err: [%v]", err) + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err + } + if subWorkflow.GetOnFailureNode() == nil { logger.Infof(ctx, "Subworkflow has no failure nodes, failing immediately.") + state := nCtx.NodeStateReader().GetWorkflowNodeState() + if state.Error != nil { + return handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoFailureErr(state.Error, nil)), nil + } + return handler.DoTransition(handler.TransitionTypeEphemeral, - handler.PhaseInfoFailureErr(nCtx.NodeStateReader().GetWorkflowNodeState().Error, nil)), err + handler.PhaseInfoFailure(core.ExecutionError_UNKNOWN, "SubworkflowNodeFailing", "", nil)), nil } + status := nCtx.NodeStatus() nodeLookup := executors.NewNodeLookup(subWorkflow, status) return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup) }