Skip to content

Commit

Permalink
fix: Mark non-fulfilled taskSetNodes error when agent pod failed. Fixes
Browse files Browse the repository at this point in the history
#12703

Signed-off-by: oninowang <[email protected]>
  • Loading branch information
jswxstw authored and oninowang committed Mar 1, 2024
1 parent 33ad82f commit de0a271
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 58 deletions.
16 changes: 8 additions & 8 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ func (woc *wfOperationCtx) reconcileAgentPod(ctx context.Context) error {
}
// Check Pod is just created
if pod.Status.Phase != "" {
woc.updateAgentPodStatus(ctx, pod)
woc.updateAgentPodStatus(pod)
}
return nil
}

func (woc *wfOperationCtx) updateAgentPodStatus(ctx context.Context, pod *apiv1.Pod) {
func (woc *wfOperationCtx) updateAgentPodStatus(pod *apiv1.Pod) {
woc.log.Info("updateAgentPodStatus")
newPhase, message := assessAgentPodStatus(pod)
if newPhase == wfv1.WorkflowFailed || newPhase == wfv1.WorkflowError {
woc.markWorkflowError(ctx, fmt.Errorf("agent pod failed with reason %s", message))
if newPhase == wfv1.NodeFailed || newPhase == wfv1.NodeError {
woc.markTaskSetNodesError(fmt.Errorf(`agent pod failed with reason:"%s"`, message))
}
}

func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
var newPhase wfv1.WorkflowPhase
func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.NodePhase, string) {
var newPhase wfv1.NodePhase
var message string
log.WithField("namespace", pod.Namespace).
WithField("podName", pod.Name).
Expand All @@ -63,10 +63,10 @@ func assessAgentPodStatus(pod *apiv1.Pod) (wfv1.WorkflowPhase, string) {
case apiv1.PodSucceeded, apiv1.PodRunning, apiv1.PodPending:
return "", ""
case apiv1.PodFailed:
newPhase = wfv1.WorkflowFailed
newPhase = wfv1.NodeFailed
message = pod.Status.Message
default:
newPhase = wfv1.WorkflowError
newPhase = wfv1.NodeError
message = fmt.Sprintf("Unexpected pod phase for %s: %s", pod.ObjectMeta.Name, pod.Status.Phase)
}
return newPhase, message
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestAssessAgentPodStatus(t *testing.T) {
Status: apiv1.PodStatus{Phase: apiv1.PodFailed},
}
nodeStatus, msg := assessAgentPodStatus(pod1)
assert.Equal(t, wfv1.WorkflowFailed, nodeStatus)
assert.Equal(t, wfv1.NodeFailed, nodeStatus)
assert.Equal(t, "", msg)
})
t.Run("Running", func(t *testing.T) {
Expand All @@ -156,15 +156,15 @@ func TestAssessAgentPodStatus(t *testing.T) {
}

nodeStatus, msg := assessAgentPodStatus(pod1)
assert.Equal(t, wfv1.WorkflowPhase(""), nodeStatus)
assert.Equal(t, wfv1.NodePhase(""), nodeStatus)
assert.Equal(t, "", msg)
})
t.Run("Success", func(t *testing.T) {
pod1 := &apiv1.Pod{
Status: apiv1.PodStatus{Phase: apiv1.PodSucceeded},
}
nodeStatus, msg := assessAgentPodStatus(pod1)
assert.Equal(t, wfv1.WorkflowPhase(""), nodeStatus)
assert.Equal(t, wfv1.NodePhase(""), nodeStatus)
assert.Equal(t, "", msg)
})

Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) (error, bool)
return
}
if woc.isAgentPod(pod) {
woc.updateAgentPodStatus(ctx, pod)
woc.updateAgentPodStatus(pod)
return
}
nodeID := woc.nodeID(pod)
Expand Down
92 changes: 48 additions & 44 deletions workflow/controller/operator_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package controller

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

var httpwf = `apiVersion: argoproj.io/v1alpha1
Expand All @@ -22,61 +23,64 @@ spec:
- name: http
http:
url: https://www.google.com/
`

var taskSet = `apiVersion: argoproj.io/v1alpha1
kind: WorkflowTaskSet
metadata:
creationTimestamp: "2021-04-23T21:49:05Z"
generation: 1
name: hello-world
namespace: default
ownerReferences:
- apiVersion: argoproj.io/v1alpha1
kind: Workflow
name: hello-world
uid: 0b451726-8ddd-4ba3-8d69-c3b5b43e93a3
resourceVersion: "11581184"
selfLink: /apis/argoproj.io/v1alpha1/namespaces/default/workflowtasksets/hello-world
uid: b80385b8-8b72-4f13-af6d-f429a2cad443
spec:
tasks:
http-template-nxvtg-1265710817:
http:
url: http://www.google.com
status:
nodes:
hello-world:
phase: Succeed
outputs:
parameters:
- name: test
value: "welcome"
`

