Skip to content

Commit

Permalink
Refactor to remove Workflow from Node executor (flyteorg#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Apr 16, 2020
1 parent 7a2b71a commit 7ba3e89
Show file tree
Hide file tree
Showing 63 changed files with 4,264 additions and 1,509 deletions.
2 changes: 2 additions & 0 deletions cmd/kubectl-flyte/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

gotree "github.com/DiSiqueira/GoTree"
"github.com/lyft/flytestdlib/storage"
"github.com/spf13/cobra"
v12 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -64,6 +65,7 @@ func (g *GetOpts) getWorkflow(ctx context.Context, name string) error {
}
wp := printers.WorkflowPrinter{}
tree := gotree.New("Workflow")
w.DataReferenceConstructor = storage.URLPathConstructor{}
if err := wp.Print(ctx, tree, w); err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

gotree "github.com/DiSiqueira/GoTree"
"github.com/fatih/color"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task"
"github.com/lyft/flytepropeller/pkg/utils"
)
Expand Down Expand Up @@ -113,8 +113,7 @@ func (p NodePrinter) traverseNode(ctx context.Context, tree gotree.Tree, w v1alp
if node.GetWorkflowNode().GetSubWorkflowRef() != nil {
s := w.FindSubWorkflow(*node.GetWorkflowNode().GetSubWorkflowRef())
wp := WorkflowPrinter{}
cw := executors.NewSubContextualWorkflow(w, s, nodeStatus)
return wp.Print(ctx, tree, cw)
return wp.PrintSubWorkflow(ctx, tree, w, s, nodeStatus)
}
case v1alpha1.NodeKindTask:
sub := tree.Add(strings.Join(p.NodeInfo(w.GetName(), node, nodeStatus), " | "))
Expand Down
25 changes: 24 additions & 1 deletion cmd/kubectl-flyte/cmd/printers/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

gotree "github.com/DiSiqueira/GoTree"
"github.com/fatih/color"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/visualize"
)
Expand All @@ -25,7 +26,7 @@ func ColorizeWorkflowPhase(p v1alpha1.WorkflowPhase) string {
return color.CyanString("%s", p.String())
}

