diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index 871fecd36a..bf4d270a22 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -79,6 +79,7 @@ func (n nodeStateManager) GetWorkflowNodeState() handler.WorkflowNodeState { ws := handler.WorkflowNodeState{} if wn != nil { ws.Phase = wn.GetWorkflowNodePhase() + ws.Error = wn.GetExecutionError() } return ws } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 3e688e60dd..24d74473f0 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -83,11 +83,12 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler } err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) - if subworkflow.GetOnFailureNode() != nil { - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(state.Err, nil)), err + if err != nil { + logger.Warnf(ctx, "failed to store failing subworkflow state with err: [%v]", err) + return handler.UnknownTransition, err } - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(state.Err, nil)), err + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } if state.IsComplete() { @@ -190,14 +191,24 @@ func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.SubWorkflowExecutionFailed, err.Error(), nil)), nil } - status := nCtx.NodeStatus() - status.GetWorkflowNodeStatus() + if err := s.HandleAbort(ctx, nCtx, "subworkflow failed"); err != nil { + logger.Warnf(ctx, "failed to abort failing subworkflow with err: [%v]", err) + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err + } + if subWorkflow.GetOnFailureNode() == nil { logger.Infof(ctx, "Subworkflow has no failure nodes, failing immediately.") + state := nCtx.NodeStateReader().GetWorkflowNodeState() + if state.Error != nil { + return handler.DoTransition(handler.TransitionTypeEphemeral, + handler.PhaseInfoFailureErr(state.Error, nil)), nil + } + return handler.DoTransition(handler.TransitionTypeEphemeral, - handler.PhaseInfoFailureErr(nCtx.NodeStateReader().GetWorkflowNodeState().Error, nil)), err + handler.PhaseInfoFailure(core.ExecutionError_UNKNOWN, "SubworkflowNodeFailing", "", nil)), nil } + status := nCtx.NodeStatus() nodeLookup := executors.NewNodeLookup(subWorkflow, status) return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup) }