Skip to content

Commit

Permalink
Construct subnode DataDir to be under parent's node OutputDir to keep…
Browse files Browse the repository at this point in the history
… behavior consistent across (flyteorg#292)
  • Loading branch information
EngHabu authored Jul 14, 2021
1 parent 74a042d commit 051eba7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
n.SetParentTaskID(in.GetParentTaskID())
n.DataReferenceConstructor = in.DataReferenceConstructor
if len(n.GetDataDir()) == 0 {
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetDataDir(), id)
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetOutputDir(), id)
if err != nil {
logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id)
return n
Expand Down Expand Up @@ -552,9 +552,10 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
newNodeStatus := &NodeStatus{
MutableStruct: MutableStruct{},
}

newNodeStatus.SetParentTaskID(in.GetParentTaskID())
newNodeStatus.SetParentNodeID(in.GetParentNodeID())
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetDataDir(), id)
dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetOutputDir(), id)
if err != nil {
logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id)
return n
Expand Down
62 changes: 62 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package v1alpha1

import (
"context"
"encoding/json"
"testing"

"github.com/flyteorg/flytestdlib/storage"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -190,3 +193,62 @@ func TestDynamicNodeStatus_SetExecutionError(t *testing.T) {
})
}
}

func TestNodeStatus_GetNodeExecutionStatus(t *testing.T) {
ctx := context.Background()
t.Run("First Level", func(t *testing.T) {
t.Run("Not cached", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())
})

t.Run("cached", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())

newNode = n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())
})

t.Run("cached but datadir not populated", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{
"abc": {},
},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())
})
})

t.Run("Nested", func(t *testing.T) {
n := NodeStatus{
SubNodeStatus: map[NodeID]*NodeStatus{},
DataReferenceConstructor: storage.URLPathConstructor{},
}

newNode := n.GetNodeExecutionStatus(ctx, "abc")
assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir())

subsubNode := newNode.GetNodeExecutionStatus(ctx, "xyz")
assert.Equal(t, storage.DataReference("/abc/0/xyz/0"), subsubNode.GetOutputDir())
assert.Equal(t, storage.DataReference("/abc/0/xyz"), subsubNode.GetDataDir())
})
}

0 comments on commit 051eba7

Please sign in to comment.