From fc9e555c93584e6244942fbd0efcd27be4322b95 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Wed, 21 Jul 2021 09:34:43 -0700 Subject: [PATCH] Simplify GetNodeExecutionStatus (#294) * Revert "Construct subnode DataDir to be under parent's node OutputDir to keep behavior consistent across (#292)" (#293) This reverts commit 051eba7181b2aac49b6a83ac5c5854c41c6dda3d. Signed-off-by: Haytham Abuelfutuh * Simplify GetNodeExecutionStatus Signed-off-by: Haytham Abuelfutuh * SetDataDir in cached DynamicWorkflow case Signed-off-by: Haytham Abuelfutuh * Fix case in which downstream nodes may incorrectly fail because of parentNodeID #minor (#288) * Branch canexecute fix Signed-off-by: Ketan Umare * tests Signed-off-by: Ketan Umare * more unit test fixes Signed-off-by: Ketan Umare * fixed tests Signed-off-by: Ketan Umare * more documentation Signed-off-by: Ketan Umare Signed-off-by: Haytham Abuelfutuh * [wip] Working for ev1 to be merged into #294 (#295) Signed-off-by: Ketan Umare Signed-off-by: Haytham Abuelfutuh * bump for DCO Signed-off-by: Haytham Abuelfutuh * Fix DataDir of child nodes Signed-off-by: Haytham Abuelfutuh * Fix mocks for tests Signed-off-by: Haytham Abuelfutuh * fixed unit test Signed-off-by: Ketan Umare Co-authored-by: Ketan Umare <16888709+kumare3@users.noreply.github.com> Co-authored-by: Ketan Umare --- config.yaml | 6 +- .../flyteworkflow/v1alpha1/node_status.go | 76 +++++------ .../v1alpha1/node_status_test.go | 62 +++++++++ .../nodes/dynamic/dynamic_workflow.go | 126 ++++++++++++------ .../nodes/dynamic/dynamic_workflow_test.go | 4 +- pkg/controller/nodes/dynamic/handler_test.go | 6 + 6 files changed, 191 insertions(+), 89 deletions(-) diff --git a/config.yaml b/config.yaml index 01bb7501b7..2ea641a31d 100644 --- a/config.yaml +++ b/config.yaml @@ -3,7 +3,7 @@ propeller: rawoutput-prefix: s3://my-container/test/ metadata-prefix: metadata/propeller/sandbox - workers: 4 + workers: 1 workflow-reeval-duration: 10s downstream-eval-duration: 5s limit-namespace: "all" @@ -23,7 +23,7 @@ propeller: rate: 100 capacity: 1000 # This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container - kube-config: "$HOME/kubeconfig/k3s/k3s.yaml" + kube-config: "$HOME/.flyte/k3s/k3s.yaml" publish-k8s-events: true workflowStore: policy: "ResourceVersionCache" @@ -102,5 +102,5 @@ catalog-cache: endpoint: datacatalog:8089 insecure: true logger: - level: 4 + level: 5 show-source: true diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index c720f94c14..3cb12d8019 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "reflect" "strconv" "time" @@ -517,62 +518,53 @@ func (in NodeStatus) GetTaskNodeStatus() ExecutableTaskNodeStatus { return in.TaskNodeStatus } -func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus { - n, ok := in.SubNodeStatus[id] - if ok { - n.SetParentTaskID(in.GetParentTaskID()) - n.DataReferenceConstructor = in.DataReferenceConstructor - if len(n.GetDataDir()) == 0 { - dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetDataDir(), id) - if err != nil { - logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id) - return n - } - - n.SetDataDir(dataDir) +func (in *NodeStatus) setEphemeralNodeExecutionStatusAttributes(ctx context.Context, id NodeID, n *NodeStatus) error { + n.SetParentTaskID(in.GetParentTaskID()) + if len(n.GetDataDir()) == 0 { + dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetOutputDir(), id) + if err != nil { + return fmt.Errorf("failed to construct data dir for node [%v]. Error: %w", id, err) } - if len(n.GetOutputDir()) == 0 { - outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, n.GetDataDir(), strconv.FormatUint(uint64(in.Attempts), 10)) - if err != nil { - logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id) - return n - } + n.SetDataDir(dataDir) + } - n.SetOutputDir(outputDir) + if len(n.GetOutputDir()) == 0 { + outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, n.GetDataDir(), strconv.FormatUint(uint64(n.Attempts), 10)) + if err != nil { + return fmt.Errorf("failed to construct output dir for node [%v]. Error: %w", id, err) } - return n + n.SetOutputDir(outputDir) } - if in.SubNodeStatus == nil { - in.SubNodeStatus = make(map[NodeID]*NodeStatus) - } + n.DataReferenceConstructor = in.DataReferenceConstructor - newNodeStatus := &NodeStatus{ - MutableStruct: MutableStruct{}, - } - newNodeStatus.SetParentTaskID(in.GetParentTaskID()) - newNodeStatus.SetParentNodeID(in.GetParentNodeID()) - dataDir, err := in.DataReferenceConstructor.ConstructReference(ctx, in.GetDataDir(), id) - if err != nil { - logger.Errorf(ctx, "Failed to construct data dir for node [%v]", id) - return n + return nil +} + +func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus { + n, ok := in.SubNodeStatus[id] + if !ok { + if in.SubNodeStatus == nil { + in.SubNodeStatus = make(map[NodeID]*NodeStatus) + } + + n = &NodeStatus{ + MutableStruct: MutableStruct{}, + } + + in.SubNodeStatus[id] = n + in.SetDirty() } - outputDir, err := in.DataReferenceConstructor.ConstructReference(ctx, dataDir, "0") + err := in.setEphemeralNodeExecutionStatusAttributes(ctx, id, n) if err != nil { - logger.Errorf(ctx, "Failed to construct output dir for node [%v]", id) + logger.Errorf(ctx, "Failed to set node attributes for node [%v]. Error: %v", id, err) return n } - newNodeStatus.SetDataDir(dataDir) - newNodeStatus.SetOutputDir(outputDir) - newNodeStatus.DataReferenceConstructor = in.DataReferenceConstructor - - in.SubNodeStatus[id] = newNodeStatus - in.SetDirty() - return newNodeStatus + return n } func (in *NodeStatus) IsTerminated() bool { diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go b/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go index 85827a7fc3..4d396cc6b1 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status_test.go @@ -1,9 +1,12 @@ package v1alpha1 import ( + "context" "encoding/json" "testing" + "github.com/flyteorg/flytestdlib/storage" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/stretchr/testify/assert" ) @@ -190,3 +193,62 @@ func TestDynamicNodeStatus_SetExecutionError(t *testing.T) { }) } } + +func TestNodeStatus_GetNodeExecutionStatus(t *testing.T) { + ctx := context.Background() + t.Run("First Level", func(t *testing.T) { + t.Run("Not cached", func(t *testing.T) { + n := NodeStatus{ + SubNodeStatus: map[NodeID]*NodeStatus{}, + DataReferenceConstructor: storage.URLPathConstructor{}, + } + + newNode := n.GetNodeExecutionStatus(ctx, "abc") + assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir()) + assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir()) + }) + + t.Run("cached", func(t *testing.T) { + n := NodeStatus{ + SubNodeStatus: map[NodeID]*NodeStatus{}, + DataReferenceConstructor: storage.URLPathConstructor{}, + } + + newNode := n.GetNodeExecutionStatus(ctx, "abc") + assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir()) + assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir()) + + newNode = n.GetNodeExecutionStatus(ctx, "abc") + assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir()) + assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir()) + }) + + t.Run("cached but datadir not populated", func(t *testing.T) { + n := NodeStatus{ + SubNodeStatus: map[NodeID]*NodeStatus{ + "abc": {}, + }, + DataReferenceConstructor: storage.URLPathConstructor{}, + } + + newNode := n.GetNodeExecutionStatus(ctx, "abc") + assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir()) + assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir()) + }) + }) + + t.Run("Nested", func(t *testing.T) { + n := NodeStatus{ + SubNodeStatus: map[NodeID]*NodeStatus{}, + DataReferenceConstructor: storage.URLPathConstructor{}, + } + + newNode := n.GetNodeExecutionStatus(ctx, "abc") + assert.Equal(t, storage.DataReference("/abc/0"), newNode.GetOutputDir()) + assert.Equal(t, storage.DataReference("/abc"), newNode.GetDataDir()) + + subsubNode := newNode.GetNodeExecutionStatus(ctx, "xyz") + assert.Equal(t, storage.DataReference("/abc/0/xyz/0"), subsubNode.GetOutputDir()) + assert.Equal(t, storage.DataReference("/abc/0/xyz"), subsubNode.GetDataDir()) + }) +} diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index 36b368cf90..fc7b24fe7a 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -34,48 +34,62 @@ type dynamicWorkflowContext struct { const dynamicWfNameTemplate = "dynamic_%s" -func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Context, djSpec *core.DynamicJobSpec, - nCtx handler.NodeExecutionContext, parentNodeStatus v1alpha1.ExecutableNodeStatus) (*core.WorkflowTemplate, error) { - - iface, err := underlyingInterface(ctx, nCtx.TaskReader()) - if err != nil { - return nil, err +func setEphemeralNodeExecutionStatusAttributes(ctx context.Context, djSpec *core.DynamicJobSpec, + nCtx handler.NodeExecutionContext, parentNodeStatus v1alpha1.ExecutableNodeStatus) error { + if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 { + return nil } currentAttemptStr := strconv.Itoa(int(nCtx.CurrentAttempt())) // Modify node IDs to include lineage, the entire system assumes node IDs are unique per parent WF. - // We keep track of the original node ids because that's where inputs are written to. + // We keep track of the original node ids because that's where flytekit inputs are written to in the case of legacy + // map tasks. The modern map tasks do not write input files any longer and this entire piece of code can be removed. parentNodeID := nCtx.NodeID() for _, node := range djSpec.Nodes { nodeID := node.Id var subNodeStatus v1alpha1.ExecutableNodeStatus - if nCtx.ExecutionContext().GetEventVersion() == v1alpha1.EventVersion0 { - newID, err := hierarchicalNodeID(parentNodeID, currentAttemptStr, nodeID) - if err != nil { - return nil, err - } - // Instantiate a nodeStatus using the modified name but set its data directory using the original name. - subNodeStatus = parentNodeStatus.GetNodeExecutionStatus(ctx, newID) - node.Id = newID - } else { - subNodeStatus = parentNodeStatus.GetNodeExecutionStatus(ctx, nodeID) + newID, err := hierarchicalNodeID(parentNodeID, currentAttemptStr, nodeID) + if err != nil { + return err } + // Instantiate a nodeStatus using the modified name but set its data directory using the original name. + subNodeStatus = parentNodeStatus.GetNodeExecutionStatus(ctx, newID) + node.Id = newID // NOTE: This is the second step of 2-step-dynamic-node execution. Input dir for this step is generated by // parent task as a sub-directory(n.Id) in the parent node's output dir. originalNodePath, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), nodeID) if err != nil { - return nil, err + return err } outputDir, err := nCtx.DataStore().ConstructReference(ctx, originalNodePath, strconv.Itoa(int(subNodeStatus.GetAttempts()))) if err != nil { - return nil, err + return err } + subNodeStatus.SetDataDir(originalNodePath) subNodeStatus.SetOutputDir(outputDir) } + return nil +} + +func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Context, djSpec *core.DynamicJobSpec, + nCtx handler.NodeExecutionContext, parentNodeStatus v1alpha1.ExecutableNodeStatus) (*core.WorkflowTemplate, error) { + + iface, err := underlyingInterface(ctx, nCtx.TaskReader()) + if err != nil { + return nil, err + } + + err = setEphemeralNodeExecutionStatusAttributes(ctx, djSpec, nCtx, parentNodeStatus) + if err != nil { + return nil, err + } + + parentNodeID := nCtx.NodeID() + currentAttemptStr := strconv.Itoa(int(nCtx.CurrentAttempt())) if nCtx.TaskReader().GetTaskID() != nil { // If the parent is a task, pass down data children nodes should inherit. parentTask, err := nCtx.TaskReader().Read(ctx) @@ -134,6 +148,8 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C dynamicNodeStatus.SetDataDir(nCtx.NodeStatus().GetDataDir()) dynamicNodeStatus.SetOutputDir(nCtx.NodeStatus().GetOutputDir()) dynamicNodeStatus.SetParentTaskID(execID) + id := nCtx.NodeID() + dynamicNodeStatus.SetParentNodeID(&id) cacheHitStopWatch := d.metrics.CacheHit.Start(ctx) // Check if we have compiled the workflow before: @@ -148,12 +164,26 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C logger.Warnf(ctx, "Failed to load cached flyte workflow, this will cause the dynamic workflow to be recompiled. Error: %v", err) d.metrics.CacheError.Inc(ctx) } else { - cacheHitStopWatch.Stop() + // We know for sure that futures file was generated. Lets read it + djSpec, err := f.Read(ctx) + if err != nil { + return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted") + } + + err = setEphemeralNodeExecutionStatusAttributes(ctx, djSpec, nCtx, dynamicNodeStatus) + if err != nil { + return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to set ephemeral node execution attributions") + } + newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) if err != nil { return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID") } + compiledWf := workflowCacheContents.WorkflowCRD + + cacheHitStopWatch.Stop() + return dynamicWorkflowContext{ isDynamic: true, subWorkflow: compiledWf, @@ -171,27 +201,53 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted") } + closure, dynamicWf, workflowContext, err := d.buildDynamicWorkflow(ctx, nCtx, djSpec, dynamicNodeStatus) + if err != nil { + return workflowContext, err + } + + if err := f.Cache(ctx, dynamicWf, closure); err != nil { + logger.Errorf(ctx, "Failed to cache Dynamic workflow [%s]", err.Error()) + } + + // The current node would end up becoming the parent for the dynamic task nodes. + // This is done to track the lineage. For level zero, the CreateParentInfo will return nil + newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + if err != nil { + return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID") + } + return dynamicWorkflowContext{ + isDynamic: true, + subWorkflow: dynamicWf, + subWorkflowClosure: closure, + execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()), + nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus), + }, nil +} + +func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, + djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, dynamicWorkflowContext, error) { wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus) if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template") + return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template") } compiledTasks, err := compileTasks(ctx, djSpec.Tasks) if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to compile dynamic tasks") + return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to compile dynamic tasks") } // Get the requirements, that is, a list of all the task IDs and the launch plan IDs that will be called as part of this dynamic task. // The definition of these will need to be fetched from Admin (in order to get the interface). requirements, err := compiler.GetRequirements(wf, djSpec.Subworkflows) if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to Get requirements for subworkflows") + return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to Get requirements for subworkflows") } // This method handles user vs system errors internally launchPlanInterfaces, err := d.getLaunchPlanInterfaces(ctx, requirements.GetRequiredLaunchPlanIds()) if err != nil { - return dynamicWorkflowContext{}, err + return nil, nil, dynamicWorkflowContext{}, err } // TODO: In addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec. @@ -201,31 +257,15 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C var closure *core.CompiledWorkflowClosure closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces) if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow") + return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow") } dynamicWf, err := k8s.BuildFlyteWorkflow(closure, &core.LiteralMap{}, nil, "") if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow") - } - - if err := f.Cache(ctx, dynamicWf, closure); err != nil { - logger.Errorf(ctx, "Failed to cache Dynamic workflow [%s]", err.Error()) + return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow") } - // The current node would end up becoming the parent for the dynamic task nodes. - // This is done to track the lineage. For level zero, the CreateParentInfo will return nil - newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) - if err != nil { - return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID") - } - return dynamicWorkflowContext{ - isDynamic: true, - subWorkflow: dynamicWf, - subWorkflowClosure: closure, - execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()), - nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus), - }, nil + return closure, dynamicWf, dynamicWorkflowContext{}, nil } func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup, diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 8fd2954631..74a72396be 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -111,12 +111,14 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t subNs.On("ResetDirty").Return() subNs.OnGetOutputDir().Return(finalOutput) subNs.On("SetParentTaskID", mock.Anything).Return() + subNs.On("SetParentNodeID", mock.Anything).Return() subNs.OnGetAttempts().Return(0) dynamicNS := &mocks2.ExecutableNodeStatus{} dynamicNS.On("SetDataDir", mock.Anything).Return() dynamicNS.On("SetOutputDir", mock.Anything).Return() dynamicNS.On("SetParentTaskID", mock.Anything).Return() + dynamicNS.On("SetParentNodeID", mock.Anything).Return() dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "Node_1").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, v1alpha1.EndNodeID).Return(endNodeStatus) @@ -153,7 +155,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t finalOutput := storage.DataReference("/subnode") nCtx := createNodeContext("test", finalOutput, nil) s := &dynamicNodeStateHolder{} - nCtx.On("NodeStateWriter").Return(s) + nCtx.OnNodeStateWriter().Return(s) f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb") assert.NoError(t, err) assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, djSpec)) diff --git a/pkg/controller/nodes/dynamic/handler_test.go b/pkg/controller/nodes/dynamic/handler_test.go index dca24c239c..4ccd228b22 100644 --- a/pkg/controller/nodes/dynamic/handler_test.go +++ b/pkg/controller/nodes/dynamic/handler_test.go @@ -467,6 +467,7 @@ func Test_dynamicNodeHandler_Handle_SubTaskV1(t *testing.T) { subNs := &flyteMocks.ExecutableNodeStatus{} subNs.On("SetDataDir", mock.Anything).Return() subNs.On("SetOutputDir", mock.Anything).Return() + subNs.On("SetParentNodeID", mock.Anything).Return() subNs.On("ResetDirty").Return() subNs.OnGetOutputDir().Return(finalOutput) subNs.On("SetParentTaskID", mock.Anything).Return() @@ -476,6 +477,7 @@ func Test_dynamicNodeHandler_Handle_SubTaskV1(t *testing.T) { dynamicNS.On("SetDataDir", mock.Anything).Return() dynamicNS.On("SetOutputDir", mock.Anything).Return() dynamicNS.On("SetParentTaskID", mock.Anything).Return() + dynamicNS.On("SetParentNodeID", mock.Anything).Return() dynamicNS.OnGetNodeExecutionStatus(ctx, "Node_1").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "Node_2").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "Node_3").Return(subNs) @@ -658,12 +660,14 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { subNs.On("ResetDirty").Return() subNs.OnGetOutputDir().Return(finalOutput) subNs.On("SetParentTaskID", mock.Anything).Return() + subNs.On("SetParentNodeID", mock.Anything).Return() subNs.OnGetAttempts().Return(0) dynamicNS := &flyteMocks.ExecutableNodeStatus{} dynamicNS.On("SetDataDir", mock.Anything).Return() dynamicNS.On("SetOutputDir", mock.Anything).Return() dynamicNS.On("SetParentTaskID", mock.Anything).Return() + dynamicNS.On("SetParentNodeID", mock.Anything).Return() dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_2").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_3").Return(subNs) @@ -907,12 +911,14 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { subNs.On("ResetDirty").Return() subNs.OnGetOutputDir().Return(finalOutput) subNs.On("SetParentTaskID", mock.Anything).Return() + subNs.On("SetParentNodeID", mock.Anything).Return() subNs.OnGetAttempts().Return(0) dynamicNS := &flyteMocks.ExecutableNodeStatus{} dynamicNS.On("SetDataDir", mock.Anything).Return() dynamicNS.On("SetOutputDir", mock.Anything).Return() dynamicNS.On("SetParentTaskID", mock.Anything).Return() + dynamicNS.On("SetParentNodeID", mock.Anything).Return() dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_2").Return(subNs) dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_3").Return(subNs)