Skip to content

Commit

Permalink
Order UpstreamNodeIds to ensure consistent compiler output (flyteorg#323
Browse files Browse the repository at this point in the history
)

* Order UpstreamNodeIds to ensure consistent compiler output

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Unit test & lint

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Sep 13, 2021
1 parent 248f4cd commit 7370e4d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
8 changes: 8 additions & 0 deletions flytepropeller/pkg/compiler/validators/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package validators

import (
"fmt"
"sort"

flyte "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
c "github.com/flyteorg/flytepropeller/pkg/compiler/common"
Expand Down Expand Up @@ -120,6 +121,13 @@ func ValidateNode(w c.WorkflowBuilder, n c.NodeBuilder, validateConditionTypes b
validateEffectiveOutputParameters(n, errs.NewScope())
}

if n.GetCoreNode().UpstreamNodeIds == nil {
n.GetCoreNode().UpstreamNodeIds = make([]string, 0)
}

// Order upstream node ids to ensure consistent output of the compiler even if client ordering changes.
sort.Strings(n.GetCoreNode().UpstreamNodeIds)

// Validate branch node conditions and inner nodes.
if n.GetBranchNode() != nil {
if nodes, ok := ValidateBranchNode(w, n, validateConditionTypes, errs.NewScope()); ok {
Expand Down
49 changes: 40 additions & 9 deletions flytepropeller/pkg/compiler/validators/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,44 @@ func TestValidateBranchNode(t *testing.T) {
}

func TestValidateNode(t *testing.T) {
n := &mocks.NodeBuilder{}
n.OnGetId().Return(common.StartNodeID)

wf := &mocks.WorkflowBuilder{}
errs := errors.NewCompileErrors()
ValidateNode(wf, n, true, errs)
if errs.HasErrors() {
assert.NoError(t, errs)
}
t.Run("Start-node", func(t *testing.T) {
n := &mocks.NodeBuilder{}
n.OnGetId().Return(common.StartNodeID)

wf := &mocks.WorkflowBuilder{}
errs := errors.NewCompileErrors()
ValidateNode(wf, n, true, errs)
if !assert.False(t, errs.HasErrors()) {
assert.NoError(t, errs)
}
})

t.Run("Sort upstream node ids", func(t *testing.T) {
n := &mocks.NodeBuilder{}
n.OnGetId().Return("my-node")
n.OnGetInterface().Return(&core.TypedInterface{
Outputs: &core.VariableMap{},
Inputs: &core.VariableMap{},
})
n.OnGetOutputAliases().Return(nil)
n.OnGetBranchNode().Return(nil)
n.OnGetWorkflowNode().Return(nil)
n.OnGetTaskNode().Return(nil)

coreN := &core.Node{}
coreN.UpstreamNodeIds = []string{"n1", "n0"}
n.OnGetCoreNode().Return(coreN)
n.On("GetUpstreamNodeIds").Return(func() []string {
return coreN.UpstreamNodeIds
})

wf := &mocks.WorkflowBuilder{}
errs := errors.NewCompileErrors()
ValidateNode(wf, n, true, errs)
if !assert.False(t, errs.HasErrors()) {
assert.NoError(t, errs)
}

assert.Equal(t, []string{"n0", "n1"}, n.GetUpstreamNodeIds())
})
}

0 comments on commit 7370e4d

Please sign in to comment.