func TestHTTPTemplate(t *testing.T) {
var ts v1alpha1.WorkflowTaskSet
err := yaml.UnmarshalStrict([]byte(taskSet), &ts)
wf := v1alpha1.MustUnmarshalWorkflow(httpwf)
cancel, controller := newController(wf, ts)
wf := wfv1.MustUnmarshalWorkflow(httpwf)
cancel, controller := newController(wf, defaultServiceAccount)
defer cancel()

assert.NoError(t, err)
t.Run("ExecuteHTTPTemplate", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
pods, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).List(ctx, metav1.ListOptions{})
pod, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(ctx, woc.getAgentPodName(), metav1.GetOptions{})
assert.NoError(t, err)
for _, pod := range pods.Items {
assert.Equal(t, pod.Name, "hello-world-1340600742-agent")
}
// tss, err :=controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).List(ctx, metav1.ListOptions{})
assert.NotNil(t, pod)
ts, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, ts)
assert.Len(t, ts.Spec.Tasks, 1)

// simulate a scenario where the agent pod failed.
pod.Status.Phase = v1.PodFailed
pod.Status.Message = "manual termination"
pod, err = controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
assert.Nil(t, err)
assert.Equal(t, v1.PodFailed, pod.Status.Phase)
woc.operate(ctx)
assert.Equal(t, wfv1.WorkflowError, woc.wf.Status.Phase)
assert.Equal(t, `agent pod failed with reason:"manual termination"`, woc.wf.Status.Message)
assert.Len(t, woc.wf.Status.Nodes, 1)
assert.Equal(t, wfv1.NodeError, woc.wf.Status.Nodes["hello-world"].Phase)
assert.Equal(t, `agent pod failed with reason:"manual termination"`, woc.wf.Status.Nodes["hello-world"].Message)
ts, err = controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, ts)
assert.Empty(t, ts.Spec.Tasks)
assert.Empty(t, ts.Status.Nodes)
})
}

func TestHTTPTemplateWithoutServiceAccount(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(httpwf)
cancel, controller := newController(wf)
defer cancel()

t.Run("ExecuteHTTPTemplateWithoutServiceAccount", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
_, err := controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(ctx, woc.getAgentPodName(), metav1.GetOptions{})
assert.Error(t, err, fmt.Sprintf(`pods "%s" not found`, woc.getAgentPodName()))
ts, err := controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(wf.Namespace).Get(ctx, "hello-world", metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, ts)
assert.Empty(t, ts.Spec.Tasks)
assert.Empty(t, ts.Status.Nodes)
assert.Len(t, woc.wf.Status.Nodes, 1)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.NodeError, woc.wf.Status.Nodes["hello-world"].Phase)
assert.Equal(t, `create agent pod failed with reason:"failed to get token volumes: serviceaccounts "default" not found"`, woc.wf.Status.Nodes["hello-world"].Message)
})
}
13 changes: 11 additions & 2 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interfac
func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) {
deletedNode := make(map[string]interface{})
for _, node := range woc.wf.Status.Nodes {
if (node.Type == wfv1.NodeTypeHTTP || node.Type == wfv1.NodeTypePlugin) && node.Fulfilled() {
if taskSetNode(node) && node.Fulfilled() {
deletedNode[node.ID] = nil
}
}
Expand All @@ -52,6 +52,15 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]in
}
return
}

func (woc *wfOperationCtx) markTaskSetNodesError(err error) {
for _, node := range woc.wf.Status.Nodes {
if taskSetNode(node) && !node.Fulfilled() {
woc.markNodeError(node.Name, err)
}
}
}

func taskSetNode(n wfv1.NodeStatus) bool {
return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin
}
Expand Down Expand Up @@ -96,7 +105,7 @@ func (woc *wfOperationCtx) taskSetReconciliation(ctx context.Context) {
}
if err := woc.reconcileAgentPod(ctx); err != nil {
woc.log.WithError(err).Error("error in agent pod reconciliation")
woc.markWorkflowError(ctx, err)
woc.markTaskSetNodesError(fmt.Errorf(`create agent pod failed with reason:"%s"`, err))
return
}
}
Expand Down

0 comments on commit de0a271

Please sign in to comment.