Skip to content

Commit

Permalink
fix: Patch taskset with subresources to delete completed node status.… (
Browse files Browse the repository at this point in the history
argoproj#12620)

Signed-off-by: oninowang <[email protected]>
Co-authored-by: jswxstw <[email protected]>
Co-authored-by: Isitha Subasinghe <[email protected]>
  • Loading branch information
3 people committed May 7, 2024
1 parent 6194b8a commit 389492b
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 29 deletions.
7 changes: 7 additions & 0 deletions test/e2e/executor_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type ExecutorPluginsSuite struct {
Expand Down Expand Up @@ -76,6 +77,12 @@ func (s *ExecutorPluginsSuite) TestTemplateExecutor() {
}
}
}
}).
ExpectWorkflowTaskSet(func(t *testing.T, wfts *wfv1.WorkflowTaskSet) {
assert.NotNil(t, wfts)
assert.Len(t, wfts.Spec.Tasks, 0)
assert.Len(t, wfts.Status.Nodes, 0)
assert.Equal(t, "true", wfts.Labels[common.LabelKeyCompleted])
})
}

Expand Down
3 changes: 3 additions & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type E2ESuite struct {
wfClient v1alpha1.WorkflowInterface
wfebClient v1alpha1.WorkflowEventBindingInterface
wfTemplateClient v1alpha1.WorkflowTemplateInterface
wftsClient v1alpha1.WorkflowTaskSetInterface
cwfTemplateClient v1alpha1.ClusterWorkflowTemplateInterface
cronClient v1alpha1.CronWorkflowInterface
KubeClient kubernetes.Interface
Expand All @@ -71,6 +72,7 @@ func (s *E2ESuite) SetupSuite() {
s.wfClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().Workflows(Namespace)
s.wfebClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowEventBindings(Namespace)
s.wfTemplateClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTemplates(Namespace)
s.wftsClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().WorkflowTaskSets(Namespace)
s.cronClient = versioned.NewForConfigOrDie(s.RestConfig).ArgoprojV1alpha1().CronWorkflows(Namespace)
s.Persistence = newPersistence(s.KubeClient, s.Config)
s.hydrator = hydrator.New(s.Persistence.offloadNodeStatusRepo)
Expand Down Expand Up @@ -207,6 +209,7 @@ func (s *E2ESuite) Given() *Given {
client: s.wfClient,
wfebClient: s.wfebClient,
wfTemplateClient: s.wfTemplateClient,
wftsClient: s.wftsClient,
cwfTemplateClient: s.cwfTemplateClient,
cronClient: s.cronClient,
hydrator: s.hydrator,
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/fixtures/given.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Given struct {
client v1alpha1.WorkflowInterface
wfebClient v1alpha1.WorkflowEventBindingInterface
wfTemplateClient v1alpha1.WorkflowTemplateInterface
wftsClient v1alpha1.WorkflowTaskSetInterface
cwfTemplateClient v1alpha1.ClusterWorkflowTemplateInterface
cronClient v1alpha1.CronWorkflowInterface
hydrator hydrator.Interface
Expand Down Expand Up @@ -223,6 +224,7 @@ func (g *Given) When() *When {
client: g.client,
wfebClient: g.wfebClient,
wfTemplateClient: g.wfTemplateClient,
wftsClient: g.wftsClient,
cwfTemplateClient: g.cwfTemplateClient,
cronClient: g.cronClient,
hydrator: g.hydrator,
Expand Down
13 changes: 13 additions & 0 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Then struct {
wf *wfv1.Workflow
cronWf *wfv1.CronWorkflow
client v1alpha1.WorkflowInterface
wftsClient v1alpha1.WorkflowTaskSetInterface
cronClient v1alpha1.CronWorkflowInterface
hydrator hydrator.Interface
kubeClient kubernetes.Interface
Expand Down Expand Up @@ -262,6 +263,17 @@ func (t *Then) ExpectPods(f func(t *testing.T, pods []apiv1.Pod)) *Then {
return t
}

func (t *Then) ExpectWorkflowTaskSet(block func(t *testing.T, wfts *wfv1.WorkflowTaskSet)) *Then {
t.t.Helper()
ctx := context.Background()
wfts, err := t.wftsClient.Get(ctx, t.wf.Name, metav1.GetOptions{})
if err != nil {
t.t.Fatal(err)
}
block(t.t, wfts)
return t
}

func (t *Then) RunCli(args []string, block func(t *testing.T, output string, err error)) *Then {
t.t.Helper()
output, err := Exec("../../dist/argo", append([]string{"-n", Namespace}, args...)...)
Expand All @@ -273,6 +285,7 @@ func (t *Then) When() *When {
return &When{
t: t.t,
client: t.client,
wftsClient: t.wftsClient,
cronClient: t.cronClient,
hydrator: t.hydrator,
wf: t.wf,
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type When struct {
client v1alpha1.WorkflowInterface
wfebClient v1alpha1.WorkflowEventBindingInterface
wfTemplateClient v1alpha1.WorkflowTemplateInterface
wftsClient v1alpha1.WorkflowTaskSetInterface
cwfTemplateClient v1alpha1.ClusterWorkflowTemplateInterface
cronClient v1alpha1.CronWorkflowInterface
hydrator hydrator.Interface
Expand Down Expand Up @@ -620,6 +621,7 @@ func (w *When) Then() *Then {
wf: w.wf,
cronWf: w.cronWf,
client: w.client,
wftsClient: w.wftsClient,
cronClient: w.cronClient,
hydrator: w.hydrator,
kubeClient: w.kubeClient,
Expand All @@ -633,6 +635,7 @@ func (w *When) Given() *Given {
client: w.client,
wfebClient: w.wfebClient,
wfTemplateClient: w.wfTemplateClient,
wftsClient: w.wftsClient,
cwfTemplateClient: w.cwfTemplateClient,
cronClient: w.cronClient,
hydrator: w.hydrator,
Expand Down
6 changes: 0 additions & 6 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,12 +780,6 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
startTime := time.Now()
woc.operate(ctx)
wfc.metrics.OperationCompleted(time.Since(startTime).Seconds())
if woc.wf.Status.Fulfilled() {
err := woc.completeTaskSet(ctx)
if err != nil {
log.WithError(err).Warn("error to complete the taskset")
}
}

// TODO: operate should return error if it was unable to operate properly
// so we can requeue the work for a later time
Expand Down
36 changes: 18 additions & 18 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ import (
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
)

func (woc *wfOperationCtx) patchTaskSet(ctx context.Context, patch interface{}, pathTypeType types.PatchType) error {
func (woc *wfOperationCtx) mergePatchTaskSet(ctx context.Context, patch interface{}, subresources ...string) error {
patchByte, err := json.Marshal(patch)
if err != nil {
return errors.InternalWrapError(err)
}
_, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(woc.wf.Namespace).Patch(ctx, woc.wf.Name, pathTypeType, patchByte, metav1.PatchOptions{})
_, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets(woc.wf.Namespace).Patch(ctx, woc.wf.Name, types.MergePatchType, patchByte, metav1.PatchOptions{}, subresources...)
if err != nil {
return fmt.Errorf("failed patching taskset: %v", err)
}
return nil
}

func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() map[string]interface{} {
func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() (tasksPatch map[string]interface{}, nodesPatch map[string]interface{}) {
deletedNode := make(map[string]interface{})
for _, node := range woc.wf.Status.Nodes {
if (node.Type == wfv1.NodeTypeHTTP || node.Type == wfv1.NodeTypePlugin) && node.Fulfilled() {
Expand All @@ -40,15 +40,17 @@ func (woc *wfOperationCtx) getDeleteTaskAndNodePatch() map[string]interface{} {
}

// Delete the completed Tasks and nodes status
patch := map[string]interface{}{
tasksPatch = map[string]interface{}{
"spec": map[string]interface{}{
"tasks": deletedNode,
},
}
nodesPatch = map[string]interface{}{
"status": map[string]interface{}{
"nodes": deletedNode,
},
}
return patch
return
}
func taskSetNode(n wfv1.NodeStatus) bool {
return n.Type == wfv1.NodeTypeHTTP || n.Type == wfv1.NodeTypePlugin
Expand All @@ -62,20 +64,18 @@ func (woc *wfOperationCtx) removeCompletedTaskSetStatus(ctx context.Context) err
if !woc.hasTaskSetNodes() {
return nil
}
return woc.patchTaskSet(ctx, woc.getDeleteTaskAndNodePatch(), types.MergePatchType)
}

func (woc *wfOperationCtx) completeTaskSet(ctx context.Context) error {
if !woc.hasTaskSetNodes() {
return nil
tasksPatch, nodesPatch := woc.getDeleteTaskAndNodePatch()
if woc.wf.Status.Fulfilled() {
tasksPatch["metadata"] = metav1.ObjectMeta{
Labels: map[string]string{
common.LabelKeyCompleted: "true",
},
}
}
patch := woc.getDeleteTaskAndNodePatch()
patch["metadata"] = metav1.ObjectMeta{
Labels: map[string]string{
common.LabelKeyCompleted: "true",
},
if err := woc.mergePatchTaskSet(ctx, nodesPatch, "status"); err != nil {
return err
}
return woc.patchTaskSet(ctx, patch, types.MergePatchType)
return woc.mergePatchTaskSet(ctx, tasksPatch)
}

func (woc *wfOperationCtx) getWorkflowTaskSet() (*wfv1.WorkflowTaskSet, error) {
Expand Down Expand Up @@ -202,7 +202,7 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
"spec": wfv1.WorkflowTaskSetSpec{Tasks: woc.taskSet},
}
// patch the new templates into taskset
err = woc.patchTaskSet(ctx, spec, types.MergePatchType)
err = woc.mergePatchTaskSet(ctx, spec)
if err != nil {
woc.log.WithError(err).Error("Failed to patch WorkflowTaskSet")
return fmt.Errorf("failed to patch TaskSet. %v", err)
Expand Down
5 changes: 0 additions & 5 deletions workflow/controller/taskset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,6 @@ func TestNonHTTPTemplateScenario(t *testing.T) {
err := woc.reconcileTaskSet(ctx)
assert.NoError(t, err)
})
t.Run("completeTaskSet", func(t *testing.T) {
woc.operate(ctx)
err := woc.completeTaskSet(ctx)
assert.NoError(t, err)
})
t.Run("removeCompletedTaskSetStatus", func(t *testing.T) {
woc.operate(ctx)
err := woc.removeCompletedTaskSetStatus(ctx)
Expand Down

0 comments on commit 389492b

Please sign in to comment.