From c231df4ea6c4503c697f205d5bc52bb881bde410 Mon Sep 17 00:00:00 2001 From: jswxstw Date: Sun, 11 Aug 2024 10:23:49 +0800 Subject: [PATCH] fix: Only apply execution control to nodes that are not part of exit handler. (#13016) Signed-off-by: jswxstw Signed-off-by: oninowang --- pkg/apis/workflow/v1alpha1/workflow_types.go | 25 ++++- test/e2e/hooks_test.go | 96 ++++++++++++++++++++ workflow/controller/exec_control.go | 2 +- workflow/controller/exit_handler_test.go | 2 - workflow/controller/operator.go | 32 +++++-- workflow/controller/operator_test.go | 12 +-- workflow/controller/taskset.go | 14 ++- workflow/controller/workflowpod.go | 2 +- 8 files changed, 155 insertions(+), 30 deletions(-) diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index 1b2d6320eb3c..3fa1e829774f 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -2377,9 +2377,25 @@ func (n NodeStatus) IsDaemoned() bool { return true } +// IsPartOfExitHandler returns whether node is part of exit handler. +func (n *NodeStatus) IsPartOfExitHandler(nodes Nodes) bool { + currentNode := n + for !currentNode.IsExitNode() { + if currentNode.BoundaryID == "" { + return false + } + boundaryNode, err := nodes.Get(currentNode.BoundaryID) + if err != nil { + log.Panicf("was unable to obtain node for %s", currentNode.BoundaryID) + } + currentNode = boundaryNode + } + return true +} + // IsExitNode returns whether or not node run as exit handler. -func (ws NodeStatus) IsExitNode() bool { - return strings.HasSuffix(ws.DisplayName, ".onExit") +func (n NodeStatus) IsExitNode() bool { + return strings.HasSuffix(n.DisplayName, ".onExit") } func (n NodeStatus) Succeeded() bool { @@ -2448,6 +2464,11 @@ func (n *NodeStatus) IsActiveSuspendNode() bool { return n.Type == NodeTypeSuspend && n.Phase == NodeRunning } +// IsTaskSetNode returns whether this node uses the taskset +func (n *NodeStatus) IsTaskSetNode() bool { + return n.Type == NodeTypeHTTP || n.Type == NodeTypePlugin +} + func (n NodeStatus) GetDuration() time.Duration { if n.FinishedAt.IsZero() { return 0 diff --git a/test/e2e/hooks_test.go b/test/e2e/hooks_test.go index 5090fc980fd9..3325f7c47a7c 100644 --- a/test/e2e/hooks_test.go +++ b/test/e2e/hooks_test.go @@ -14,6 +14,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/test/e2e/fixtures" + "github.com/argoproj/argo-workflows/v3/workflow/common" ) type HooksSuite struct { @@ -774,6 +775,101 @@ spec: }) } +func (s *HooksSuite) TestExitHandlerWithWorkflowLevelDeadline() { + var onExitNodeName string + (s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: exit-handler-with-workflow-level-deadline +spec: + entrypoint: main + activeDeadlineSeconds: 1 + hooks: + exit: + template: exit-handler + templates: + - name: main + steps: + - - name: sleep + template: sleep + - name: exit-handler + steps: + - - name: sleep + template: sleep + - name: sleep + container: + image: argoproj/argosay:v2 + args: ["sleep", "5"] +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeCompleted). + WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) { + onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name) + onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName) + return onExitNode.Completed(), "exit handler completed" + })). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return status.DisplayName == onExitNodeName + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, true, status.NodeFlag.Hooked) + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + })) +} + +func (s *HooksSuite) TestHttpExitHandlerWithWorkflowLevelDeadline() { + var onExitNodeName string + (s.Given(). + Workflow(`apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: http-exit-handler-with-workflow-level-deadline +spec: + entrypoint: main + activeDeadlineSeconds: 1 + hooks: + exit: + template: exit-handler + templates: + - name: main + steps: + - - name: sleep + template: sleep + - name: sleep + container: + image: argoproj/argosay:v2 + args: ["sleep", "5"] + - name: exit-handler + steps: + - - name: http + template: http + - name: http + http: + url: http://dummy.restapiexample.com/api/v1/employees +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeCompleted). + WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) { + onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name) + onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName) + return onExitNode.Completed(), "exit handler completed" + })). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return status.DisplayName == onExitNodeName + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, true, status.NodeFlag.Hooked) + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + })) +} + func TestHooksSuite(t *testing.T) { suite.Run(t, new(HooksSuite)) } diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index a58ac0376776..b341fa343e2e 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -96,7 +96,7 @@ func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLoc // if node is a pod created from ContainerSet template // then need to fail child nodes so they will not hang in Pending after pod deletion for _, child := range children { - if !child.IsExitNode() && !child.Fulfilled() { + if !child.Fulfilled() { woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg) } } diff --git a/workflow/controller/exit_handler_test.go b/workflow/controller/exit_handler_test.go index daed6956a6b2..799793ed040f 100644 --- a/workflow/controller/exit_handler_test.go +++ b/workflow/controller/exit_handler_test.go @@ -828,8 +828,6 @@ status: type: Steps hello-world-647r7-1045616760: boundaryID: hello-world-647r7-206029318 - children: - - hello-world-647r7-370991976 displayName: '[0]' finishedAt: null id: hello-world-647r7-1045616760 diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index fb8a3cd42809..2fbe3084baa4 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -350,7 +350,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { woc.workflowDeadline = woc.getWorkflowDeadline() err, podReconciliationCompleted := woc.podReconciliation(ctx) if err == nil { - woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() + // Execution control has been applied to the nodes with created pods after pod reconciliation. + // However, pending and suspended nodes do not have created pods, and taskset nodes use the agent pod. + // Apply execution control to these nodes now since pod reconciliation does not take effect on them. + woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() } if err != nil { @@ -468,7 +471,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { if err != nil { woc.markNodeError(node.Name, err) } - // Reconcile TaskSet and Agent for HTTP templates + + // Reconcile TaskSet and Agent for HTTP/Plugin templates woc.taskSetReconciliation(ctx) // Check all hooks are completes @@ -1307,18 +1311,26 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool { (woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError()) } -func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() { - for _, node := range woc.wf.Status.Nodes { - // fail suspended nodes when shuting down - if woc.GetShutdownStrategy().Enabled() && node.IsActiveSuspendNode() { - message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) - woc.markNodePhase(node.Name, wfv1.NodeFailed, message) +// failNodesWithoutCreatedPodsAfterDeadlineOrShutdown mark the nodes without created pods failed when shutting down or exceeding deadline. +func (woc *wfOperationCtx) failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() { + nodes := woc.wf.Status.Nodes + for _, node := range nodes { + if node.Fulfilled() { continue } + // Only fail nodes that are not part of exit handler if we are "Stopping" or all pods if we are "Terminating" + if woc.GetShutdownStrategy().Enabled() && !woc.GetShutdownStrategy().ShouldExecute(node.IsPartOfExitHandler(nodes)) { + // fail suspended nodes or taskset nodes when shutting down + if node.IsActiveSuspendNode() || node.IsTaskSetNode() { + message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy()) + woc.markNodePhase(node.Name, wfv1.NodeFailed, message) + continue + } + } - // fail all pending and suspended nodes when exceeding deadline + // fail pending and suspended nodes that are not part of exit handler when exceeding deadline deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) - if deadlineExceeded && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) { + if deadlineExceeded && !node.IsPartOfExitHandler(nodes) && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) { message := "Step exceeded its deadline" woc.markNodePhase(node.Name, wfv1.NodeFailed, message) continue diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 960652b909f6..29d60c9e150f 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -9867,7 +9867,7 @@ func TestRetryLoopWithOutputParam(t *testing.T) { assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) } -var workflowShuttingDownWithNodesInPendingAfterReconsiliation = `apiVersion: argoproj.io/v1alpha1 +var workflowShuttingDownWithNodesInPendingAfterReconciliation = `apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: annotations: @@ -9964,12 +9964,12 @@ status: type: Container ` -func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) { +func TestFailNodesWithoutCreatedPodsAfterDeadlineOrShutdown(t *testing.T) { cancel, controller := newController() defer cancel() t.Run("Shutdown", func(t *testing.T) { - workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation) + workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation) woc := newWorkflowOperationCtx(workflow, controller) woc.execWf.Spec.Shutdown = "Terminate" @@ -9997,14 +9997,14 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step2NodeName].Phase) - woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() + woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase) }) t.Run("Deadline", func(t *testing.T) { - workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation) + workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation) woc := newWorkflowOperationCtx(workflow, controller) woc.execWf.Spec.Shutdown = "" @@ -10032,7 +10032,7 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) { assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodePending, woc.wf.Status.Nodes[step2NodeName].Phase) - woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown() + woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase) assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase) diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go index 2c59cc00e695..de244c139f47 100644 --- a/workflow/controller/taskset.go +++ b/workflow/controller/taskset.go @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) { deletedNode := make(map[string]interface{}) for _, node := range woc.wf.Status.Nodes { - if taskSetNode(node) && node.Fulfilled() { + if node.IsTaskSetNode() && node.Fulfilled() { deletedNode[node.ID] = nil } } @@ -55,18 +55,16 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in func (woc *wfOperationCtx) markTaskSetNodesError(err error) { for _, node := range woc.wf.Status.Nodes { - if taskSetNode(node) && !node.Fulfilled() { + if node.IsTaskSetNode() && !node.Fulfilled() { woc.markNodeError(node.Name, err) } } } -func taskSetNode(n wfv1.NodeStatus) bool { - return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin -} - func (woc *wfOperationCtx) hasTaskSetNodes() bool { - return woc.wf.Status.Nodes.Any(taskSetNode) + return woc.wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool { + return node.IsTaskSetNode() + }) } func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) error { @@ -116,7 +114,7 @@ func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bo return false } // If this node is of type HTTP, it will need an HTTP reconciliation - if taskSetNode(*node) { + if node.IsTaskSetNode() { return true } for _, child := range node.Children { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 1d467e594059..16e87e9a51d3 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -479,7 +479,7 @@ func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists func (woc *wfOperationCtx) getDeadline(opts *createWorkflowPodOpts) *time.Time { deadline := time.Time{} - if woc.workflowDeadline != nil { + if woc.workflowDeadline != nil && !opts.onExitPod { deadline = *woc.workflowDeadline } if !opts.executionDeadline.IsZero() && (deadline.IsZero() || opts.executionDeadline.Before(deadline)) {