Skip to content

Commit

Permalink
fix: Set default value to output parameters if suspend node timeout. F…
Browse files Browse the repository at this point in the history
…ixes #12230 (#12960)

Signed-off-by: oninowang <[email protected]>
(cherry picked from commit 3df05eb)
  • Loading branch information
jswxstw authored and Joibel committed Nov 22, 2024
1 parent 1a3a5c2 commit 8a94f2e
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 11 deletions.
128 changes: 128 additions & 0 deletions test/e2e/suspend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//go:build functional
// +build functional

package e2e

import (
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

type TestSuspendSitue struct {
fixtures.E2ESuite
}

func (s *TestSuspendSitue) TestSuspendNodeTimeoutWithoutDefaultValue() {
s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: suspend-node-timeout-without-default-value
spec:
entrypoint: suspend
templates:
- name: suspend
steps:
- - name: approve
template: approve
- - name: release
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.approve.outputs.parameters.message}}"
- name: approve
suspend:
duration: 5s
outputs:
parameters:
- name: message
valueFrom:
supplied: {}
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Contains(t, "raw output parameter 'message' has not been set and does not have a default value", status.Message)
})
}

func (s *TestSuspendSitue) TestSuspendNodeTimeoutWithDefaultValue() {
s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: suspend-node-timeout-with-default-value
spec:
entrypoint: suspend
templates:
- name: suspend
steps:
- - name: approve
template: approve
- - name: release
template: whalesay
arguments:
parameters:
- name: message
value: "{{steps.approve.outputs.parameters.message}}"
- name: approve
suspend:
duration: 5s
outputs:
parameters:
- name: message
valueFrom:
default: default message
supplied: {}
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
assert.Equal(t, status.Progress, wfv1.Progress("2/2"))
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return status.Name == "suspend-node-timeout-with-default-value[0].approve"
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, 1, len(status.Outputs.Parameters))
assert.Equal(t, "message", status.Outputs.Parameters[0].Name)
assert.Equal(t, wfv1.AnyStringPtr("default message"), status.Outputs.Parameters[0].Value)
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return status.Name == "suspend-node-timeout-with-default-value[1].release"
}, func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
assert.Equal(t, 1, len(status.Inputs.Parameters))
assert.Equal(t, "message", status.Inputs.Parameters[0].Name)
assert.Equal(t, wfv1.AnyStringPtr("default message"), status.Inputs.Parameters[0].Value)
})
}
3 changes: 3 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3455,6 +3455,9 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string,
if time.Now().UTC().After(suspendDeadline) {
// Suspension is expired, node can be resumed
woc.log.Infof("auto resuming node %s", nodeName)
if err := wfutil.OverrideOutputParametersWithDefault(node.Outputs); err != nil {
return node, err
}
_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return node, nil
}
Expand Down
30 changes: 19 additions & 11 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,23 @@ func SuspendWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, workf
return err
}

func OverrideOutputParametersWithDefault(outputs *wfv1.Outputs) error {
if outputs == nil {
return nil
}
for i, param := range outputs.Parameters {
if param.ValueFrom != nil && param.ValueFrom.Supplied != nil {
if param.ValueFrom.Default != nil {
outputs.Parameters[i].Value = param.ValueFrom.Default
outputs.Parameters[i].ValueFrom = nil
} else {
return fmt.Errorf("raw output parameter '%s' has not been set and does not have a default value", param.Name)
}
}
}
return nil
}

// ResumeWorkflow resumes a workflow by setting spec.suspend to nil and any suspended nodes to Successful.
// Retries conflict errors
func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrator hydrator.Interface, workflowName string, nodeFieldSelector string) error {
Expand Down Expand Up @@ -408,17 +425,8 @@ func ResumeWorkflow(ctx context.Context, wfIf v1alpha1.WorkflowInterface, hydrat
// To resume a workflow with a suspended node we simply mark the node as Successful
for nodeID, node := range wf.Status.Nodes {
if node.IsActiveSuspendNode() {
if node.Outputs != nil {
for i, param := range node.Outputs.Parameters {
if param.ValueFrom != nil && param.ValueFrom.Supplied != nil {
if param.ValueFrom.Default != nil {
node.Outputs.Parameters[i].Value = param.ValueFrom.Default
node.Outputs.Parameters[i].ValueFrom = nil
} else {
return false, fmt.Errorf("raw output parameter '%s' has not been set and does not have a default value", param.Name)
}
}
}
if err := OverrideOutputParametersWithDefault(node.Outputs); err != nil {
return false, err
}
node.Phase = wfv1.NodeSucceeded
if node.Message != "" {
Expand Down

0 comments on commit 8a94f2e

Please sign in to comment.