Skip to content

Commit

Permalink
Statemachine & Status fixes (Discretization, status size... etc.) (fl…
Browse files Browse the repository at this point in the history
…yteorg#49)

This PR fixes a few issues uncovered during the investigation of the statemachine inconsistency issues last week. Specifically:

- [X] Ensure each node can progress at most once per round (IsDirty flag)
- [X] Remove the ParentTaskID and DataDir fields from NodeStatus (they were bloating the size of the workflow object stored in etcd)
- [X] Include the parent's RetryAttempt in the generated hierarchical name of dynamic sub-nodes, so that retries do not reuse an existing sub-node status.

Details: https://docs.google.com/document/d/1ISaxIZeYLcBaeapEmeTqb-g0x04pJbf5t3i30qMfk6U/edit?usp=sharing
  • Loading branch information
EngHabu authored Jan 2, 2020
1 parent 178cd0e commit 3574a4d
Show file tree
Hide file tree
Showing 70 changed files with 1,110 additions and 385 deletions.
9 changes: 6 additions & 3 deletions flytepropeller/cmd/kubectl-flyte/cmd/get.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -33,9 +34,11 @@ func NewGetCommand(opts *RootOptions) *cobra.Command {
Short: "Gets a single workflow or lists all workflows currently in execution",
Long: `use labels to filter`,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()

if len(args) > 0 {
name := args[0]
return getOpts.getWorkflow(name)
return getOpts.getWorkflow(ctx, name)
}
return getOpts.listWorkflows()
},
Expand All @@ -49,7 +52,7 @@ func NewGetCommand(opts *RootOptions) *cobra.Command {
return getCmd
}

func (g *GetOpts) getWorkflow(name string) error {
func (g *GetOpts) getWorkflow(ctx context.Context, name string) error {
parts := strings.Split(name, "/")
if len(parts) > 1 {
g.ConfigOverrides.Context.Namespace = parts[0]
Expand All @@ -61,7 +64,7 @@ func (g *GetOpts) getWorkflow(name string) error {
}
wp := printers.WorkflowPrinter{}
tree := gotree.New("Workflow")
if err := wp.Print(tree, w); err != nil {
if err := wp.Print(ctx, tree, w); err != nil {
return err
}
fmt.Print(tree.Print())
Expand Down
13 changes: 7 additions & 6 deletions flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package printers

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -79,7 +80,7 @@ func (p NodePrinter) BranchNodeInfo(node v1alpha1.ExecutableNode, nodeStatus v1a

}

func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) error {
func (p NodePrinter) traverseNode(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) error {
switch node.GetKind() {
case v1alpha1.NodeKindBranch:
subTree := tree.Add(strings.Join(p.BranchNodeInfo(node, nodeStatus), " | "))
Expand All @@ -89,7 +90,7 @@ func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflo
if !ok {
return fmt.Errorf("failed to find branch node %s", *nodeID)
}
if err := p.traverseNode(subTree, w, ifNode, nodeStatus.GetNodeExecutionStatus(*nodeID)); err != nil {
if err := p.traverseNode(ctx, subTree, w, ifNode, nodeStatus.GetNodeExecutionStatus(ctx, *nodeID)); err != nil {
return err
}
}
Expand All @@ -113,7 +114,7 @@ func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflo
s := w.FindSubWorkflow(*node.GetWorkflowNode().GetSubWorkflowRef())
wp := WorkflowPrinter{}
cw := executors.NewSubContextualWorkflow(w, s, nodeStatus)
return wp.Print(tree, cw)
return wp.Print(ctx, tree, cw)
}
case v1alpha1.NodeKindTask:
sub := tree.Add(strings.Join(p.NodeInfo(w.GetName(), node, nodeStatus), " | "))
Expand All @@ -126,10 +127,10 @@ func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflo
return nil
}

func (p NodePrinter) PrintList(tree gotree.Tree, w v1alpha1.ExecutableWorkflow, nodes []v1alpha1.ExecutableNode) error {
func (p NodePrinter) PrintList(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow, nodes []v1alpha1.ExecutableNode) error {
for _, n := range nodes {
s := w.GetNodeExecutionStatus(n.GetID())
if err := p.traverseNode(tree, w, n, s); err != nil {
s := w.GetNodeExecutionStatus(ctx, n.GetID())
if err := p.traverseNode(ctx, tree, w, n, s); err != nil {
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package printers

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -37,7 +38,7 @@ func CalculateWorkflowRuntime(s v1alpha1.ExecutableWorkflowStatus) string {
// WorkflowPrinter is a field-less type whose methods (Print, PrintShort) take a
// gotree.Tree and an ExecutableWorkflow. It carries no state of its own.
type WorkflowPrinter struct {
}

func (p WorkflowPrinter) Print(tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
func (p WorkflowPrinter) Print(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
sortedNodes, err := visualize.TopologicalSort(w)
if err != nil {
return err
Expand All @@ -49,7 +50,7 @@ func (p WorkflowPrinter) Print(tree gotree.Tree, w v1alpha1.ExecutableWorkflow)
tree.AddTree(newTree)
}
np := NodePrinter{}
return np.PrintList(newTree, w, sortedNodes)
return np.PrintList(ctx, newTree, w, sortedNodes)
}

func (p WorkflowPrinter) PrintShort(tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
Expand Down
16 changes: 13 additions & 3 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type ExecutableBranchNodeStatus interface {
}

type MutableBranchNodeStatus interface {
Mutable
ExecutableBranchNodeStatus

SetBranchNodeError()
Expand All @@ -178,6 +179,7 @@ type ExecutableDynamicNodeStatus interface {
}

type MutableDynamicNodeStatus interface {
Mutable
ExecutableDynamicNodeStatus

SetDynamicNodePhase(phase DynamicNodePhase)
Expand All @@ -198,11 +200,17 @@ type ExecutableWorkflowNodeStatus interface {
}

// MutableWorkflowNodeStatus combines Mutable and ExecutableWorkflowNodeStatus
// and adds a setter for the workflow node phase.
type MutableWorkflowNodeStatus interface {
Mutable
ExecutableWorkflowNodeStatus
// SetWorkflowNodePhase accepts the WorkflowNodePhase to apply.
// NOTE(review): presumably stored as the node's current phase; the
// implementation is not visible here — confirm.
SetWorkflowNodePhase(phase WorkflowNodePhase)
}

// Mutable is the small interface embedded by the Mutable*Status interfaces in
// this file; it exposes a single IsDirty query.
type Mutable interface {
// IsDirty returns a boolean flag.
// NOTE(review): presumably true when the status has been modified during
// the current evaluation round — confirm against the implementation.
IsDirty() bool
}

type MutableNodeStatus interface {
Mutable
// Mutation API's
SetDataDir(DataReference)
SetParentNodeID(n *NodeID)
Expand All @@ -225,6 +233,7 @@ type MutableNodeStatus interface {
GetDynamicNodeStatus() MutableDynamicNodeStatus
ClearDynamicNodeStatus()
ClearLastAttemptStartedAt()
ClearSubNodeStatus()
}

// Interface for a Node p. This provides a mutable API.
Expand All @@ -247,14 +256,14 @@ type ExecutableNodeStatus interface {
GetTaskNodeStatus() ExecutableTaskNodeStatus

IsCached() bool
IsDirty() bool
}

// ExecutableSubWorkflowNodeStatus is the getter-only view of a sub-workflow
// node status: it exposes the WorkflowPhase and nothing else.
type ExecutableSubWorkflowNodeStatus interface {
// GetPhase returns the WorkflowPhase held by this status.
GetPhase() WorkflowPhase
}

// MutableSubWorkflowNodeStatus combines Mutable and
// ExecutableSubWorkflowNodeStatus and adds a setter for the phase that
// GetPhase exposes.
type MutableSubWorkflowNodeStatus interface {
Mutable
ExecutableSubWorkflowNodeStatus
// SetPhase accepts the WorkflowPhase to apply.
// NOTE(review): presumably this is the value later returned by GetPhase —
// confirm against the implementation.
SetPhase(phase WorkflowPhase)
}
Expand All @@ -268,6 +277,7 @@ type ExecutableTaskNodeStatus interface {
}

type MutableTaskNodeStatus interface {
Mutable
ExecutableTaskNodeStatus
SetPhase(phase int)
SetPhaseVersion(version uint32)
Expand Down Expand Up @@ -320,7 +330,7 @@ type ExecutableWorkflowStatus interface {
SetOutputReference(reference DataReference)
IncFailedAttempts()
SetMessage(msg string)
ConstructNodeDataDir(ctx context.Context, constructor storage.ReferenceConstructor, name NodeID) (storage.DataReference, error)
ConstructNodeDataDir(ctx context.Context, name NodeID) (storage.DataReference, error)
}

type BaseWorkflow interface {
Expand Down Expand Up @@ -381,7 +391,7 @@ type ExecutableWorkflow interface {
}

type NodeStatusGetter interface {
GetNodeExecutionStatus(id NodeID) ExecutableNodeStatus
GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus
}

type NodeStatusMap = map[NodeID]ExecutableNodeStatus
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3574a4d

Please sign in to comment.