Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Only apply execution control to nodes that are not part of exit handler. #13016

Merged
merged 3 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2381,9 +2381,25 @@ func (n NodeStatus) IsDaemoned() bool {
return true
}

// IsPartOfExitHandler returns whether node is part of exit handler.
func (n *NodeStatus) IsPartOfExitHandler(nodes Nodes) bool {
currentNode := n
for !currentNode.IsExitNode() {
if currentNode.BoundaryID == "" {
return false
}
boundaryNode, err := nodes.Get(currentNode.BoundaryID)
if err != nil {
log.Panicf("was unable to obtain node for %s", currentNode.BoundaryID)
}
currentNode = boundaryNode
}
return true
}

// IsExitNode returns whether or not node run as exit handler.
func (ws NodeStatus) IsExitNode() bool {
return strings.HasSuffix(ws.DisplayName, ".onExit")
func (n NodeStatus) IsExitNode() bool {
return strings.HasSuffix(n.DisplayName, ".onExit")
}

func (n NodeStatus) Succeeded() bool {
Expand Down Expand Up @@ -2452,9 +2468,9 @@ func (n *NodeStatus) IsActiveSuspendNode() bool {
return n.Type == NodeTypeSuspend && n.Phase == NodeRunning
}

// IsActivePluginNode returns whether this node is an active plugin node
func (n *NodeStatus) IsActivePluginNode() bool {
return n.Type == NodeTypePlugin && (n.Phase == NodeRunning || n.Phase == NodePending)
// IsTaskSetNode returns whether this node uses the taskset
func (n *NodeStatus) IsTaskSetNode() bool {
return n.Type == NodeTypeHTTP || n.Type == NodeTypePlugin
}

func (n NodeStatus) GetDuration() time.Duration {
Expand Down
96 changes: 96 additions & 0 deletions test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type HooksSuite struct {
Expand Down Expand Up @@ -776,6 +777,101 @@ spec:
})
}

func (s *HooksSuite) TestExitHandlerWithWorkflowLevelDeadline() {
var onExitNodeName string
(s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: exit-handler-with-workflow-level-deadline
spec:
entrypoint: main
activeDeadlineSeconds: 1
hooks:
exit:
template: exit-handler
templates:
- name: main
steps:
- - name: sleep
template: sleep
- name: exit-handler
steps:
- - name: sleep
template: sleep
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "5"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeCompleted).
WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) {
onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name)
onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName)
return onExitNode.Completed(), "exit handler completed"
})).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.DisplayName == onExitNodeName
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, true, status.NodeFlag.Hooked)
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}))
}

func (s *HooksSuite) TestHttpExitHandlerWithWorkflowLevelDeadline() {
var onExitNodeName string
(s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: http-exit-handler-with-workflow-level-deadline
spec:
entrypoint: main
activeDeadlineSeconds: 1
hooks:
exit:
template: exit-handler
templates:
- name: main
steps:
- - name: sleep
template: sleep
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "5"]
- name: exit-handler
steps:
- - name: http
template: http
- name: http
http:
url: http://dummy.restapiexample.com/api/v1/employees
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeCompleted).
WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) {
onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name)
onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName)
return onExitNode.Completed(), "exit handler completed"
})).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.DisplayName == onExitNodeName
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, true, status.NodeFlag.Hooked)
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}))
}

func TestHooksSuite(t *testing.T) {
suite.Run(t, new(HooksSuite))
}
2 changes: 1 addition & 1 deletion workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLoc
// if node is a pod created from ContainerSet template
// then need to fail child nodes so they will not hang in Pending after pod deletion
for _, child := range children {
if !child.IsExitNode() && !child.Fulfilled() {
if !child.Fulfilled() {
woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg)
}
}
Expand Down
2 changes: 0 additions & 2 deletions workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,8 +828,6 @@ status:
type: Steps
hello-world-647r7-1045616760:
boundaryID: hello-world-647r7-206029318
children:
- hello-world-647r7-370991976
displayName: '[0]'
finishedAt: null
id: hello-world-647r7-1045616760
Expand Down
31 changes: 21 additions & 10 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.workflowDeadline = woc.getWorkflowDeadline()
err, podReconciliationCompleted := woc.podReconciliation(ctx)
if err == nil {
woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
// Execution control has been applied to the nodes with created pods after pod reconciliation.
// However, pending and suspended nodes do not have created pods, and taskset nodes use the agent pod.
// Apply execution control to these nodes now since pod reconciliation does not take effect on them.
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()
}

