Skip to content

Commit

Permalink
fix: Skip execution control for nodes of exit handler. Fixes #13060 #…
Browse files Browse the repository at this point in the history
…13252

Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored and oninowang committed Jun 27, 2024
1 parent 46728cf commit c86a807
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 12 deletions.
96 changes: 96 additions & 0 deletions test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type HooksSuite struct {
Expand Down Expand Up @@ -777,6 +778,101 @@ spec:
})
}

func (s *HooksSuite) TestExitHandlerWithWorkflowLevelDeadline() {
var onExitNodeName string
(s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: exit-handler-with-workflow-level-deadline
spec:
entrypoint: main
activeDeadlineSeconds: 1
hooks:
exit:
template: exit-handler
templates:
- name: main
steps:
- - name: sleep
template: sleep
- name: exit-handler
steps:
- - name: sleep
template: sleep
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "5"]
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeCompleted).
WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) {
onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name)
onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName)
return onExitNode.Completed(), "exit handler completed"
})).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.DisplayName == onExitNodeName
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, true, status.NodeFlag.Hooked)
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}))
}

func (s *HooksSuite) TestHttpExitHandlerWithWorkflowLevelDeadline() {
var onExitNodeName string
(s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: http-exit-handler-with-workflow-level-deadline
spec:
entrypoint: main
activeDeadlineSeconds: 1
hooks:
exit:
template: exit-handler
templates:
- name: main
steps:
- - name: sleep
template: sleep
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "5"]
- name: exit-handler
steps:
- - name: http
template: http
- name: http
http:
url: http://dummy.restapiexample.com/api/v1/employees
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeCompleted).
WaitForWorkflow(fixtures.Condition(func(wf *v1alpha1.Workflow) (bool, string) {
onExitNodeName = common.GenerateOnExitNodeName(wf.ObjectMeta.Name)
onExitNode := wf.Status.Nodes.FindByDisplayName(onExitNodeName)
return onExitNode.Completed(), "exit handler completed"
})).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return status.DisplayName == onExitNodeName
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, true, status.NodeFlag.Hooked)
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}))
}

func TestHooksSuite(t *testing.T) {
suite.Run(t, new(HooksSuite))
}
2 changes: 1 addition & 1 deletion workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (woc *wfOperationCtx) handleExecutionControlError(nodeID string, wfNodesLoc
// if node is a pod created from ContainerSet template
// then need to fail child nodes so they will not hang in Pending after pod deletion
for _, child := range children {
if !child.IsExitNode() && !child.Fulfilled() {
if !child.Fulfilled() {
woc.markNodePhase(child.Name, wfv1.NodeFailed, errorMsg)
}
}
Expand Down
2 changes: 0 additions & 2 deletions workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,8 +828,6 @@ status:
type: Steps
hello-world-647r7-1045616760:
boundaryID: hello-world-647r7-206029318
children:
- hello-world-647r7-370991976
displayName: '[0]'
finishedAt: null
id: hello-world-647r7-1045616760
Expand Down
20 changes: 12 additions & 8 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,20 +1272,24 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool {

// failNodesWithoutCreatedPodsAfterDeadlineOrShutdown mark the nodes without created pods failed when shutting down or exceeding deadline.
func (woc *wfOperationCtx) failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() {
for _, node := range woc.wf.Status.Nodes {
nodes := woc.wf.Status.Nodes
for _, node := range nodes {
if node.Fulfilled() {
continue
}
// fail suspended nodes or taskset nodes when shutting down
if woc.GetShutdownStrategy().Enabled() && (node.IsActiveSuspendNode() || node.IsTaskSetNode()) {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
// Only fail nodes that are not part of exit handler if we are "Stopping" or all pods if we are "Terminating"
if woc.GetShutdownStrategy().Enabled() && !woc.GetShutdownStrategy().ShouldExecute(node.IsPartOfExitHandler(nodes)) {
// fail suspended nodes or taskset nodes when shutting down
if node.IsActiveSuspendNode() || node.IsTaskSetNode() {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
}
}

// fail all pending and suspended nodes when exceeding deadline
// fail pending and suspended nodes that are not part of exit handler when exceeding deadline
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if deadlineExceeded && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
if deadlineExceeded && !node.IsPartOfExitHandler(nodes) && (node.Phase == wfv1.NodePending || node.IsActiveSuspendNode()) {
message := "Step exceeded its deadline"
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists

func (woc *wfOperationCtx) getDeadline(opts *createWorkflowPodOpts) *time.Time {
deadline := time.Time{}
if woc.workflowDeadline != nil {
if woc.workflowDeadline != nil && !opts.onExitPod {
deadline = *woc.workflowDeadline
}
if !opts.executionDeadline.IsZero() && (deadline.IsZero() || opts.executionDeadline.Before(deadline)) {
Expand Down

0 comments on commit c86a807

Please sign in to comment.