Skip to content

Commit

Permalink
Adding output-dir for node output for each retry (flyteorg#48)
Browse files Browse the repository at this point in the history
* Fixing data dir for retries

* mock

* fixing tests

* reworked retry attempt. Now we have 2 different paths- one for input and the other for output. New output dir is created for every attempt while input dir remains the same for the complete node execution

* bug fix

* .

* .

* .

* fix

* logging

* missed file

* fixing tests and removing bogus logging

* yet another fix

* fixing the comment

* .

* .

* fix endnode test
  • Loading branch information
surindersinghp authored Jan 9, 2020
1 parent c627551 commit 7b32cbb
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 24 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type MutableNodeStatus interface {
Mutable
// Mutation API's
SetDataDir(DataReference)
SetOutputDir(d DataReference)
SetParentNodeID(n *NodeID)
SetParentTaskID(t *core.TaskExecutionIdentifier)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string)
Expand Down Expand Up @@ -250,6 +251,7 @@ type ExecutableNodeStatus interface {
GetParentNodeID() *NodeID
GetParentTaskID() *core.TaskExecutionIdentifier
GetDataDir() DataReference
GetOutputDir() DataReference
GetMessage() string
GetAttempts() uint32
GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
Expand Down
37 changes: 37 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 30 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"reflect"
"strconv"

"github.com/lyft/flytestdlib/storage"

Expand Down Expand Up @@ -155,6 +156,7 @@ type NodeStatus struct {
LastAttemptStartedAt *metav1.Time `json:"laStartedAt,omitempty"`
Message string `json:"message,omitempty"`
DataDir DataReference `json:"-"`
OutputDir DataReference `json:"-"`
Attempts uint32 `json:"attempts"`
Cached bool `json:"cached"`

Expand Down Expand Up @@ -473,6 +475,16 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
n.SetDataDir(dataDir)
}

if len(n.GetOutputDir()) == 0 {
outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, n.GetDataDir(), strconv.FormatUint(uint64(in.Attempts), 10))
if err != nil {
logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id)
return n
}

n.SetOutputDir(outputDir)
}

return n
}

Expand All @@ -491,7 +503,14 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
return n
}

outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, dataDir, "0")
if err != nil {
logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id)
return n
}

newNodeStatus.SetDataDir(dataDir)
newNodeStatus.SetOutputDir(outputDir)
newNodeStatus.DataReferenceConstructor = in.DataReferenceConstructor

in.SubNodeStatus[id] = newNodeStatus
Expand All @@ -509,9 +528,14 @@ func (in *NodeStatus) GetDataDir() DataReference {

func (in *NodeStatus) SetDataDir(d DataReference) {
in.DataDir = d
}

func (in *NodeStatus) GetOutputDir() DataReference {
return in.OutputDir
}

// We do not need to set Dirty here because this field is not persisted.
//in.SetDirty()
func (in *NodeStatus) SetOutputDir(d DataReference) {
in.OutputDir = d
}

func (in *NodeStatus) Equals(other *NodeStatus) bool {
Expand Down Expand Up @@ -546,6 +570,10 @@ func (in *NodeStatus) Equals(other *NodeStatus) bool {
return false
}

if in.OutputDir != other.OutputDir {
return false
}

if in.ParentNode != nil && other.ParentNode != nil {
if *in.ParentNode != *other.ParentNode {
return false
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
"context"
"strconv"

"github.com/lyft/flytestdlib/logger"

Expand Down Expand Up @@ -103,6 +104,13 @@ func (in *WorkflowStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID)
n.SetDataDir(dataDir)
}

outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, n.GetDataDir(), strconv.FormatUint(uint64(n.Attempts), 10))
if err != nil {
logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id)
return n
}
n.SetOutputDir(outputDir)

return n
}

Expand All @@ -120,7 +128,14 @@ func (in *WorkflowStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID)
return n
}

outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, dataDir, "0")
if err != nil {
logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id)
return n
}

newNodeStatus.SetDataDir(dataDir)
newNodeStatus.SetOutputDir(outputDir)
newNodeStatus.DataReferenceConstructor = in.DataReferenceConstructor

