diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index c7ac0c2e4f27..a09eea8230b3 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -2381,9 +2381,25 @@ func (n NodeStatus) IsDaemoned() bool { return true } +// IsPartOfExitHandler returns whether node is part of exit handler. +func (n *NodeStatus) IsPartOfExitHandler(nodes Nodes) bool { + currentNode := n + for !currentNode.IsExitNode() { + if currentNode.BoundaryID == "" { + return false + } + boundaryNode, err := nodes.Get(currentNode.BoundaryID) + if err != nil { + log.Panicf("was unable to obtain node for %s", currentNode.BoundaryID) + } + currentNode = boundaryNode + } + return true +} + // IsExitNode returns whether or not node run as exit handler. -func (ws NodeStatus) IsExitNode() bool { - return strings.HasSuffix(ws.DisplayName, ".onExit") +func (n NodeStatus) IsExitNode() bool { + return strings.HasSuffix(n.DisplayName, ".onExit") } func (n NodeStatus) Succeeded() bool { @@ -2452,9 +2468,9 @@ func (n *NodeStatus) IsActiveSuspendNode() bool { return n.Type == NodeTypeSuspend && n.Phase == NodeRunning } -// IsActivePluginNode returns whether this node is an active plugin node -func (n *NodeStatus) IsActivePluginNode() bool { - return n.Type == NodeTypePlugin && (n.Phase == NodeRunning || n.Phase == NodePending) +// IsTaskSetNode returns whether this node uses the taskset +func (n *NodeStatus) IsTaskSetNode() bool { + return n.Type == NodeTypeHTTP || n.Type == NodeTypePlugin } func (n NodeStatus) GetDuration() time.Duration { diff --git a/test/e2e/hooks_test.go b/test/e2e/hooks_test.go index 28e7e8cf7415..8f36790becc0 100644 --- a/test/e2e/hooks_test.go +++ b/test/e2e/hooks_test.go @@ -13,6 +13,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/test/e2e/fixtures" + "github.com/argoproj/argo-workflows/v3/workflow/common" ) type HooksSuite struct { @@ -776,6 +777,101 @@ spec: }) } +func (s *HooksSuite) TestExitHandlerWithWorkflowLevelDeadline() { + var onExitNodeName string + (s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: exit-handler-with-workflow-level-deadline +spec: + entrypoint: main + activeDeadlineSeconds: 1 + hooks: + exit: + template: exit-handler + templates: + - name: main + steps: + - - name: sleep + template: sleep + - name: exit-handler + steps: + - - name: sleep + template: sleep + - name: sleep + container: + image: argoproj/argosay:v2 + args: ["sleep", "5"] +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeCompleted). + WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) { + onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name) + onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName) + return onExitNode.Completed(), "exit handler completed" + })). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return status.DisplayName == onExitNodeName + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, true, status.NodeFlag.Hooked) + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + })) +} + +func (s *HooksSuite) TestHttpExitHandlerWithWorkflowLevelDeadline() { + var onExitNodeName string + (s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: http-exit-handler-with-workflow-level-deadline +spec: + entrypoint: main + activeDeadlineSeconds: 1 + hooks: + exit: + template: exit-handler + templates: + - name: main + steps: + - - name: sleep + template: sleep + - name: sleep + container: + image: argoproj/argosay:v2 + args: ["sleep", "5"] + - name: exit-handler + steps: + - - name: http + template: http + - name: http + http: + url: http://dummy.restapiexample.com/api/v1/employees +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeCompleted). + WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) { + onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name) + onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName) + return onExitNode.Completed(), "exit handler completed" + })). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return status.DisplayName == onExitNodeName + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, true, status.NodeFlag.Hooked) + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + })) +} + func TestHooksSuite(t *testing.T) { suite.Run(t, new(HooksSuite)) } diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index a58ac0376776..b341fa343e2e 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -96,7 +96,7 @@ func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLoc // if node is a pod created from ContainerSet template // then need to fail child nodes so they will not hang in Pending after pod deletion for _, child := range children { - if !child.IsExitNode() && !child.Fulfilled() { + if !child.Fulfilled() { woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg) } } diff --git a/workflow/controller/exit_handler_test.go b/workflow/controller/exit_handler_test.go index 62dde6a74570..336a9a68fde3 100644 --- a/workflow/controller/exit_handler_test.go +++ b/workflow/controller/exit_handler_test.go @@ -828,8 +828,6 @@ status: type: Steps hello-world-647r7-1045616760: boundaryID: hello-world-647r7-206029318 - children: - - hello-world-647r7-370991976 displayName: '[0]' finishedAt: null id: hello-world-647r7-1045616760 diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 84279172351a..c0f6fa2fe328 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -306,7 +306,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { woc.workflowDeadline = woc.getWorkflowDeadline() err, podReconciliationCompleted := woc.podReconciliation(ctx) if err == nil { - woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() + // Execution control has been applied to the nodes with created pods after pod reconciliation. + // However, pending and suspended nodes do not have created pods, and taskset nodes use the agent pod. + // Apply execution control to these nodes now since pod reconciliation does not take effect on them. + woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() } if err != nil { @@ -424,7 +427,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { if err != nil { woc.markNodeError(node.Name, err) } - // Reconcile TaskSet and Agent for HTTP templates when is not shutdown + // Reconcile TaskSet and Agent for HTTP/Plugin templates when is not shutdown if !woc.execWf.Spec.Shutdown.Enabled() { woc.taskSetReconciliation(ctx) } @@ -1267,18 +1270,26 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool { (woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError()) } -func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() { - for _, node := range woc.wf.Status.Nodes { - // fail suspended nodes or plugin nodes when shuting down - if woc.GetShutdownStrategy().Enabled() && (node.IsActiveSuspendNode() || node.IsActivePluginNode()) { - message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) - woc.markNodePhase(node.Name, wfv1.NodeFailed, message) +// failNodesWithoutCreatedPodsAfterDeadlineOrShutdown mark the nodes without created pods failed when shutting down or exceeding deadline. +func (woc *wfOperationCtx) failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() { + nodes := woc.wf.Status.Nodes + for _, node := range nodes { + if node.Fulfilled() { continue } + // Only fail nodes that are not part of exit handler if we are "Stopping" or all pods if we are "Terminating" + if woc.GetShutdownStrategy().Enabled() && !woc.GetShutdownStrategy().ShouldExecute(node.IsPartOfExitHandler(nodes)) { + // fail suspended nodes or taskset nodes when shutting down + if node.IsActiveSuspendNode() || node.IsTaskSetNode() { + message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) + woc.markNodePhase(node.Name, wfv1.NodeFailed, message) + continue + } + } - // fail all pending and suspended nodes when exceeding deadline + // fail pending and suspended nodes that are not part of exit handler when exceeding deadline deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) - if deadlineExceeded && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) { + if deadlineExceeded && !node.IsPartOfExitHandler(nodes) && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) { message := "Step exceeded its deadline" woc.markNodePhase(node.Name, wfv1.NodeFailed, message) continue diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index b8b007be17fb..7126a1874655 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -10056,7 +10056,7 @@ func TestRetryLoopWithOutputParam(t *testing.T) { assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) } -var workflowShuttingDownWithNodesInPendingAfterReconsiliation = `apiVersion: argoproj.io/v1alpha1 +var workflowShuttingDownWithNodesInPendingAfterReconciliation = `apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: annotations: @@ -10153,12 +10153,12 @@ status: type: Container ` -func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) { +func TestFailNodesWithoutCreatedPodsAfterDeadlineOrShutdown(t *testing.T) { cancel, controller := newController() defer cancel() t.Run("Shutdown", func(t *testing.T) { - workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation) + workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation) woc := newWorkflowOperationCtx(workflow, controller) woc.execWf.Spec.Shutdown = "Terminate" @@ -10186,14 +10186,14 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step2NodeName].Phase) - woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() + woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase) }) t.Run("Deadline", func(t *testing.T) { - workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation) + workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation) woc := newWorkflowOperationCtx(workflow, controller) woc.execWf.Spec.Shutdown = "" @@ -10221,7 +10221,7 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodePending, woc.wf.Status.Nodes[step2NodeName].Phase) - woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() + woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase) diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go index 2c59cc00e695..de244c139f47 100644 --- a/workflow/controller/taskset.go +++ b/workflow/controller/taskset.go @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) { deletedNode := make(map[string]interface{}) for _, node := range woc.wf.Status.Nodes { - if taskSetNode(node) && node.Fulfilled() { + if node.IsTaskSetNode() && node.Fulfilled() { deletedNode[node.ID] = nil } } @@ -55,18 +55,16 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in func (woc *wfOperationCtx) markTaskSetNodesError(err error) { for _, node := range woc.wf.Status.Nodes { - if taskSetNode(node) && !node.Fulfilled() { + if node.IsTaskSetNode() && !node.Fulfilled() { woc.markNodeError(node.Name, err) } } } -func taskSetNode(n wfv1.NodeStatus) bool { - return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin -} - func (woc *wfOperationCtx) hasTaskSetNodes() bool { - return woc.wf.Status.Nodes.Any(taskSetNode) + return woc.wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool { + return node.IsTaskSetNode() + }) } func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) error { @@ -116,7 +114,7 @@ func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bo return false } // If this node is of type HTTP, it will need an HTTP reconciliation - if taskSetNode(*node) { + if node.IsTaskSetNode() { return true } for _, child := range node.Children { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 1b68c19f5966..52690df7ffed 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -565,7 +565,7 @@ func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists func (woc *wfOperationCtx) getDeadline(opts *createWorkflowPodOpts) *time.Time { deadline := time.Time{} - if woc.workflowDeadline != nil { + if woc.workflowDeadline != nil && !opts.onExitPod { deadline = *woc.workflowDeadline } if !opts.executionDeadline.IsZero() && (deadline.IsZero() || opts.executionDeadline.Before(deadline)) {