func CalculateWorkflowRuntime(s v1alpha1.ExecutableWorkflowStatus) string {
func CalculateWorkflowRuntime(s v1alpha1.ExecutionTimeInfo) string {
if s.GetStartedAt() != nil {
if s.GetStoppedAt() != nil {
return s.GetStoppedAt().Sub(s.GetStartedAt().Time).String()
Expand All @@ -35,6 +36,12 @@ func CalculateWorkflowRuntime(s v1alpha1.ExecutableWorkflowStatus) string {
return "na"
}

type ContextualWorkflow struct {
v1alpha1.MetaExtended
v1alpha1.ExecutableSubWorkflow
v1alpha1.NodeStatusGetter
}

type WorkflowPrinter struct {
}

Expand All @@ -53,6 +60,22 @@ func (p WorkflowPrinter) Print(ctx context.Context, tree gotree.Tree, w v1alpha1
return np.PrintList(ctx, newTree, w, sortedNodes)
}

func (p WorkflowPrinter) PrintSubWorkflow(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow, swf v1alpha1.ExecutableSubWorkflow, ns v1alpha1.ExecutableNodeStatus) error {
sortedNodes, err := visualize.TopologicalSort(swf)
if err != nil {
return err
}
newTree := gotree.New(fmt.Sprintf("SubWorkflow [%s] (%s %s %s)",
swf.GetID(), CalculateWorkflowRuntime(ns),
ColorizeNodePhase(ns.GetPhase()), ns.GetMessage()))
if tree != nil {
tree.AddTree(newTree)
}
np := NodePrinter{}

return np.PrintList(ctx, newTree, &ContextualWorkflow{MetaExtended: w, ExecutableSubWorkflow: swf, NodeStatusGetter: ns}, sortedNodes)
}

func (p WorkflowPrinter) PrintShort(tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
if tree == nil {
return fmt.Errorf("bad state in printer")
Expand Down
52 changes: 25 additions & 27 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v1alpha1

import (
"context"

"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -238,16 +237,20 @@ type MutableNodeStatus interface {
ClearSubNodeStatus()
}

type ExecutionTimeInfo interface {
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
}

// Interface for a Node p. This provides a mutable API.
type ExecutableNodeStatus interface {
NodeStatusGetter
MutableNodeStatus
NodeStatusVisitor
ExecutionTimeInfo
GetPhase() NodePhase
GetQueuedAt() *metav1.Time
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
GetLastAttemptStartedAt() *metav1.Time
GetParentNodeID() *NodeID
GetParentTaskID() *core.TaskExecutionIdentifier
Expand Down Expand Up @@ -324,11 +327,9 @@ type ExecutableNode interface {
// Interface for the Workflow p. This is the mutable portion for a Workflow
type ExecutableWorkflowStatus interface {
NodeStatusGetter
ExecutionTimeInfo
UpdatePhase(p WorkflowPhase, msg string)
GetPhase() WorkflowPhase
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
IsTerminated() bool
GetMessage() string
SetDataDir(DataReference)
Expand All @@ -340,13 +341,18 @@ type ExecutableWorkflowStatus interface {
ConstructNodeDataDir(ctx context.Context, name NodeID) (storage.DataReference, error)
}

type NodeGetter interface {
GetNode(nodeID NodeID) (ExecutableNode, bool)
}

type BaseWorkflow interface {
NodeGetter
StartNode() ExecutableNode
GetID() WorkflowID
// From returns all nodes that can be reached directly
// from the node with the given unique name.
FromNode(name NodeID) ([]NodeID, error)
GetNode(nodeID NodeID) (ExecutableNode, bool)
ToNode(name NodeID) ([]NodeID, error)
}

type BaseWorkflowWithStatus interface {
Expand All @@ -365,9 +371,9 @@ type ExecutableSubWorkflow interface {
GetOutputs() *OutputVarMap
}

// WorkflowMeta provides an interface to retrieve labels, annotations and other concepts that are declared only once
// Meta provides an interface to retrieve labels, annotations and other concepts that are declared only once
// for the top level workflow
type WorkflowMeta interface {
type Meta interface {
GetExecutionID() ExecutionID
GetK8sWorkflowID() types.NamespacedName
GetOwnerReference() metav1.OwnerReference
Expand All @@ -384,17 +390,21 @@ type TaskDetailsGetter interface {
GetTask(id TaskID) (ExecutableTask, error)
}

type WorkflowMetaExtended interface {
WorkflowMeta
TaskDetailsGetter
type SubWorkflowGetter interface {
FindSubWorkflow(subID WorkflowID) ExecutableSubWorkflow
}

type MetaExtended interface {
Meta
TaskDetailsGetter
SubWorkflowGetter
GetExecutionStatus() ExecutableWorkflowStatus
}

// A Top level Workflow is a combination of WorkflowMeta and an ExecutableSubWorkflow
// A Top level Workflow is a combination of Meta and an ExecutableSubWorkflow
type ExecutableWorkflow interface {
ExecutableSubWorkflow
WorkflowMetaExtended
MetaExtended
NodeStatusGetter
}

Expand All @@ -420,15 +430,3 @@ func GetOutputsFile(outputDir DataReference) DataReference {
func GetInputsFile(inputDir DataReference) DataReference {
return inputDir + "/inputs.pb"
}

func GetOutputErrorFile(inputDir DataReference) DataReference {
return inputDir + "/error.pb"
}

func GetFutureFile() string {
return "futures.pb"
}

func GetCompiledFutureFile() string {
return "futures_compiled.pb"
}
41 changes: 41 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/BaseWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/BaseWorkflowWithStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableSubWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7ba3e89

Please sign in to comment.