Skip to content

Commit

Permalink
Fixing the output dir in workflow event sent to Admin (flyteorg#58)
Browse files Browse the repository at this point in the history
* test

* .

* new rm

* increment attempt after finalize, just before marking it running again

* test

* updating dependencies
  • Loading branch information
surindersinghp authored Jan 18, 2020
1 parent a88b12e commit e28ade0
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 41 deletions.
64 changes: 32 additions & 32 deletions flytepropeller/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flytepropeller/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ required = [

[[constraint]]
name = "github.com/lyft/flyteplugins"
version = "^0.2.5"
version = "^0.2.8"

[[override]]
name = "github.com/lyft/flytestdlib"
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
nodeStatus.SetDataDir(childNodeStatus.GetDataDir())
nodeStatus.SetOutputDir(childNodeStatus.GetOutputDir())
phase := handler.PhaseInfoSuccess(&handler.ExecutionInfo{
OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetDataDir())},
OutputInfo: &handler.OutputInfo{OutputURI: v1alpha1.GetOutputsFile(childNodeStatus.GetOutputDir())},
})
return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil
}
Expand Down
3 changes: 1 addition & 2 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo
), nil
}

nodeStatus.IncrementAttempts()
// Retrying to clearing all status
nCtx.nsm.clearNodeStatus()
}
Expand Down Expand Up @@ -360,7 +359,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
if err := c.finalize(ctx, h, nCtx); err != nil {
return executors.NodeStatusUndefined, err
}

nodeStatus.IncrementAttempts()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying")
// We are going to retry in the next round, so we should clear all current state
nodeStatus.ClearSubNodeStatus()
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {

{"(retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.UnknownTransition, fmt.Errorf("should not be invoked")
}, false, false, core.NodeExecution_RUNNING, 0},
}, false, false, core.NodeExecution_RUNNING, 1},

{"running->failing", v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("code", "reason", nil)), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *subworkflowHandler) DoInlineSubWorkflow(ctx context.Context, nCtx handl
}

// TODO optimization, we could just point the outputInfo to the path of the subworkflows output
destinationPath := v1alpha1.GetOutputsFile(parentNodeStatus.GetDataDir())
destinationPath := v1alpha1.GetOutputsFile(parentNodeStatus.GetOutputDir())
if err := store.CopyRaw(ctx, sourcePath, destinationPath, storage.Options{}); err != nil {
errMsg := fmt.Sprintf("Failed to copy subworkflow outputs from [%v] to [%v]", sourcePath, destinationPath)
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(errors.SubWorkflowExecutionFailed, errMsg, nil)), nil
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *subworkflowHandler) StartSubWorkflow(ctx context.Context, nCtx handler.
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
}

outputDir, err := nCtx.DataStore().ConstructReference(ctx, nodeStatus.GetDataDir(), "0")
outputDir, err := nCtx.DataStore().ConstructReference(ctx, dataDir, "0")
if err != nil {
err = errors2.Wrapf(err, "Failed to create metadata store key. Error [%v]", err)
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (c *workflowExecutor) handleSucceedingWorkflow(ctx context.Context, w *v1al
logger.Infof(ctx, "Workflow completed successfully")
endNodeStatus := w.GetNodeExecutionStatus(ctx, v1alpha1.EndNodeID)
if endNodeStatus.GetPhase() == v1alpha1.NodePhaseSucceeded {
if endNodeStatus.GetDataDir() != "" {
w.Status.SetOutputReference(v1alpha1.GetOutputsFile(endNodeStatus.GetDataDir()))
if endNodeStatus.GetOutputDir() != "" {
w.Status.SetOutputReference(v1alpha1.GetOutputsFile(endNodeStatus.GetOutputDir()))
}
}
return StatusSuccess
Expand Down

0 comments on commit e28ade0

Please sign in to comment.