if err != nil {
Expand Down Expand Up @@ -424,7 +427,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
if err != nil {
woc.markNodeError(node.Name, err)
}
// Reconcile TaskSet and Agent for HTTP templates when is not shutdown
// Reconcile TaskSet and Agent for HTTP/Plugin templates when is not shutdown
if !woc.execWf.Spec.Shutdown.Enabled() {
woc.taskSetReconciliation(ctx)
}
Expand Down Expand Up @@ -1267,18 +1270,26 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool {
(woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError())
}

func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
for _, node := range woc.wf.Status.Nodes {
// fail suspended nodes or plugin nodes when shuting down
if woc.GetShutdownStrategy().Enabled() && (node.IsActiveSuspendNode() || node.IsActivePluginNode()) {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
// failNodesWithoutCreatedPodsAfterDeadlineOrShutdown mark the nodes without created pods failed when shutting down or exceeding deadline.
func (woc *wfOperationCtx) failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() {
nodes := woc.wf.Status.Nodes
for _, node := range nodes {
if node.Fulfilled() {
continue
}
// Only fail nodes that are not part of exit handler if we are "Stopping" or all pods if we are "Terminating"
if woc.GetShutdownStrategy().Enabled() && !woc.GetShutdownStrategy().ShouldExecute(node.IsPartOfExitHandler(nodes)) {
// fail suspended nodes or taskset nodes when shutting down
if node.IsActiveSuspendNode() || node.IsTaskSetNode() {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
}
}

// fail all pending and suspended nodes when exceeding deadline
// fail pending and suspended nodes that are not part of exit handler when exceeding deadline
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if deadlineExceeded && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
if deadlineExceeded && !node.IsPartOfExitHandler(nodes) && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
message := "Step exceeded its deadline"
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10056,7 +10056,7 @@ func TestRetryLoopWithOutputParam(t *testing.T) {
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var workflowShuttingDownWithNodesInPendingAfterReconsiliation = `apiVersion: argoproj.io/v1alpha1
var workflowShuttingDownWithNodesInPendingAfterReconciliation = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
Expand Down Expand Up @@ -10153,12 +10153,12 @@ status:
type: Container
`

func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
func TestFailNodesWithoutCreatedPodsAfterDeadlineOrShutdown(t *testing.T) {
cancel, controller := newController()
defer cancel()

t.Run("Shutdown", func(t *testing.T) {
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation)
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation)
woc := newWorkflowOperationCtx(workflow, controller)

woc.execWf.Spec.Shutdown = "Terminate"
Expand Down Expand Up @@ -10186,14 +10186,14 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step2NodeName].Phase)

woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()

assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
})

t.Run("Deadline", func(t *testing.T) {
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation)
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation)
woc := newWorkflowOperationCtx(workflow, controller)

woc.execWf.Spec.Shutdown = ""
Expand Down Expand Up @@ -10221,7 +10221,7 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodePending, woc.wf.Status.Nodes[step2NodeName].Phase)

woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()

assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
Expand Down
14 changes: 6 additions & 8 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac
func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) {
deletedNode := make(map[string]interface{})
for _, node := range woc.wf.Status.Nodes {
if taskSetNode(node) && node.Fulfilled() {
if node.IsTaskSetNode() && node.Fulfilled() {
deletedNode[node.ID] = nil
}
}
Expand All @@ -55,18 +55,16 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in

func (woc *wfOperationCtx) markTaskSetNodesError(err error) {
for _, node := range woc.wf.Status.Nodes {
if taskSetNode(node) && !node.Fulfilled() {
if node.IsTaskSetNode() && !node.Fulfilled() {
woc.markNodeError(node.Name, err)
}
}
}

func taskSetNode(n wfv1.NodeStatus) bool {
return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin
}

func (woc *wfOperationCtx) hasTaskSetNodes() bool {
return woc.wf.Status.Nodes.Any(taskSetNode)
return woc.wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
return node.IsTaskSetNode()
})
}

func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) error {
Expand Down Expand Up @@ -116,7 +114,7 @@ func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bo
return false
}
// If this node is of type HTTP, it will need an HTTP reconciliation
if taskSetNode(*node) {
if node.IsTaskSetNode() {
return true
}
for _, child := range node.Children {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists

func (woc *wfOperationCtx) getDeadline(opts *createWorkflowPodOpts) *time.Time {
deadline := time.Time{}
if woc.workflowDeadline != nil {
if woc.workflowDeadline != nil && !opts.onExitPod {
deadline = *woc.workflowDeadline
}
if !opts.executionDeadline.IsZero() && (deadline.IsZero() || opts.executionDeadline.Before(deadline)) {
Expand Down
Loading