diff --git a/test/e2e/executor_plugins_test.go b/test/e2e/executor_plugins_test.go index 2506a479149e..081346bf1b74 100644 --- a/test/e2e/executor_plugins_test.go +++ b/test/e2e/executor_plugins_test.go @@ -14,6 +14,7 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/test/e2e/fixtures" + "github.com/argoproj/argo-workflows/v3/workflow/common" ) type ExecutorPluginsSuite struct { @@ -76,6 +77,12 @@ func (s *ExecutorPluginsSuite) TestTemplateExecutor() { } } } + }). + ExpectWorkflowTaskSet(func(t *testing.T, wfts *wfv1.WorkflowTaskSet) { + assert.NotNil(t, wfts) + assert.Len(t, wfts.Spec.Tasks, 0) + assert.Len(t, wfts.Status.Nodes, 0) + assert.Equal(t, "true", wfts.Labels[common.LabelKeyCompleted]) }) } diff --git a/test/e2e/fixtures/e2e_suite.go b/test/e2e/fixtures/e2e_suite.go index ea210dff3927..be083eea1101 100644 --- a/test/e2e/fixtures/e2e_suite.go +++ b/test/e2e/fixtures/e2e_suite.go @@ -56,6 +56,7 @@ type E2ESuite struct { wfClient v1alpha1.WorkflowInterface wfebClient v1alpha1.WorkflowEventBindingInterface wfTemplateClient v1alpha1.WorkflowTemplateInterface + wftsClient v1alpha1.WorkflowTaskSetInterface cwfTemplateClient v1alpha1.ClusterWorkflowTemplateInterface cronClient v1alpha1.CronWorkflowInterface KubeClient kubernetes.Interface @@ -79,6 +80,7 @@ func (s *E2ESuite) SetupSuite() { s.wfClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().Workflows(Namespace) s.wfebClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowEventBindings(Namespace) s.wfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTemplates(Namespace) + s.wftsClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTaskSets(Namespace) s.cronClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().CronWorkflows(Namespace) s.Persistence = newPersistence(s.KubeClient, s.Config) s.hydrator = hydrator.New(s.Persistence.offloadNodeStatusRepo) @@ -232,6 +234,7 @@ func (s *E2ESuite) Given() *Given { client: s.wfClient, wfebClient: s.wfebClient, wfTemplateClient: s.wfTemplateClient, + wftsClient: s.wftsClient, cwfTemplateClient: s.cwfTemplateClient, cronClient: s.cronClient, hydrator: s.hydrator, diff --git a/test/e2e/fixtures/given.go b/test/e2e/fixtures/given.go index 863df310a80c..fb6de9cf1b9a 100644 --- a/test/e2e/fixtures/given.go +++ b/test/e2e/fixtures/given.go @@ -23,6 +23,7 @@ type Given struct { client v1alpha1.WorkflowInterface wfebClient v1alpha1.WorkflowEventBindingInterface wfTemplateClient v1alpha1.WorkflowTemplateInterface + wftsClient v1alpha1.WorkflowTaskSetInterface cwfTemplateClient v1alpha1.ClusterWorkflowTemplateInterface cronClient v1alpha1.CronWorkflowInterface hydrator hydrator.Interface @@ -223,6 +224,7 @@ func (g *Given) When() *When { client: g.client, wfebClient: g.wfebClient, wfTemplateClient: g.wfTemplateClient, + wftsClient: g.wftsClient, cwfTemplateClient: g.cwfTemplateClient, cronClient: g.cronClient, hydrator: g.hydrator, diff --git a/test/e2e/fixtures/then.go b/test/e2e/fixtures/then.go index 8e5e36565c69..ed14b1d80749 100644 --- a/test/e2e/fixtures/then.go +++ b/test/e2e/fixtures/then.go @@ -28,6 +28,7 @@ type Then struct { wf *wfv1.Workflow cronWf *wfv1.CronWorkflow client v1alpha1.WorkflowInterface + wftsClient v1alpha1.WorkflowTaskSetInterface cronClient v1alpha1.CronWorkflowInterface hydrator hydrator.Interface kubeClient kubernetes.Interface @@ -263,6 +264,17 @@ func (t *Then) ExpectPods(f func(t *testing.T, pods []apiv1.Pod)) *Then { return t } +func (t *Then) ExpectWorkflowTaskSet(block func(t *testing.T, wfts *wfv1.WorkflowTaskSet)) *Then { + t.t.Helper() + ctx := context.Background() + wfts, err := t.wftsClient.Get(ctx, t.wf.Name, metav1.GetOptions{}) + if err != nil { + t.t.Fatal(err) + } + block(t.t, wfts) + return t +} + func (t *Then) RunCli(args []string, block func(t *testing.T, output string, err error)) *Then { t.t.Helper() output, err := Exec("../../dist/argo", append([]string{"-n", Namespace}, args...)...) @@ -274,6 +286,7 @@ func (t *Then) When() *When { return &When{ t: t.t, client: t.client, + wftsClient: t.wftsClient, cronClient: t.cronClient, hydrator: t.hydrator, wf: t.wf, diff --git a/test/e2e/fixtures/when.go b/test/e2e/fixtures/when.go index 241f7b7c4279..c65b3668e6e7 100644 --- a/test/e2e/fixtures/when.go +++ b/test/e2e/fixtures/when.go @@ -33,6 +33,7 @@ type When struct { client v1alpha1.WorkflowInterface wfebClient v1alpha1.WorkflowEventBindingInterface wfTemplateClient v1alpha1.WorkflowTemplateInterface + wftsClient v1alpha1.WorkflowTaskSetInterface cwfTemplateClient v1alpha1.ClusterWorkflowTemplateInterface cronClient v1alpha1.CronWorkflowInterface hydrator hydrator.Interface @@ -621,6 +622,7 @@ func (w *When) Then() *Then { wf: w.wf, cronWf: w.cronWf, client: w.client, + wftsClient: w.wftsClient, cronClient: w.cronClient, hydrator: w.hydrator, kubeClient: w.kubeClient, @@ -634,6 +636,7 @@ func (w *When) Given() *Given { client: w.client, wfebClient: w.wfebClient, wfTemplateClient: w.wfTemplateClient, + wftsClient: w.wftsClient, cwfTemplateClient: w.cwfTemplateClient, cronClient: w.cronClient, hydrator: w.hydrator, diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1c61d64e21fd..3aba1ed93e17 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -883,12 +883,6 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { startTime := time.Now() woc.operate(ctx) wfc.metrics.OperationCompleted(time.Since(startTime).Seconds()) - if woc.wf.Status.Fulfilled() { - err := woc.completeTaskSet(ctx) - if err != nil { - log.WithError(err).Warn("error to complete the taskset") - } - } // TODO: operate should return error if it was unable to operate properly // so we can requeue the work for a later time diff --git a/workflow/controller/taskset.go b/workflow/controller/taskset.go index a239ceca1c6c..1aaa14752180 100644 --- a/workflow/controller/taskset.go +++ b/workflow/controller/taskset.go @@ -19,19 +19,19 @@ import ( controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache" ) -func (woc *wfOperationCtx) patchTaskSet(ctx context.Context, patch interface{}, pathTypeType types.PatchType) error { +func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interface{}, subresources ...string) error { patchByte, err := json.Marshal(patch) if err != nil { return errors.InternalWrapError(err) } - _, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(woc.wf.Namespace).Patch(ctx, woc.wf.Name, pathTypeType, patchByte, metav1.PatchOptions{}) + _, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(woc.wf.Namespace).Patch(ctx, woc.wf.Name, types.MergePatchType, patchByte, metav1.PatchOptions{}, subresources...) if err != nil { return fmt.Errorf("failed patching taskset: %v", err) } return nil } -func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() map[string]interface{} { +func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) { deletedNode := make(map[string]interface{}) for _, node := range woc.wf.Status.Nodes { if (node.Type == wfv1.NodeTypeHTTP || node.Type == wfv1.NodeTypePlugin) && node.Fulfilled() { @@ -40,15 +40,17 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() map[string]interface{} { } // Delete the completed Tasks and nodes status - patch := map[string]interface{}{ + tasksPatch = map[string]interface{}{ "spec": map[string]interface{}{ "tasks": deletedNode, }, + } + nodesPatch = map[string]interface{}{ "status": map[string]interface{}{ "nodes": deletedNode, }, } - return patch + return } func taskSetNode(n wfv1.NodeStatus) bool { return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin @@ -62,20 +64,18 @@ func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) err if !woc.hasTaskSetNodes() { return nil } - return woc.patchTaskSet(ctx, woc.getDeleteTaskAndNodePatch(), types.MergePatchType) -} - -func (woc *wfOperationCtx) completeTaskSet(ctx context.Context) error { - if !woc.hasTaskSetNodes() { - return nil + tasksPatch, nodesPatch := woc.getDeleteTaskAndNodePatch() + if woc.wf.Status.Fulfilled() { + tasksPatch["metadata"] = metav1.ObjectMeta{ + Labels: map[string]string{ + common.LabelKeyCompleted: "true", + }, + } } - patch := woc.getDeleteTaskAndNodePatch() - patch["metadata"] = metav1.ObjectMeta{ - Labels: map[string]string{ - common.LabelKeyCompleted: "true", - }, + if err := woc.mergePatchTaskSet(ctx, nodesPatch, "status"); err != nil { + return err } - return woc.patchTaskSet(ctx, patch, types.MergePatchType) + return woc.mergePatchTaskSet(ctx, tasksPatch) } func (woc *wfOperationCtx) getWorkflowTaskSet() (*wfv1.WorkflowTaskSet, error) { @@ -202,7 +202,7 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error { "spec": wfv1.WorkflowTaskSetSpec{Tasks: woc.taskSet}, } // patch the new templates into taskset - err = woc.patchTaskSet(ctx, spec, types.MergePatchType) + err = woc.mergePatchTaskSet(ctx, spec) if err != nil { woc.log.WithError(err).Error("Failed to patch WorkflowTaskSet") return fmt.Errorf("failed to patch TaskSet. %v", err) diff --git a/workflow/controller/taskset_test.go b/workflow/controller/taskset_test.go index 0090377284bc..5d8255179580 100644 --- a/workflow/controller/taskset_test.go +++ b/workflow/controller/taskset_test.go @@ -323,11 +323,6 @@ func TestNonHTTPTemplateScenario(t *testing.T) { err := woc.reconcileTaskSet(ctx) assert.NoError(t, err) }) - t.Run("completeTaskSet", func(t *testing.T) { - woc.operate(ctx) - err := woc.completeTaskSet(ctx) - assert.NoError(t, err) - }) t.Run("removeCompletedTaskSetStatus", func(t *testing.T) { woc.operate(ctx) err := woc.removeCompletedTaskSetStatus(ctx)