Skip to content

Commit

Permalink
Upstream/node event update (flyteorg#5528)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Vladyslav Libov <[email protected]>
  • Loading branch information
wild-endeavor authored and VladyslavLibov committed Aug 16, 2024
1 parent 352be64 commit af4f800
Show file tree
Hide file tree
Showing 27 changed files with 341 additions and 70 deletions.
12 changes: 8 additions & 4 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/gen/pb-es/flyteidl/event/event_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/gen/pb-go/flyteidl/event/event.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/gen/pb_rust/flyteidl.event.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions flyteidl/protos/flyteidl/event/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,12 @@ message NodeExecutionEvent {
// Indicates if this node is an ArrayNode.
bool is_array = 22;

// Holding this field here for now, this will be upstreamed soon.
// So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this
// in optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding
// to this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be
// nested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow.
core.Identifier target_entity = 23;

// Holding this field here for now, this will be upstreamed soon.
// Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of
// the tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not
// even registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea
Expand Down
13 changes: 10 additions & 3 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,18 @@ func (in *NodeSpec) GetOutputAlias() []Alias {
return in.OutputAliases
}

// In functions below, explicitly strip out nil type information because NodeSpec's WorkflowNode is a struct type,
// not interface and downstream nil checks will not pass.
// See the test in TestPointersForNodeSpec for more information.

func (in *NodeSpec) GetWorkflowNode() ExecutableWorkflowNode {
if in.WorkflowNode == nil {
return nil
if in != nil {
if in.WorkflowNode == nil {
return nil
}
return in.WorkflowNode
}
return in.WorkflowNode
return nil
}

func (in *NodeSpec) GetBranchNode() ExecutableBranchNode {
Expand Down
51 changes: 51 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package v1alpha1

import (
"testing"

"github.com/stretchr/testify/assert"
)

type CanDo interface {
MyDo() int
}

type Concrete struct {
Doer CanDo
}

func (c *Concrete) MyDo() int {
return 1
}

type Parent struct {
Concrete *Concrete
}

func (p *Parent) GetDoer() CanDo {
return p.Concrete
}

func (p *Parent) GetConcreteDoer() *Concrete {
return p.Concrete
}

func TestPointersForNodeSpec(t *testing.T) {
p := &Parent{
Concrete: nil,
}
// GetDoer returns a fake nil because it carries type information
// assert.NotNil(t, p.GetDoer()) funnily enough doesn't work, so use a regular if statement
if p.GetDoer() == nil {
assert.Fail(t, "GetDoer")
}

assert.Nil(t, p.GetConcreteDoer())
}

func TestNodeSpec(t *testing.T) {
n := &NodeSpec{
WorkflowNode: nil,
}
assert.Nil(t, n.GetWorkflowNode())
}
10 changes: 8 additions & 2 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ type WorkflowNodeSpec struct {
}

func (in *WorkflowNodeSpec) GetLaunchPlanRefID() *LaunchPlanRefID {
return in.LaunchPlanRefID
if in != nil {
return in.LaunchPlanRefID
}
return nil
}

func (in *WorkflowNodeSpec) GetSubWorkflowRef() *WorkflowID {
return in.SubWorkflowReference
if in != nil {
return in.SubWorkflowReference
}
return nil
}
17 changes: 12 additions & 5 deletions flytepropeller/pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ParentInfoGetter interface {
type ImmutableParentInfo interface {
GetUniqueID() v1alpha1.NodeID
CurrentAttempt() uint32
IsInDynamicChain() bool
}

type ControlFlow interface {
Expand Down Expand Up @@ -60,14 +61,19 @@ func (e execContext) GetParentInfo() ImmutableParentInfo {
}

type parentExecutionInfo struct {
uniqueID v1alpha1.NodeID
currentAttempts uint32
uniqueID v1alpha1.NodeID
currentAttempts uint32
isInDynamicChain bool
}

func (p *parentExecutionInfo) GetUniqueID() v1alpha1.NodeID {
return p.uniqueID
}

func (p *parentExecutionInfo) IsInDynamicChain() bool {
return p.isInDynamicChain
}

func (p *parentExecutionInfo) CurrentAttempt() uint32 {
return p.currentAttempts
}
Expand Down Expand Up @@ -129,10 +135,11 @@ func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter T
}
}

func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo {
func NewParentInfo(uniqueID string, currentAttempts uint32, isInDynamicChain bool) ImmutableParentInfo {
return &parentExecutionInfo{
currentAttempts: currentAttempts,
uniqueID: uniqueID,
currentAttempts: currentAttempts,
uniqueID: uniqueID,
isInDynamicChain: isInDynamicChain,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,22 @@ func TestExecutionContext(t *testing.T) {

func TestParentExecutionInfo_GetUniqueID(t *testing.T) {
expectedID := "testID"
parentInfo := NewParentInfo(expectedID, 1)
parentInfo := NewParentInfo(expectedID, 1, false)
assert.Equal(t, expectedID, parentInfo.GetUniqueID())
}

func TestParentExecutionInfo_CurrentAttempt(t *testing.T) {
expectedAttempt := uint32(123465)
parentInfo := NewParentInfo("testID", expectedAttempt)
parentInfo := NewParentInfo("testID", expectedAttempt, false)
assert.Equal(t, expectedAttempt, parentInfo.CurrentAttempt())
}

func TestParentExecutionInfo_DynamicChain(t *testing.T) {
expectedAttempt := uint32(123465)
parentInfo := NewParentInfo("testID", expectedAttempt, true)
assert.True(t, parentInfo.IsInDynamicChain())
}

func TestControlFlow_ControlFlowParallelism(t *testing.T) {
cFlow := InitializeControlFlow().(*controlFlow)
assert.Equal(t, uint32(0), cFlow.CurrentParallelism())
Expand All @@ -88,7 +94,7 @@ func TestControlFlow_ControlFlowParallelism(t *testing.T) {
func TestNewParentInfo(t *testing.T) {
expectedID := "testID"
expectedAttempt := uint32(123465)
parentInfo := NewParentInfo(expectedID, expectedAttempt).(*parentExecutionInfo)
parentInfo := NewParentInfo(expectedID, expectedAttempt, false).(*parentExecutionInfo)
assert.Equal(t, expectedID, parentInfo.uniqueID)
assert.Equal(t, expectedAttempt, parentInfo.currentAttempts)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index
timestamp := ptypes.TimestampNow()
workflowExecutionID := nCtx.ExecutionContext().GetExecutionID().WorkflowExecutionIdentifier

// send NodeExecutionEvent
// Extract dynamic chain information.
var dynamic = false
if nCtx.ExecutionContext() != nil && nCtx.ExecutionContext().GetParentInfo() != nil && nCtx.ExecutionContext().GetParentInfo().IsInDynamicChain() {
dynamic = true
}
nodeExecutionEvent := &event.NodeExecutionEvent{
Id: &idlcore.NodeExecutionIdentifier{
NodeId: subNodeID,
Expand All @@ -231,14 +235,15 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index
ParentNodeMetadata: &event.ParentNodeExecutionMetadata{
NodeId: nCtx.NodeID(),
},
ReportedAt: timestamp,
ReportedAt: timestamp,
IsInDynamicChain: dynamic,
}

if err := eventRecorder.RecordNodeEvent(ctx, nodeExecutionEvent, eventConfig); err != nil {
return err
}

// send TaskExeucutionEvent
// send TaskExecutionEvent
taskExecutionEvent := &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut
} else {
// record events transitioning subNodes to aborted
retryAttempt := uint32(arrayNodeState.SubNodeRetryAttempts.GetItem(i))

if err := sendEvents(ctx, nCtx, i, retryAttempt, idlcore.NodeExecution_ABORTED, idlcore.TaskExecution_ABORTED, eventRecorder, a.eventConfig); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
Expand Down Expand Up @@ -707,7 +708,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
// initialize mocks
arrayNodeLookup := newArrayNodeLookup(nCtx.ContextualNodeLookup(), subNodeID, &subNodeSpec, subNodeStatus)

newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false)
if err != nil {
return nil, nil, nil, nil, nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (b *branchHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecutio
}

func (b *branchHandler) getExecutionContextForDownstream(nCtx interfaces.NodeExecutionContext) (executors.ExecutionContext, error) {
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit af4f800

Please sign in to comment.