in.NodeStatus[id] = newNodeStatus
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
// For branch node we set the output node to be the same as the child nodes output
childNodeStatus := w.GetNodeExecutionStatus(ctx, branchTakenNode.GetID())
nodeStatus.SetDataDir(childNodeStatus.GetDataDir())
nodeStatus.SetOutputDir(childNodeStatus.GetOutputDir())
phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{
OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetDataDir())},
})
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/nodes/branch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
branch := New(m, promutils.NewTestScope()).(*branchHandler)
childNodeID := "child"
childDatadir := v1alpha1.DataReference("test")

dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
w := &v1alpha1.FlyteWorkflow{
Status: v1alpha1.WorkflowStatus{
NodeStatus: map[v1alpha1.NodeID]*v1alpha1.NodeStatus{
Expand All @@ -136,6 +139,7 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
},
},
},
DataReferenceConstructor: dataStore,
}

res := &v12.ResourceRequirements{}
Expand Down
24 changes: 16 additions & 8 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevSt
}

if trns.Info().GetPhase() == handler.EPhaseSuccess {
f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetDataDir(), nCtx.DataStore())
f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore())
if err != nil {
return handler.UnknownTransition, prevState, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n

if trns.Info().GetPhase() == handler.EPhaseSuccess {
logger.Infof(ctx, "dynamic workflow node has succeeded, will call on success handler for parent node [%s]", nCtx.NodeID())
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetDataDir())
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
execID := task.GetTaskExecutionIdentifier(nCtx)
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
Expand Down Expand Up @@ -233,12 +233,19 @@ func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Con

// Instantiate a nodeStatus using the modified name but set its data directory using the original name.
subNodeStatus := parentNodeStatus.GetNodeExecutionStatus(ctx, newID)
originalNodePath, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetDataDir(), n.Id)

// NOTE: This is the second step of 2-step-dynamic-node execution. Input dir for this step is generated by
// parent task as a sub-directory(n.Id) in the parent node's output dir.
originalNodePath, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), n.Id)
if err != nil {
return nil, err
}
outputDir, err := nCtx.DataStore().ConstructReference(ctx, originalNodePath, "0")
if err != nil {
return nil, err
}

subNodeStatus.SetDataDir(originalNodePath)
subNodeStatus.SetOutputDir(outputDir)
n.Id = newID
}

Expand Down Expand Up @@ -289,7 +296,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
t := d.metrics.buildDynamicWorkflow.Start(ctx)
defer t.Stop()

f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetDataDir(), nCtx.DataStore())
f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore())
if err != nil {
return nil, false, err
}
Expand All @@ -298,6 +305,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
execID := task.GetTaskExecutionIdentifier(nCtx)
nStatus := nCtx.NodeStatus().GetNodeExecutionStatus(ctx, dynamicNodeID)
nStatus.SetDataDir(nCtx.NodeStatus().GetDataDir())
nStatus.SetOutputDir(nCtx.NodeStatus().GetOutputDir())
nStatus.SetParentTaskID(execID)

// cacheHitStopWatch := d.metrics.CacheHit.Start(ctx)
Expand Down Expand Up @@ -392,18 +400,18 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
nil
}

sourcePath := v1alpha1.GetOutputsFile(endNodeStatus.GetDataDir())
sourcePath := v1alpha1.GetOutputsFile(endNodeStatus.GetOutputDir())
if metadata, err := nCtx.DataStore().Head(ctx, sourcePath); err == nil {
if !metadata.Exists() {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("DynamicWorkflowOutputsNotFound", " is expected to produce outputs but no outputs file was written to %v.", nil)),
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure("DynamicWorkflowOutputsNotFound", fmt.Sprintf(" is expected to produce outputs but no outputs file was written to %v.", sourcePath), nil)),
handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: "DynamicWorkflow is expected to produce outputs but no outputs file was written"},
nil
}
} else {
return handler.UnknownTransition, prevState, err
}

