Skip to content

Commit

Permalink
fix: Apply execution control to taskset nodes.
Browse files Browse the repository at this point in the history
Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored and oninowang committed May 9, 2024
1 parent 7e26fa4 commit 6cba1c9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
6 changes: 3 additions & 3 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2449,9 +2449,9 @@ func (n *NodeStatus) IsActiveSuspendNode() bool {
return n.Type == NodeTypeSuspend && n.Phase == NodeRunning
}

// IsActivePluginNode returns whether this node is an active plugin node
func (n *NodeStatus) IsActivePluginNode() bool {
return n.Type == NodeTypePlugin && (n.Phase == NodeRunning || n.Phase == NodePending)
// IsTaskSetNode returns whether this node uses the taskset
func (n *NodeStatus) IsTaskSetNode() bool {
return n.Type == NodeTypeHTTP || n.Type == NodeTypePlugin
}

func (n NodeStatus) GetDuration() time.Duration {
Expand Down
17 changes: 12 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.workflowDeadline = woc.getWorkflowDeadline()
err, podReconciliationCompleted := woc.podReconciliation(ctx)
if err == nil {
woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
// Execution control has been applied to the nodes with created pods after pod reconciliation.
// However, pending and suspended nodes do not have created pods, and taskset nodes use the agent pod.
// Apply execution control to these nodes now since pod reconciliation does not take effect on them.
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()
}

if err != nil {
Expand Down Expand Up @@ -424,7 +427,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
if err != nil {
woc.markNodeError(node.Name, err)
}
// Reconcile TaskSet and Agent for HTTP templates when is not shutdown
// Reconcile TaskSet and Agent for HTTP/Plugin templates when is not shutdown
if !woc.execWf.Spec.Shutdown.Enabled() {
woc.taskSetReconciliation(ctx)
}
Expand Down Expand Up @@ -1262,10 +1265,14 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node *wfv1.NodeStatus) bool {
(woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError())
}

func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
// failNodesWithoutCreatedPodsAfterDeadlineOrShutdown mark the nodes without created pods failed when shutting down or exceeding deadline.
func (woc *wfOperationCtx) failNodesWithoutCreatedPodsAfterDeadlineOrShutdown() {
for _, node := range woc.wf.Status.Nodes {
// fail suspended nodes or plugin nodes when shuting down
if woc.GetShutdownStrategy().Enabled() && (node.IsActiveSuspendNode() || node.IsActivePluginNode()) {
if node.Fulfilled() {
continue
}
// fail suspended nodes or taskset nodes when shutting down
if woc.GetShutdownStrategy().Enabled() && (node.IsActiveSuspendNode() || node.IsTaskSetNode()) {
message := fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
continue
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9846,7 +9846,7 @@ func TestRetryLoopWithOutputParam(t *testing.T) {
assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var workflowShuttingDownWithNodesInPendingAfterReconsiliation = `apiVersion: argoproj.io/v1alpha1
var workflowShuttingDownWithNodesInPendingAfterReconciliation = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
Expand Down Expand Up @@ -9943,12 +9943,12 @@ status:
type: Container
`

func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
func TestFailNodesWithoutCreatedPodsAfterDeadlineOrShutdown(t *testing.T) {
cancel, controller := newController()
defer cancel()

t.Run("Shutdown", func(t *testing.T) {
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation)
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation)
woc := newWorkflowOperationCtx(workflow, controller)

woc.execWf.Spec.Shutdown = "Terminate"
Expand Down Expand Up @@ -9976,14 +9976,14 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step2NodeName].Phase)

woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()

assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
})

t.Run("Deadline", func(t *testing.T) {
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconsiliation)
workflow := wfv1.MustUnmarshalWorkflow(workflowShuttingDownWithNodesInPendingAfterReconciliation)
woc := newWorkflowOperationCtx(workflow, controller)

woc.execWf.Spec.Shutdown = ""
Expand Down Expand Up @@ -10011,7 +10011,7 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodePending, woc.wf.Status.Nodes[step2NodeName].Phase)

woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
woc.failNodesWithoutCreatedPodsAfterDeadlineOrShutdown()

assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Nodes[step1NodeName].Phase)
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
Expand Down
11 changes: 5 additions & 6 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac
func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) {
deletedNode := make(map[string]interface{})
for _, node := range woc.wf.Status.Nodes {
if (node.Type == wfv1.NodeTypeHTTP || node.Type == wfv1.NodeTypePlugin) && node.Fulfilled() {
if node.IsTaskSetNode() && node.Fulfilled() {
deletedNode[node.ID] = nil
}
}
Expand All @@ -52,12 +52,11 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in
}
return
}
func taskSetNode(n wfv1.NodeStatus) bool {
return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin
}

func (woc *wfOperationCtx) hasTaskSetNodes() bool {
return woc.wf.Status.Nodes.Any(taskSetNode)
return woc.wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
return node.IsTaskSetNode()
})
}

func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) error {
Expand Down Expand Up @@ -107,7 +106,7 @@ func (woc *wfOperationCtx) nodeRequiresTaskSetReconciliation(nodeName string) bo
return false
}
// If this node is of type HTTP, it will need an HTTP reconciliation
if taskSetNode(*node) {
if node.IsTaskSetNode() {
return true
}
for _, child := range node.Children {
Expand Down

0 comments on commit 6cba1c9

Please sign in to comment.