diff --git a/flytepropeller/pkg/compiler/validators/node.go b/flytepropeller/pkg/compiler/validators/node.go index e9a81f0a01..b4de4be04d 100644 --- a/flytepropeller/pkg/compiler/validators/node.go +++ b/flytepropeller/pkg/compiler/validators/node.go @@ -3,6 +3,7 @@ package validators import ( "fmt" + "sort" flyte "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" c "github.com/flyteorg/flytepropeller/pkg/compiler/common" @@ -120,6 +121,13 @@ func ValidateNode(w c.WorkflowBuilder, n c.NodeBuilder, validateConditionTypes b validateEffectiveOutputParameters(n, errs.NewScope()) } + if n.GetCoreNode().UpstreamNodeIds == nil { + n.GetCoreNode().UpstreamNodeIds = make([]string, 0) + } + + // Order upstream node ids to ensure consistent output of the compiler even if client ordering changes. + sort.Strings(n.GetCoreNode().UpstreamNodeIds) + // Validate branch node conditions and inner nodes. if n.GetBranchNode() != nil { if nodes, ok := ValidateBranchNode(w, n, validateConditionTypes, errs.NewScope()); ok { diff --git a/flytepropeller/pkg/compiler/validators/node_test.go b/flytepropeller/pkg/compiler/validators/node_test.go index f7e6730a1d..3fa1309eec 100644 --- a/flytepropeller/pkg/compiler/validators/node_test.go +++ b/flytepropeller/pkg/compiler/validators/node_test.go @@ -37,13 +37,44 @@ func TestValidateBranchNode(t *testing.T) { } func TestValidateNode(t *testing.T) { - n := &mocks.NodeBuilder{} - n.OnGetId().Return(common.StartNodeID) - - wf := &mocks.WorkflowBuilder{} - errs := errors.NewCompileErrors() - ValidateNode(wf, n, true, errs) - if errs.HasErrors() { - assert.NoError(t, errs) - } + t.Run("Start-node", func(t *testing.T) { + n := &mocks.NodeBuilder{} + n.OnGetId().Return(common.StartNodeID) + + wf := &mocks.WorkflowBuilder{} + errs := errors.NewCompileErrors() + ValidateNode(wf, n, true, errs) + if !assert.False(t, errs.HasErrors()) { + assert.NoError(t, errs) + } + }) + + t.Run("Sort upstream node ids", func(t *testing.T) { + n := &mocks.NodeBuilder{} + n.OnGetId().Return("my-node") + n.OnGetInterface().Return(&core.TypedInterface{ + Outputs: &core.VariableMap{}, + Inputs: &core.VariableMap{}, + }) + n.OnGetOutputAliases().Return(nil) + n.OnGetBranchNode().Return(nil) + n.OnGetWorkflowNode().Return(nil) + n.OnGetTaskNode().Return(nil) + + coreN := &core.Node{} + coreN.UpstreamNodeIds = []string{"n1", "n0"} + n.OnGetCoreNode().Return(coreN) + n.On("GetUpstreamNodeIds").Return(func() []string { + return coreN.UpstreamNodeIds + }) + + wf := &mocks.WorkflowBuilder{} + errs := errors.NewCompileErrors() + ValidateNode(wf, n, true, errs) + if !assert.False(t, errs.HasErrors()) { + assert.NoError(t, errs) + } + + assert.Equal(t, []string{"n0", "n1"}, n.GetUpstreamNodeIds()) + }) }