Skip to content

Commit

Permalink
Add support for GateNode with signal and sleep condition (flyteorg#436)
Browse files Browse the repository at this point in the history
* Update flyteidl version

Signed-off-by: Flyte-Bot <[email protected]>

* Update flyteidl version

Signed-off-by: Flyte-Bot <[email protected]>

* Fix build break

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Update flyteidl version

Signed-off-by: Flyte-Bot <[email protected]>

* added GateNode to compiler

Signed-off-by: Daniel Rammer <[email protected]>

* added gate node handler

Signed-off-by: Daniel Rammer <[email protected]>

* enable reading and setting gate node state

Signed-off-by: Daniel Rammer <[email protected]>

* gate nodes working

Signed-off-by: Daniel Rammer <[email protected]>

* changed Conditional to Condition in proto naming

Signed-off-by: Daniel Rammer <[email protected]>

* passing admin client to gate node handler

Signed-off-by: Daniel Rammer <[email protected]>

* using signal service client to check for signal in admin and write output

Signed-off-by: Daniel Rammer <[email protected]>

* updated comments

Signed-off-by: Daniel Rammer <[email protected]>

* completed implementation

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests for gate node

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests with missing signal mocks

Signed-off-by: Daniel Rammer <[email protected]>

* added docs on gate node handler

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updating flyteidl dependency

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>

* added output variable name to signal condition

Signed-off-by: Daniel Rammer <[email protected]>

* using last attempt started at timestamp on node context rather than tracking in gate node status

Signed-off-by: Daniel Rammer <[email protected]>

* updated GateNodeStatus mocks

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl deps

Signed-off-by: Daniel Rammer <[email protected]>

* update flyteidl deps

Signed-off-by: Daniel Rammer <[email protected]>

* added interface validation for approve condition

Signed-off-by: Daniel Rammer <[email protected]>

* added approve condition unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed missed merge conflict updating to slice of dial options

Signed-off-by: Daniel Rammer <[email protected]>

* update generated mocks

Signed-off-by: Dan Rammer <[email protected]>

Signed-off-by: Flyte-Bot <[email protected]>
Signed-off-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Signed-off-by: Dan Rammer <[email protected]>
Co-authored-by: flyte-bot <[email protected]>
Co-authored-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
3 people authored Dec 1, 2022
1 parent ca7c744 commit bf00d9f
Show file tree
Hide file tree
Showing 42 changed files with 1,876 additions and 68 deletions.
6 changes: 5 additions & 1 deletion flytepropeller/cmd/kubectl-flyte/cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/proto"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler"
"github.com/flyteorg/flytepropeller/pkg/compiler/common"
compilerErrors "github.com/flyteorg/flytepropeller/pkg/compiler/errors"
Expand Down Expand Up @@ -200,6 +201,9 @@ func (c *CreateOpts) createWorkflowFromProto() error {
if err != nil {
return err
}
flyteWf.ExecutionID = v1alpha1.WorkflowExecutionIdentifier{
WorkflowExecutionIdentifier: executionID,
}
if flyteWf.Annotations == nil {
flyteWf.Annotations = *c.annotations.value
} else {
Expand All @@ -209,7 +213,7 @@ func (c *CreateOpts) createWorkflowFromProto() error {
}

if c.dryRun {
fmt.Printf("Dry Run mode enabled. Printing the compiled workflow.")
fmt.Printf("Dry Run mode enabled. Printing the compiled workflow.\n")
j, err := json.Marshal(flyteWf)
if err != nil {
return errors.Wrapf(err, "Failed to marshal final workflow to Propeller format.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
workflow:
id:
name: workflow-id-123
domain: development
project: flytesnacks
interface:
inputs:
variables:
x:
type:
simple: INTEGER
"y":
type:
collectionType:
simple: STRING
nodes:
- id: node-1
gateNode:
approve:
signalId: foo
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
workflow:
id:
name: workflow-id-123
domain: development
project: flytesnacks
interface:
inputs:
variables: {}
nodes:
- id: node-1
gateNode:
signal:
signalId: foo
type:
simple: BOOLEAN
outputVariableName: o0
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
workflow:
id:
name: workflow-id-123
domain: development
project: flytesnacks
interface:
inputs:
variables: {}
nodes:
- id: node-1
gateNode:
sleep:
duration: 10s
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
workflow:
id:
name: missing-launchplan
nodes:
- id: node-1
workflowNode:
launchplanRef:
project: foo
domain: bar
name: baz
106 changes: 106 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package v1alpha1

import (
"bytes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/golang/protobuf/jsonpb"
)

type ConditionKind string

func (n ConditionKind) String() string {
return string(n)
}

const (
ConditionKindApprove ConditionKind = "approve"
ConditionKindSignal ConditionKind = "signal"
ConditionKindSleep ConditionKind = "sleep"
)

type ApproveCondition struct {
*core.ApproveCondition
}

func (in ApproveCondition) MarshalJSON() ([]byte, error) {
if in.ApproveCondition == nil {
return nilJSON, nil
}

var buf bytes.Buffer
if err := marshaler.Marshal(&buf, in.ApproveCondition); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (in *ApproveCondition) UnmarshalJSON(b []byte) error {
in.ApproveCondition = &core.ApproveCondition{}
return jsonpb.Unmarshal(bytes.NewReader(b), in.ApproveCondition)
}

type SignalCondition struct {
*core.SignalCondition
}

func (in SignalCondition) MarshalJSON() ([]byte, error) {
if in.SignalCondition == nil {
return nilJSON, nil
}

var buf bytes.Buffer
if err := marshaler.Marshal(&buf, in.SignalCondition); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (in *SignalCondition) UnmarshalJSON(b []byte) error {
in.SignalCondition = &core.SignalCondition{}
return jsonpb.Unmarshal(bytes.NewReader(b), in.SignalCondition)
}

type SleepCondition struct {
*core.SleepCondition
}

func (in SleepCondition) MarshalJSON() ([]byte, error) {
if in.SleepCondition == nil {
return nilJSON, nil
}

var buf bytes.Buffer
if err := marshaler.Marshal(&buf, in.SleepCondition); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (in *SleepCondition) UnmarshalJSON(b []byte) error {
in.SleepCondition = &core.SleepCondition{}
return jsonpb.Unmarshal(bytes.NewReader(b), in.SleepCondition)
}

type GateNodeSpec struct {
Kind ConditionKind `json:"kind"`
Approve *ApproveCondition `json:"approve,omitempty"`
Signal *SignalCondition `json:"signal,omitempty"`
Sleep *SleepCondition `json:"sleep,omitempty"`
}

func (g *GateNodeSpec) GetKind() ConditionKind {
return g.Kind
}

func (g *GateNodeSpec) GetApprove() *core.ApproveCondition {
return g.Approve.ApproveCondition
}

func (g *GateNodeSpec) GetSignal() *core.SignalCondition {
return g.Signal.SignalCondition
}

func (g *GateNodeSpec) GetSleep() *core.SleepCondition {
return g.Sleep.SleepCondition
}
23 changes: 23 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
NodeKindTask NodeKind = "task"
NodeKindBranch NodeKind = "branch" // A Branch node with conditions
NodeKindWorkflow NodeKind = "workflow" // Either an inline workflow or a remote workflow definition
NodeKindGate NodeKind = "gate" // A Gate node with a condition
NodeKindStart NodeKind = "start" // Start node is a special node
NodeKindEnd NodeKind = "end"
)
Expand Down Expand Up @@ -245,6 +246,13 @@ type ExecutableBranchNode interface {
GetElseFail() *core.Error
}

type ExecutableGateNode interface {
GetKind() ConditionKind
GetApprove() *core.ApproveCondition
GetSignal() *core.SignalCondition
GetSleep() *core.SleepCondition
}

type ExecutableWorkflowNodeStatus interface {
GetWorkflowNodePhase() WorkflowNodePhase
GetExecutionError() *core.ExecutionError
Expand All @@ -257,6 +265,16 @@ type MutableWorkflowNodeStatus interface {
SetExecutionError(executionError *core.ExecutionError)
}

type ExecutableGateNodeStatus interface {
GetGateNodePhase() GateNodePhase
}

type MutableGateNodeStatus interface {
Mutable
ExecutableGateNodeStatus
SetGateNodePhase(phase GateNodePhase)
}

type Mutable interface {
IsDirty() bool
}
Expand Down Expand Up @@ -288,6 +306,10 @@ type MutableNodeStatus interface {
ClearDynamicNodeStatus()
ClearLastAttemptStartedAt()
ClearSubNodeStatus()

GetGateNodeStatus() MutableGateNodeStatus
GetOrCreateGateNodeStatus() MutableGateNodeStatus
ClearGateNodeStatus()
}

type ExecutionTimeInfo interface {
Expand Down Expand Up @@ -370,6 +392,7 @@ type ExecutableNode interface {
GetTaskID() *TaskID
GetBranchNode() ExecutableBranchNode
GetWorkflowNode() ExecutableWorkflowNode
GetGateNode() ExecutableGateNode
GetOutputAlias() []Alias
GetInputBindings() []*Binding
GetResources() *v1.ResourceRequirements
Expand Down
Loading

0 comments on commit bf00d9f

Please sign in to comment.