From af4f80025552d4faadc512e87ea808ff8e754c2c Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 2 Jul 2024 11:42:54 -0700 Subject: [PATCH] Upstream/node event update (#5528) Signed-off-by: Yee Hing Tong Signed-off-by: Vladyslav Libov --- flyteidl/clients/go/assets/admin.swagger.json | 12 +++-- flyteidl/gen/pb-es/flyteidl/event/event_pb.ts | 2 - flyteidl/gen/pb-go/flyteidl/event/event.pb.go | 2 - .../flyteidl/service/admin.swagger.json | 4 +- flyteidl/gen/pb_rust/flyteidl.event.rs | 2 - flyteidl/protos/flyteidl/event/event.proto | 2 - .../pkg/apis/flyteworkflow/v1alpha1/nodes.go | 13 +++-- .../apis/flyteworkflow/v1alpha1/nodes_test.go | 51 +++++++++++++++++++ .../flyteworkflow/v1alpha1/subworkflow.go | 10 +++- .../controller/executors/execution_context.go | 17 +++++-- .../executors/execution_context_test.go | 12 +++-- .../executors/mocks/immutable_parent_info.go | 32 ++++++++++++ .../controller/nodes/array/event_recorder.go | 11 ++-- .../pkg/controller/nodes/array/handler.go | 3 +- .../pkg/controller/nodes/branch/handler.go | 2 +- .../controller/nodes/branch/handler_test.go | 8 ++- .../pkg/controller/nodes/common/utils.go | 35 ++++++++++++- .../pkg/controller/nodes/common/utils_test.go | 20 +++++--- .../nodes/dynamic/dynamic_workflow.go | 4 +- .../pkg/controller/nodes/executor.go | 28 ++++++++-- .../pkg/controller/nodes/executor_test.go | 40 +++++++++++++-- .../nodes/node_exec_context_test.go | 48 +++++++++++++++++ .../nodes/subworkflow/subworkflow.go | 2 +- .../nodes/subworkflow/subworkflow_test.go | 4 +- .../pkg/controller/nodes/transformers.go | 36 +++++++++---- .../pkg/controller/nodes/transformers_test.go | 9 ++-- flytestdlib/storage/stow_store.go | 2 +- 27 files changed, 341 insertions(+), 70 deletions(-) create mode 100644 flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes_test.go diff --git a/flyteidl/clients/go/assets/admin.swagger.json b/flyteidl/clients/go/assets/admin.swagger.json index 351cc67cd6..4854a4e895 100644 --- a/flyteidl/clients/go/assets/admin.swagger.json +++ b/flyteidl/clients/go/assets/admin.swagger.json @@ -7076,9 +7076,9 @@ "coreExecutionEnv": { "type": "object", "properties": { - "id": { + "name": { "type": "string", - "description": "id is a unique identifier for the execution environment." + "description": "name is a human-readable identifier for the execution environment. This is combined with the\nproject, domain, and version to uniquely identify an execution environment." }, "type": { "type": "string", @@ -7091,6 +7091,10 @@ "spec": { "type": "object", "description": "spec is a specification of the environment." + }, + "version": { + "type": "string", + "description": "version is the version of the execution environment. This may be used differently by each\nindividual environment type (ex. auto-generated or manually provided), but is intended to\nallow variance in environment specifications with the same ID." } }, "description": "ExecutionEnv is a message that is used to specify the execution environment." @@ -8707,11 +8711,11 @@ }, "target_entity": { "$ref": "#/definitions/coreIdentifier", - "description": "Holding this field here for now, this will be upstreamed soon.\nSo that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this\nin optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding\nto this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be\nnested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow." + "description": "So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this\nin optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding\nto this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be\nnested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow." }, "is_in_dynamic_chain": { "type": "boolean", - "description": "Holding this field here for now, this will be upstreamed soon.\nTasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of\nthe tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not\neven registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea\nif the relevant execution entity is was registered, or dynamic. This field indicates that the target_entity ID,\nas well as task IDs in any corresponding Task Executions, should not be used to looked up the task in Admin's db." + "description": "Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of\nthe tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not\neven registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea\nif the relevant execution entity is was registered, or dynamic. This field indicates that the target_entity ID,\nas well as task IDs in any corresponding Task Executions, should not be used to looked up the task in Admin's db." } } }, diff --git a/flyteidl/gen/pb-es/flyteidl/event/event_pb.ts b/flyteidl/gen/pb-es/flyteidl/event/event_pb.ts index 62572d87c5..9e5fd39c1d 100644 --- a/flyteidl/gen/pb-es/flyteidl/event/event_pb.ts +++ b/flyteidl/gen/pb-es/flyteidl/event/event_pb.ts @@ -287,7 +287,6 @@ export class NodeExecutionEvent extends Message { isArray = false; /** - * Holding this field here for now, this will be upstreamed soon. * So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this * in optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding * to this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be @@ -298,7 +297,6 @@ export class NodeExecutionEvent extends Message { targetEntity?: Identifier; /** - * Holding this field here for now, this will be upstreamed soon. * Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of * the tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not * even registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea diff --git a/flyteidl/gen/pb-go/flyteidl/event/event.pb.go b/flyteidl/gen/pb-go/flyteidl/event/event.pb.go index c9ea7d10f2..963ed02ff6 100644 --- a/flyteidl/gen/pb-go/flyteidl/event/event.pb.go +++ b/flyteidl/gen/pb-go/flyteidl/event/event.pb.go @@ -265,13 +265,11 @@ type NodeExecutionEvent struct { ReportedAt *timestamppb.Timestamp `protobuf:"bytes,21,opt,name=reported_at,json=reportedAt,proto3" json:"reported_at,omitempty"` // Indicates if this node is an ArrayNode. IsArray bool `protobuf:"varint,22,opt,name=is_array,json=isArray,proto3" json:"is_array,omitempty"` - // Holding this field here for now, this will be upstreamed soon. // So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this // in optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding // to this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be // nested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow. TargetEntity *core.Identifier `protobuf:"bytes,23,opt,name=target_entity,json=targetEntity,proto3" json:"target_entity,omitempty"` - // Holding this field here for now, this will be upstreamed soon. // Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of // the tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not // even registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea diff --git a/flyteidl/gen/pb-go/gateway/flyteidl/service/admin.swagger.json b/flyteidl/gen/pb-go/gateway/flyteidl/service/admin.swagger.json index c959a8d766..4854a4e895 100644 --- a/flyteidl/gen/pb-go/gateway/flyteidl/service/admin.swagger.json +++ b/flyteidl/gen/pb-go/gateway/flyteidl/service/admin.swagger.json @@ -8711,11 +8711,11 @@ }, "target_entity": { "$ref": "#/definitions/coreIdentifier", - "description": "Holding this field here for now, this will be upstreamed soon.\nSo that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this\nin optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding\nto this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be\nnested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow." + "description": "So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this\nin optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding\nto this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be\nnested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow." }, "is_in_dynamic_chain": { "type": "boolean", - "description": "Holding this field here for now, this will be upstreamed soon.\nTasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of\nthe tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not\neven registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea\nif the relevant execution entity is was registered, or dynamic. This field indicates that the target_entity ID,\nas well as task IDs in any corresponding Task Executions, should not be used to looked up the task in Admin's db." + "description": "Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of\nthe tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not\neven registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea\nif the relevant execution entity is was registered, or dynamic. This field indicates that the target_entity ID,\nas well as task IDs in any corresponding Task Executions, should not be used to looked up the task in Admin's db." } } }, diff --git a/flyteidl/gen/pb_rust/flyteidl.event.rs b/flyteidl/gen/pb_rust/flyteidl.event.rs index 4afbce8630..281ee07daa 100644 --- a/flyteidl/gen/pb_rust/flyteidl.event.rs +++ b/flyteidl/gen/pb_rust/flyteidl.event.rs @@ -86,14 +86,12 @@ pub struct NodeExecutionEvent { /// Indicates if this node is an ArrayNode. #[prost(bool, tag="22")] pub is_array: bool, - /// Holding this field here for now, this will be upstreamed soon. /// So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this /// in optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding /// to this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be /// nested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow. #[prost(message, optional, tag="23")] pub target_entity: ::core::option::Option, - /// Holding this field here for now, this will be upstreamed soon. /// Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of /// the tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not /// even registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea diff --git a/flyteidl/protos/flyteidl/event/event.proto b/flyteidl/protos/flyteidl/event/event.proto index d07356493f..640b4804e9 100644 --- a/flyteidl/protos/flyteidl/event/event.proto +++ b/flyteidl/protos/flyteidl/event/event.proto @@ -115,14 +115,12 @@ message NodeExecutionEvent { // Indicates if this node is an ArrayNode. bool is_array = 22; - // Holding this field here for now, this will be upstreamed soon. // So that Admin doesn't have to rebuild the node execution graph to find the target entity, propeller will fill this // in optionally - currently this is only filled in for subworkflows. This is the ID of the subworkflow corresponding // to this node execution. It is difficult to find because Admin only sees one node at a time. A subworkflow could be // nested multiple layers deep, and you'd need to access the correct workflow template to know the target subworkflow. core.Identifier target_entity = 23; - // Holding this field here for now, this will be upstreamed soon. // Tasks and subworkflows (but not launch plans) that are run within a dynamic task are effectively independent of // the tasks that are registered in Admin's db. Confusingly, they are often identical, but sometimes they are not // even registered at all. Similar to the target_entity field, at the time Admin receives this event, it has no idea diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go index 6554357031..2e4514638d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes.go @@ -224,11 +224,18 @@ func (in *NodeSpec) GetOutputAlias() []Alias { return in.OutputAliases } +// In functions below, explicitly strip out nil type information because NodeSpec's WorkflowNode is a struct type, +// not interface and downstream nil checks will not pass. +// See the test in TestPointersForNodeSpec for more information. + func (in *NodeSpec) GetWorkflowNode() ExecutableWorkflowNode { - if in.WorkflowNode == nil { - return nil + if in != nil { + if in.WorkflowNode == nil { + return nil + } + return in.WorkflowNode } - return in.WorkflowNode + return nil } func (in *NodeSpec) GetBranchNode() ExecutableBranchNode { diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes_test.go new file mode 100644 index 0000000000..f36d874241 --- /dev/null +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/nodes_test.go @@ -0,0 +1,51 @@ +package v1alpha1 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type CanDo interface { + MyDo() int +} + +type Concrete struct { + Doer CanDo +} + +func (c *Concrete) MyDo() int { + return 1 +} + +type Parent struct { + Concrete *Concrete +} + +func (p *Parent) GetDoer() CanDo { + return p.Concrete +} + +func (p *Parent) GetConcreteDoer() *Concrete { + return p.Concrete +} + +func TestPointersForNodeSpec(t *testing.T) { + p := &Parent{ + Concrete: nil, + } + // GetDoer returns a fake nil because it carries type information + // assert.NotNil(t, p.GetDoer()) funnily enough doesn't work, so use a regular if statement + if p.GetDoer() == nil { + assert.Fail(t, "GetDoer") + } + + assert.Nil(t, p.GetConcreteDoer()) +} + +func TestNodeSpec(t *testing.T) { + n := &NodeSpec{ + WorkflowNode: nil, + } + assert.Nil(t, n.GetWorkflowNode()) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go index a7d7532b97..4f1b95d399 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/subworkflow.go @@ -15,9 +15,15 @@ type WorkflowNodeSpec struct { } func (in *WorkflowNodeSpec) GetLaunchPlanRefID() *LaunchPlanRefID { - return in.LaunchPlanRefID + if in != nil { + return in.LaunchPlanRefID + } + return nil } func (in *WorkflowNodeSpec) GetSubWorkflowRef() *WorkflowID { - return in.SubWorkflowReference + if in != nil { + return in.SubWorkflowReference + } + return nil } diff --git a/flytepropeller/pkg/controller/executors/execution_context.go b/flytepropeller/pkg/controller/executors/execution_context.go index 07b5a6c450..b693d1aee8 100644 --- a/flytepropeller/pkg/controller/executors/execution_context.go +++ b/flytepropeller/pkg/controller/executors/execution_context.go @@ -28,6 +28,7 @@ type ParentInfoGetter interface { type ImmutableParentInfo interface { GetUniqueID() v1alpha1.NodeID CurrentAttempt() uint32 + IsInDynamicChain() bool } type ControlFlow interface { @@ -60,14 +61,19 @@ func (e execContext) GetParentInfo() ImmutableParentInfo { } type parentExecutionInfo struct { - uniqueID v1alpha1.NodeID - currentAttempts uint32 + uniqueID v1alpha1.NodeID + currentAttempts uint32 + isInDynamicChain bool } func (p *parentExecutionInfo) GetUniqueID() v1alpha1.NodeID { return p.uniqueID } +func (p *parentExecutionInfo) IsInDynamicChain() bool { + return p.isInDynamicChain +} + func (p *parentExecutionInfo) CurrentAttempt() uint32 { return p.currentAttempts } @@ -129,10 +135,11 @@ func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter T } } -func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo { +func NewParentInfo(uniqueID string, currentAttempts uint32, isInDynamicChain bool) ImmutableParentInfo { return &parentExecutionInfo{ - currentAttempts: currentAttempts, - uniqueID: uniqueID, + currentAttempts: currentAttempts, + uniqueID: uniqueID, + isInDynamicChain: isInDynamicChain, } } diff --git a/flytepropeller/pkg/controller/executors/execution_context_test.go b/flytepropeller/pkg/controller/executors/execution_context_test.go index 6a1561ea7c..3e82b88841 100644 --- a/flytepropeller/pkg/controller/executors/execution_context_test.go +++ b/flytepropeller/pkg/controller/executors/execution_context_test.go @@ -66,16 +66,22 @@ func TestExecutionContext(t *testing.T) { func TestParentExecutionInfo_GetUniqueID(t *testing.T) { expectedID := "testID" - parentInfo := NewParentInfo(expectedID, 1) + parentInfo := NewParentInfo(expectedID, 1, false) assert.Equal(t, expectedID, parentInfo.GetUniqueID()) } func TestParentExecutionInfo_CurrentAttempt(t *testing.T) { expectedAttempt := uint32(123465) - parentInfo := NewParentInfo("testID", expectedAttempt) + parentInfo := NewParentInfo("testID", expectedAttempt, false) assert.Equal(t, expectedAttempt, parentInfo.CurrentAttempt()) } +func TestParentExecutionInfo_DynamicChain(t *testing.T) { + expectedAttempt := uint32(123465) + parentInfo := NewParentInfo("testID", expectedAttempt, true) + assert.True(t, parentInfo.IsInDynamicChain()) +} + func TestControlFlow_ControlFlowParallelism(t *testing.T) { cFlow := InitializeControlFlow().(*controlFlow) assert.Equal(t, uint32(0), cFlow.CurrentParallelism()) @@ -88,7 +94,7 @@ func TestControlFlow_ControlFlowParallelism(t *testing.T) { func TestNewParentInfo(t *testing.T) { expectedID := "testID" expectedAttempt := uint32(123465) - parentInfo := NewParentInfo(expectedID, expectedAttempt).(*parentExecutionInfo) + parentInfo := NewParentInfo(expectedID, expectedAttempt, false).(*parentExecutionInfo) assert.Equal(t, expectedID, parentInfo.uniqueID) assert.Equal(t, expectedAttempt, parentInfo.currentAttempts) } diff --git a/flytepropeller/pkg/controller/executors/mocks/immutable_parent_info.go b/flytepropeller/pkg/controller/executors/mocks/immutable_parent_info.go index 209a0ee10f..a65f619e46 100644 --- a/flytepropeller/pkg/controller/executors/mocks/immutable_parent_info.go +++ b/flytepropeller/pkg/controller/executors/mocks/immutable_parent_info.go @@ -72,3 +72,35 @@ func (_m *ImmutableParentInfo) GetUniqueID() string { return r0 } + +type ImmutableParentInfo_IsInDynamicChain struct { + *mock.Call +} + +func (_m ImmutableParentInfo_IsInDynamicChain) Return(_a0 bool) *ImmutableParentInfo_IsInDynamicChain { + return &ImmutableParentInfo_IsInDynamicChain{Call: _m.Call.Return(_a0)} +} + +func (_m *ImmutableParentInfo) OnIsInDynamicChain() *ImmutableParentInfo_IsInDynamicChain { + c_call := _m.On("IsInDynamicChain") + return &ImmutableParentInfo_IsInDynamicChain{Call: c_call} +} + +func (_m *ImmutableParentInfo) OnIsInDynamicChainMatch(matchers ...interface{}) *ImmutableParentInfo_IsInDynamicChain { + c_call := _m.On("IsInDynamicChain", matchers...) + return &ImmutableParentInfo_IsInDynamicChain{Call: c_call} +} + +// IsInDynamicChain provides a mock function with given fields: +func (_m *ImmutableParentInfo) IsInDynamicChain() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index 35120c069e..ac1ad3e39f 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -220,7 +220,11 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index timestamp := ptypes.TimestampNow() workflowExecutionID := nCtx.ExecutionContext().GetExecutionID().WorkflowExecutionIdentifier - // send NodeExecutionEvent + // Extract dynamic chain information. + var dynamic = false + if nCtx.ExecutionContext() != nil && nCtx.ExecutionContext().GetParentInfo() != nil && nCtx.ExecutionContext().GetParentInfo().IsInDynamicChain() { + dynamic = true + } nodeExecutionEvent := &event.NodeExecutionEvent{ Id: &idlcore.NodeExecutionIdentifier{ NodeId: subNodeID, @@ -231,14 +235,15 @@ func sendEvents(ctx context.Context, nCtx interfaces.NodeExecutionContext, index ParentNodeMetadata: &event.ParentNodeExecutionMetadata{ NodeId: nCtx.NodeID(), }, - ReportedAt: timestamp, + ReportedAt: timestamp, + IsInDynamicChain: dynamic, } if err := eventRecorder.RecordNodeEvent(ctx, nodeExecutionEvent, eventConfig); err != nil { return err } - // send TaskExeucutionEvent + // send TaskExecutionEvent taskExecutionEvent := &event.TaskExecutionEvent{ TaskId: &idlcore.Identifier{ ResourceType: idlcore.ResourceType_TASK, diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 0f9e95f19b..5e571a0c48 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -94,6 +94,7 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut } else { // record events transitioning subNodes to aborted retryAttempt := uint32(arrayNodeState.SubNodeRetryAttempts.GetItem(i)) + if err := sendEvents(ctx, nCtx, i, retryAttempt, idlcore.NodeExecution_ABORTED, idlcore.TaskExecution_ABORTED, eventRecorder, a.eventConfig); err != nil { logger.Warnf(ctx, "failed to record ArrayNode events: %v", err) } @@ -707,7 +708,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter // initialize mocks arrayNodeLookup := newArrayNodeLookup(nCtx.ContextualNodeLookup(), subNodeID, &subNodeSpec, subNodeStatus) - newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false) if err != nil { return nil, nil, nil, nil, nil, nil, err } diff --git a/flytepropeller/pkg/controller/nodes/branch/handler.go b/flytepropeller/pkg/controller/nodes/branch/handler.go index d2a4fcfa68..431f5fa3eb 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler.go @@ -116,7 +116,7 @@ func (b *branchHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecutio } func (b *branchHandler) getExecutionContextForDownstream(nCtx interfaces.NodeExecutionContext) (executors.ExecutionContext, error) { - newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false) if err != nil { return nil, err } diff --git a/flytepropeller/pkg/controller/nodes/branch/handler_test.go b/flytepropeller/pkg/controller/nodes/branch/handler_test.go index b26dd004c4..f7f2836018 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler_test.go @@ -75,6 +75,10 @@ func (parentInfo) CurrentAttempt() uint32 { return uint32(2) } +func (parentInfo) IsInDynamicChain() bool { + return false +} + func createNodeContext(phase v1alpha1.BranchNodePhase, childNodeID *v1alpha1.NodeID, n v1alpha1.ExecutableNode, inputs *core.LiteralMap, nl *execMocks.NodeLookup, eCtx executors.ExecutionContext) (*mocks.NodeExecutionContext, *branchNodeStateHolder) { branchNodeState := handler.BranchNodeState{ @@ -191,7 +195,7 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) { } nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, mockNodeLookup, eCtx) - newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt(), false) expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo) mockNodeExecutor := &mocks.Node{} mockNodeExecutor.OnRecursiveNodeHandlerMatch( @@ -319,7 +323,7 @@ func TestBranchHandler_AbortNode(t *testing.T) { eCtx := &execMocks.ExecutionContext{} eCtx.EXPECT().GetParentInfo().Return(parentInfo{}) nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, mockNodeLookup, eCtx) - newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt(), false) expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo) mockNodeExecutor.OnAbortHandlerMatch(mock.Anything, mock.MatchedBy(func(e executors.ExecutionContext) bool { return assert.Equal(t, e, expectedExecContext) }), diff --git a/flytepropeller/pkg/controller/nodes/common/utils.go b/flytepropeller/pkg/controller/nodes/common/utils.go index 19714da3d5..04ddc5183d 100644 --- a/flytepropeller/pkg/controller/nodes/common/utils.go +++ b/flytepropeller/pkg/controller/nodes/common/utils.go @@ -1,11 +1,15 @@ package common import ( + "context" "strconv" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/encoding" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" + "github.com/flyteorg/flyte/flytestdlib/logger" ) const maxUniqueIDLength = 20 @@ -28,11 +32,38 @@ func GenerateUniqueID(parentInfo executors.ImmutableParentInfo, nodeID string) ( // CreateParentInfo creates a unique parent id, the unique id of parent is dependent on the unique id and the current // attempt of the grandparent to track the lineage. -func CreateParentInfo(grandParentInfo executors.ImmutableParentInfo, nodeID string, parentAttempt uint32) (executors.ImmutableParentInfo, error) { +func CreateParentInfo(grandParentInfo executors.ImmutableParentInfo, nodeID string, parentAttempt uint32, nodeIsDynamic bool) (executors.ImmutableParentInfo, error) { uniqueID, err := GenerateUniqueID(grandParentInfo, nodeID) if err != nil { return nil, err } - return executors.NewParentInfo(uniqueID, parentAttempt), nil + if nodeIsDynamic || (grandParentInfo != nil && grandParentInfo.IsInDynamicChain()) { + return executors.NewParentInfo(uniqueID, parentAttempt, true), nil + } + + return executors.NewParentInfo(uniqueID, parentAttempt, false), nil +} +func GetTargetEntity(ctx context.Context, nCtx interfaces.NodeExecutionContext) *core.Identifier { + var targetEntity *core.Identifier + if nCtx.Node().GetWorkflowNode() != nil { + subRef := nCtx.Node().GetWorkflowNode().GetSubWorkflowRef() + if subRef != nil && len(*subRef) > 0 { + // todo: uncomment this if Support caching subworkflows and launchplans (v2) is upstreamed + // for now, we can leave it empty + //nCtx.ExecutionContext().FindSubWorkflow(*subRef) + //targetEntity = subWorkflow.GetIdentifier() + } else if nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID() != nil { + lpRef := nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID() + targetEntity = lpRef.Identifier + } + } else if taskIDStr := nCtx.Node().GetTaskID(); taskIDStr != nil && len(*taskIDStr) > 0 { + taskID, err := nCtx.ExecutionContext().GetTask(*taskIDStr) + if err != nil { + // This doesn't feed a very important part of the node execution event, swallow it for now. + logger.Errorf(ctx, "Failed to get task [%v] with error [%v]", taskID, err) + } + targetEntity = taskID.CoreTask().Id + } + return targetEntity } diff --git a/flytepropeller/pkg/controller/nodes/common/utils_test.go b/flytepropeller/pkg/controller/nodes/common/utils_test.go index 0639605589..9e451da69a 100644 --- a/flytepropeller/pkg/controller/nodes/common/utils_test.go +++ b/flytepropeller/pkg/controller/nodes/common/utils_test.go @@ -9,8 +9,9 @@ import ( ) type ParentInfo struct { - uniqueID string - attempt uint32 + uniqueID string + attempt uint32 + isInDynamicChain bool } func (p ParentInfo) GetUniqueID() v1alpha1.NodeID { @@ -21,6 +22,10 @@ func (p ParentInfo) CurrentAttempt() uint32 { return p.attempt } +func (p ParentInfo) IsInDynamicChain() bool { + return p.isInDynamicChain +} + func TestGenerateUniqueID(t *testing.T) { p := ParentInfo{ uniqueID: "u1", @@ -43,18 +48,21 @@ func TestGenerateUniqueIDLong(t *testing.T) { func TestCreateParentInfo(t *testing.T) { gp := ParentInfo{ - uniqueID: "u1", - attempt: uint32(2), + uniqueID: "u1", + attempt: uint32(2), + isInDynamicChain: true, } - parent, err := CreateParentInfo(gp, "n1", uint32(1)) + parent, err := CreateParentInfo(gp, "n1", uint32(1), false) assert.Nil(t, err) assert.Equal(t, "u1-2-n1", parent.GetUniqueID()) assert.Equal(t, uint32(1), parent.CurrentAttempt()) + assert.True(t, parent.IsInDynamicChain()) } func TestCreateParentInfoNil(t *testing.T) { - parent, err := CreateParentInfo(nil, "n1", uint32(1)) + parent, err := CreateParentInfo(nil, "n1", uint32(1), true) assert.Nil(t, err) assert.Equal(t, "n1", parent.GetUniqueID()) assert.Equal(t, uint32(1), parent.CurrentAttempt()) + assert.True(t, parent.IsInDynamicChain()) } diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go index cbc5414e11..c53827328e 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -169,7 +169,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to set ephemeral node execution attributions") } - newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), true) if err != nil { return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID") } @@ -207,7 +207,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C // The current node would end up becoming the parent for the dynamic task nodes. // This is done to track the lineage. For level zero, the CreateParentInfo will return nil - newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, err := node_common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), true) if err != nil { return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to generate uniqueID") } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 1c42357623..47c91edc51 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -900,6 +900,12 @@ func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx nodeExecutionID.NodeId = currentNodeUniqueID } + var dynamic = false + if nCtx.ExecutionContext().GetParentInfo() != nil && nCtx.ExecutionContext().GetParentInfo().IsInDynamicChain() { + dynamic = true + } + + targetEntity := common.GetTargetEntity(ctx, nCtx) err := nCtx.EventsRecorder().RecordNodeEvent(ctx, &event.NodeExecutionEvent{ Id: nodeExecutionID, Phase: core.NodeExecution_ABORTED, @@ -910,8 +916,10 @@ func (c *nodeExecutor) Abort(ctx context.Context, h interfaces.NodeHandler, nCtx Message: reason, }, }, - ProducerId: c.clusterID, - ReportedAt: ptypes.TimestampNow(), + ProducerId: c.clusterID, + ReportedAt: ptypes.TimestampNow(), + IsInDynamicChain: dynamic, + TargetEntity: targetEntity, }, c.eventConfig) if err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) { if errors2.IsCausedBy(err, errors.IllegalStateError) { @@ -1003,10 +1011,12 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) p = p.WithOccuredAt(occurredAt) + targetEntity := common.GetTargetEntity(ctx, nCtx) + nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), p, nCtx.InputReader().GetInputPath().String(), nodeStatus, nCtx.ExecutionContext().GetEventVersion(), nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(), c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase, - c.eventConfig) + c.eventConfig, targetEntity) if err != nil { return interfaces.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event") } @@ -1231,10 +1241,12 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter // assert np == skipped, succeeding, failing or recovered logger.Infof(ctx, "Change in node state detected from [%s] -> [%s], (handler phase [%s])", nodeStatus.GetPhase().String(), np.String(), p.GetPhase().String()) + targetEntity := common.GetTargetEntity(ctx, nCtx) + nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(), nCtx.ExecutionContext().GetParentInfo(), nCtx.Node(), c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase, - c.eventConfig) + c.eventConfig, targetEntity) if err != nil { return interfaces.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event") } @@ -1251,6 +1263,10 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter np = v1alpha1.NodePhaseFailing p = handler.PhaseInfoFailure(core.ExecutionError_USER, "NodeFailed", err.Error(), p.GetInfo()) + var dynamic = false + if nCtx.ExecutionContext().GetParentInfo() != nil && nCtx.ExecutionContext().GetParentInfo().IsInDynamicChain() { + dynamic = true + } err = nCtx.EventsRecorder().RecordNodeEvent(ctx, &event.NodeExecutionEvent{ Id: nCtx.NodeExecutionMetadata().GetNodeExecutionID(), Phase: core.NodeExecution_FAILED, @@ -1261,7 +1277,9 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter Message: err.Error(), }, }, - ReportedAt: ptypes.TimestampNow(), + ReportedAt: ptypes.TimestampNow(), + IsInDynamicChain: dynamic, + TargetEntity: targetEntity, }, c.eventConfig) if err != nil { diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 194458d2b8..783821c9ab 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -602,6 +602,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockNode.OnGetInputBindings().Return([]*v1alpha1.Binding{}) mockNode.OnIsInterruptible().Return(nil) mockNode.OnGetName().Return("name") + mockNode.OnGetWorkflowNode().Return(nil) mockNodeN0 := &mocks.ExecutableNode{} mockNodeN0.OnGetID().Return(nodeN0) @@ -612,6 +613,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockNodeN0.OnGetTaskID().Return(&taskID0) mockNodeN0.OnIsInterruptible().Return(nil) mockNodeN0.OnGetName().Return("name") + mockNodeN0.OnGetWorkflowNode().Return(nil) mockN0Status := &mocks.ExecutableNodeStatus{} mockN0Status.OnGetPhase().Return(n0Phase) @@ -1323,6 +1325,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { branchTakenNode.OnIsStartNode().Return(false) branchTakenNode.OnIsEndNode().Return(false) branchTakenNode.OnGetInputBindings().Return(nil) + branchTakenNode.OnGetWorkflowNode().Return(nil) branchTakeNodeStatus := &mocks.ExecutableNodeStatus{} branchTakeNodeStatus.OnGetPhase().Return(test.currentNodePhase) branchTakeNodeStatus.OnIsDirty().Return(false) @@ -1647,6 +1650,7 @@ func TestNodeExecutor_AbortHandler(t *testing.T) { n.OnGetID().Return(id) n.OnGetKind().Return(v1alpha1.NodeKindStart) n.OnGetTaskID().Return(&id) + n.OnGetWorkflowNode().Return(nil) interruptible := false n.OnIsInterruptible().Return(&interruptible) nl := &mocks4.NodeLookup{} @@ -1683,6 +1687,23 @@ func TestNodeExecutor_AbortHandler(t *testing.T) { execContext.EXPECT().GetLabels().Return(nil) execContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0) + et := &mocks.ExecutableTask{} + et.OnCoreTask().Return(&core.TaskTemplate{ + Id: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "p", + Domain: "d", + Name: "fake_task_name", + Version: "v", + }, + }) + execContext.EXPECT().GetTask("id").Return(et, nil) + parentInfo := &mocks4.ImmutableParentInfo{} + parentInfo.OnGetUniqueID().Return("someunique1") + parentInfo.OnCurrentAttempt().Return(uint32(1)) + parentInfo.OnIsInDynamicChain().Return(false) + execContext.EXPECT().GetParentInfo().Return(parentInfo) + assert.NoError(t, nExec.AbortHandler(ctx, &execContext, &dag, nl, n, "aborting")) }) } @@ -1715,12 +1736,20 @@ func TestNodeExecutionEventStartNode(t *testing.T) { tID := &core.TaskExecutionIdentifier{ NodeExecutionId: nID, } + subWfID := &core.Identifier{ + ResourceType: core.ResourceType_WORKFLOW, + Project: "p", + Domain: "dev", + Name: "name", + Version: "123", + } p := handler.PhaseInfoQueued("r", &core.LiteralMap{}) inputReader := &mocks3.InputReader{} inputReader.OnGetInputPath().Return("reference") parentInfo := &mocks4.ImmutableParentInfo{} parentInfo.OnGetUniqueID().Return("np1") parentInfo.OnCurrentAttempt().Return(uint32(2)) + parentInfo.OnIsInDynamicChain().Return(false) id := "id" n := &mocks.ExecutableNode{} @@ -1736,7 +1765,7 @@ func TestNodeExecutionEventStartNode(t *testing.T) { ns.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyReference, - }) + }, subWfID) assert.NoError(t, err) assert.Equal(t, "start-node", ev.Id.NodeId) assert.Equal(t, execID, ev.Id.ExecutionId) @@ -1748,6 +1777,7 @@ func TestNodeExecutionEventStartNode(t *testing.T) { assert.Equal(t, "dummy://dummyOutUrl/outputs.pb", ev.OutputResult.(*event.NodeExecutionEvent_OutputUri).OutputUri) assert.Equal(t, ev.ProducerId, testClusterID) + assert.Equal(t, subWfID, ev.GetTargetEntity()) } func TestNodeExecutionEventV0(t *testing.T) { @@ -1767,6 +1797,7 @@ func TestNodeExecutionEventV0(t *testing.T) { parentInfo := &mocks4.ImmutableParentInfo{} parentInfo.OnGetUniqueID().Return("np1") parentInfo.OnCurrentAttempt().Return(uint32(2)) + parentInfo.OnIsInDynamicChain().Return(false).Twice() id := "id" n := &mocks.ExecutableNode{} @@ -1780,7 +1811,7 @@ func TestNodeExecutionEventV0(t *testing.T) { ns.OnGetParentTaskID().Return(tID) ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyReference, - }) + }, nil) assert.NoError(t, err) assert.Equal(t, "n1", ev.Id.NodeId) assert.Equal(t, execID, ev.Id.ExecutionId) @@ -1789,6 +1820,7 @@ func TestNodeExecutionEventV0(t *testing.T) { assert.Equal(t, tID, ev.ParentTaskMetadata.Id) assert.Empty(t, ev.NodeName) assert.Empty(t, ev.RetryGroup) + assert.Empty(t, ev.TargetEntity) } func TestNodeExecutionEventV1(t *testing.T) { @@ -1815,6 +1847,7 @@ func TestNodeExecutionEventV1(t *testing.T) { parentInfo := &mocks4.ImmutableParentInfo{} parentInfo.OnGetUniqueID().Return("np1") parentInfo.OnCurrentAttempt().Return(uint32(2)) + parentInfo.OnIsInDynamicChain().Return(false) id := "id" n := &mocks.ExecutableNode{} @@ -1828,7 +1861,7 @@ func TestNodeExecutionEventV1(t *testing.T) { ns.OnGetParentTaskID().Return(tID) eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyInline, - }) + }, nil) assert.NoError(t, err) assert.Equal(t, "np1-2-n1", eventOpt.Id.NodeId) assert.Equal(t, execID, eventOpt.Id.ExecutionId) @@ -1841,6 +1874,7 @@ func TestNodeExecutionEventV1(t *testing.T) { assert.Equal(t, "name", eventOpt.NodeName) assert.Equal(t, "2", eventOpt.RetryGroup) assert.True(t, proto.Equal(eventOpt.GetInputData(), inputs)) + assert.Empty(t, eventOpt.TargetEntity) } func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context_test.go b/flytepropeller/pkg/controller/nodes/node_exec_context_test.go index 10e12d35e4..4614d0f035 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context_test.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context_test.go @@ -19,6 +19,8 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors" mocks2 "github.com/flyteorg/flyte/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common" + mocks3 "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces/mocks" "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" @@ -142,6 +144,52 @@ func Test_NodeContextDefault(t *testing.T) { nodeExecContext, err = nodeExecutor.BuildNodeExecutionContext(context.Background(), execContext, nodeLookup, "node-a") assert.NoError(t, err) assert.Equal(t, "s3://bucket-b", nodeExecContext.RawOutputPrefix().String()) + + // Test that retrieving task nodes + taskIdentifier := common.GetTargetEntity(ctx, nodeExecContext) + assert.Equal(t, w1.Tasks["taskID"].TaskTemplate.Id.Project, taskIdentifier.Project) + assert.Equal(t, w1.Tasks["taskID"].TaskTemplate.Id.Domain, taskIdentifier.Domain) + assert.Equal(t, w1.Tasks["taskID"].TaskTemplate.Id.Name, taskIdentifier.Name) + assert.Equal(t, w1.Tasks["taskID"].TaskTemplate.Id.Version, taskIdentifier.Version) +} + +func TestGetTargetEntity_LaunchPlanNode(t *testing.T) { + id := &core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Project: "proj", + Domain: "domain", + Name: "sub-lp", + Version: "v2", + } + + subWfNode := &mocks.ExecutableWorkflowNode{} + subWfNode.OnGetSubWorkflowRef().Return(nil) + subWfNode.OnGetLaunchPlanRefID().Return(&v1alpha1.LaunchPlanRefID{Identifier: id}) + + n := &mocks.ExecutableNode{} + n.OnGetWorkflowNode().Return(subWfNode) + + nCtx := &mocks3.NodeExecutionContext{} + nCtx.OnNode().Return(n) + + fetchedID := common.GetTargetEntity(context.Background(), nCtx) + assert.Equal(t, id.Project, fetchedID.Project) + assert.Equal(t, id.Domain, fetchedID.Domain) + assert.Equal(t, id.Name, fetchedID.Name) + assert.Equal(t, id.Version, fetchedID.Version) +} + +func TestGetTargetEntity_EmptyTask(t *testing.T) { + n := &mocks.ExecutableNode{} + n.OnGetWorkflowNode().Return(nil) + taskID := "" + n.OnGetTaskID().Return(&taskID) + + nCtx := &mocks3.NodeExecutionContext{} + nCtx.OnNode().Return(n) + + fetchedID := common.GetTargetEntity(context.Background(), nCtx) + assert.Nil(t, fetchedID) } func Test_NodeContextDefaultInterruptible(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index d71f0b7e20..cd30fcf45b 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -152,7 +152,7 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx interfa } func (s *subworkflowHandler) getExecutionContextForDownstream(nCtx interfaces.NodeExecutionContext) (executors.ExecutionContext, error) { - newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt(), false) if err != nil { return nil, err } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go index 02de593463..9fa318e7f5 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go @@ -153,7 +153,7 @@ func Test_subworkflowHandler_HandleAbort(t *testing.T) { s := newSubworkflowHandler(nodeExec, eventConfig) n := &coreMocks.ExecutableNode{} swf.OnGetID().Return("swf") - newParentInfo, _ := common.CreateParentInfo(nil, nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, _ := common.CreateParentInfo(nil, nCtx.NodeID(), nCtx.CurrentAttempt(), false) expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo) nodeExec.OnAbortHandlerMatch(mock.Anything, expectedExecContext, swf, mock.Anything, n, "reason").Return(fmt.Errorf("err")) assert.Error(t, s.HandleAbort(ctx, nCtx, "reason")) @@ -187,7 +187,7 @@ func Test_subworkflowHandler_HandleAbort(t *testing.T) { s := newSubworkflowHandler(nodeExec, eventConfig) n := &coreMocks.ExecutableNode{} swf.OnGetID().Return("swf") - newParentInfo, _ := common.CreateParentInfo(nil, nCtx.NodeID(), nCtx.CurrentAttempt()) + newParentInfo, _ := common.CreateParentInfo(nil, nCtx.NodeID(), nCtx.CurrentAttempt(), false) expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo) nodeExec.OnAbortHandlerMatch(mock.Anything, expectedExecContext, swf, mock.Anything, n, "reason").Return(nil) assert.NoError(t, s.HandleAbort(ctx, nCtx, "reason")) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 1c911b44f3..c9f7d5fc76 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -83,7 +83,9 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, eventVersion v1alpha1.EventVersion, parentInfo executors.ImmutableParentInfo, node v1alpha1.ExecutableNode, clusterID string, dynamicNodePhase v1alpha1.DynamicNodePhase, - eventConfig *config.EventConfig) (*event.NodeExecutionEvent, error) { + eventConfig *config.EventConfig, + targetEntity *core.Identifier) (*event.NodeExecutionEvent, error) { + if info.GetPhase() == handler.EPhaseNotReady { return nil, nil } @@ -101,6 +103,12 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, phase = core.NodeExecution_RUNNING } + // At some point, the entity that this event corresponds to came from a dynamic task. See the IDL for more info. + var dynamicChain = false + if parentInfo != nil && parentInfo.IsInDynamicChain() { + dynamicChain = true + } + var nev *event.NodeExecutionEvent // Start node is special case where the Inputs and Outputs are the same and hence here we copy the Output file // into the OutputResult and in admin we copy it over into input as well. @@ -112,19 +120,24 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, OutputResult: ToNodeExecOutput(&handler.OutputInfo{ OutputURI: outputsFile, }), - OccurredAt: occurredTime, - ProducerId: clusterID, - EventVersion: nodeExecutionEventVersion, - ReportedAt: ptypes.TimestampNow(), + OccurredAt: occurredTime, + ProducerId: clusterID, + EventVersion: nodeExecutionEventVersion, + ReportedAt: ptypes.TimestampNow(), + TargetEntity: targetEntity, + IsInDynamicChain: dynamicChain, } } else { + // include target_entity from function caller. nev = &event.NodeExecutionEvent{ - Id: nodeExecID, - Phase: phase, - OccurredAt: occurredTime, - ProducerId: clusterID, - EventVersion: nodeExecutionEventVersion, - ReportedAt: ptypes.TimestampNow(), + Id: nodeExecID, + Phase: phase, + OccurredAt: occurredTime, + ProducerId: clusterID, + EventVersion: nodeExecutionEventVersion, + ReportedAt: ptypes.TimestampNow(), + TargetEntity: targetEntity, + IsInDynamicChain: dynamicChain, } } @@ -199,6 +212,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, InputUri: inputPath, } } + return nev, nil } diff --git a/flytepropeller/pkg/controller/nodes/transformers_test.go b/flytepropeller/pkg/controller/nodes/transformers_test.go index f1d5202401..93a532a8d6 100644 --- a/flytepropeller/pkg/controller/nodes/transformers_test.go +++ b/flytepropeller/pkg/controller/nodes/transformers_test.go @@ -39,6 +39,7 @@ func TestToNodeExecutionEvent(t *testing.T) { parentInfo := mocks2.ImmutableParentInfo{} parentInfo.OnCurrentAttempt().Return(0) parentInfo.OnGetUniqueID().Return("u") + parentInfo.OnIsInDynamicChain().Return(true) node := mocks.ExecutableNode{} node.OnGetID().Return("n") node.OnGetName().Return("nodey") @@ -53,11 +54,12 @@ func TestToNodeExecutionEvent(t *testing.T) { }, }, info, "inputPath", &status, v1alpha1.EventVersion2, &parentInfo, &node, "clusterID", v1alpha1.DynamicNodePhaseParentFinalized, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyReference, - }) + }, nil) assert.NoError(t, err) assert.True(t, nev.IsDynamic) assert.True(t, nev.IsParent) assert.Equal(t, nodeExecutionEventVersion, nev.EventVersion) + assert.True(t, nev.IsInDynamicChain) }) t.Run("is parent", func(t *testing.T) { info := handler.PhaseInfoDynamicRunning(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{ @@ -69,6 +71,7 @@ func TestToNodeExecutionEvent(t *testing.T) { parentInfo := mocks2.ImmutableParentInfo{} parentInfo.OnCurrentAttempt().Return(0) parentInfo.OnGetUniqueID().Return("u") + parentInfo.OnIsInDynamicChain().Return(false) node := mocks.ExecutableNode{} node.OnGetID().Return("n") node.OnGetName().Return("nodey") @@ -87,7 +90,7 @@ func TestToNodeExecutionEvent(t *testing.T) { }, }, info, "inputPath", &status, v1alpha1.EventVersion2, &parentInfo, &node, "clusterID", v1alpha1.DynamicNodePhaseNone, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyReference, - }) + }, nil) assert.NoError(t, err) assert.False(t, nev.IsDynamic) assert.True(t, nev.IsParent) @@ -116,7 +119,7 @@ func TestToNodeExecutionEvent(t *testing.T) { }, }, info, "inputPath", &status, v1alpha1.EventVersion2, nil, &node, "clusterID", v1alpha1.DynamicNodePhaseParentFinalized, &config.EventConfig{ RawOutputPolicy: config.RawOutputPolicyInline, - }) + }, nil) assert.NoError(t, err) assert.True(t, proto.Equal(inputs, nev.GetInputData())) }) diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index e859b51258..ce4a75a0a1 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -232,7 +232,7 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata t.Stop() contentMD5, ok := metadata[strings.ToLower(FlyteContentMD5)].(string) if !ok { - logger.Warningf(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5) + logger.Infof(ctx, "Failed to cast contentMD5 [%v] to string", contentMD5) } return StowMetadata{ exists: true,