destinationPath := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetDataDir())
destinationPath := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
if err := nCtx.DataStore().CopyRaw(ctx, sourcePath, destinationPath, storage.Options{}); err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailure(errors.OutputsNotFoundError,
Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) {

ns := &flyteMocks.ExecutableNodeStatus{}
ns.On("GetDataDir").Return(storage.DataReference("data-dir"))
ns.On("GetOutputDir").Return(storage.DataReference("data-dir"))

res := &v12.ResourceRequirements{}
n := &flyteMocks.ExecutableNode{}
Expand Down Expand Up @@ -332,15 +333,18 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {

endNodeStatus := &flyteMocks.ExecutableNodeStatus{}
endNodeStatus.On("GetDataDir").Return(storage.DataReference("end-node"))
endNodeStatus.On("GetOutputDir").Return(storage.DataReference("end-node"))

subNs := &flyteMocks.ExecutableNodeStatus{}
subNs.On("SetDataDir", mock.Anything).Return()
subNs.On("SetOutputDir", mock.Anything).Return()
subNs.On("ResetDirty").Return()
subNs.On("GetDataDir").Return(finalOutput)
subNs.On("GetOutputDir").Return(finalOutput)
subNs.On("SetParentTaskID", mock.Anything).Return()

dynamicNS := &flyteMocks.ExecutableNodeStatus{}
dynamicNS.On("SetDataDir", mock.Anything).Return()
dynamicNS.On("SetOutputDir", mock.Anything).Return()
dynamicNS.On("SetParentTaskID", mock.Anything).Return()
dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs)
dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_2").Return(subNs)
Expand All @@ -349,6 +353,8 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {

ns := &flyteMocks.ExecutableNodeStatus{}
ns.On("GetDataDir").Return(storage.DataReference("data-dir"))
ns.On("GetOutputDir").Return(storage.DataReference("output-dir"))
ns.On("GetNodeExecutionStatus", dynamicNodeID).Return(dynamicNS)
ns.OnGetNodeExecutionStatus(ctx, dynamicNodeID).Return(dynamicNS)
nCtx.On("NodeStatus").Return(ns)

Expand Down Expand Up @@ -400,7 +406,7 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
nCtx := createNodeContext("test", finalOutput)
s := &dynamicNodeStateHolder{}
nCtx.On("NodeStateWriter").Return(s)
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb")
f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetOutputDir(), "futures.pb")
assert.NoError(t, err)
if tt.args.dj != nil {
assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, tt.args.dj))
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/end/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (e endHandler) Handle(ctx context.Context, executionContext handler.NodeExe
if inputs != nil {
logger.Debugf(ctx, "Workflow has outputs. Storing them.")
// TODO we should use OutputWriter here
o := v1alpha1.GetOutputsFile(executionContext.NodeStatus().GetDataDir())
o := v1alpha1.GetOutputsFile(executionContext.NodeStatus().GetOutputDir())
so := storage.Options{}
if err := executionContext.DataStore().WriteProtobuf(ctx, o, so, inputs); err != nil {
logger.Errorf(ctx, "Failed to store workflow outputs. Error [%s]", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/end/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestEndHandler_Handle(t *testing.T) {
nCtx.On("DataStore").Return(store)
ns := &mocks3.ExecutableNodeStatus{}
ns.On("GetDataDir").Return(outputRef)
ns.On("GetOutputDir").Return(outputRef)
nCtx.On("NodeStatus").Return(ns)
nCtx.On("NodeID").Return("end-node")
return nCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,8 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, w v1alpha1.Exe
if len(nodeStatus.GetDataDir()) == 0 {
return executors.NodeStatusUndefined, errors.Errorf(errors.IllegalStateError, startNode.GetID(), "no data-dir set, cannot store inputs")
}
outputFile := v1alpha1.GetOutputsFile(nodeStatus.GetOutputDir())

outputFile := v1alpha1.GetOutputsFile(nodeStatus.GetDataDir())
so := storage.Options{}
if err := c.store.WriteProtobuf(ctx, outputFile, so, inputs); err != nil {
logger.Errorf(ctx, "Failed to write protobuf (metadata). Error [%v]", err)
Expand Down
Loading

0 comments on commit 7b32cbb

Please sign in to comment.