From 6a14e7fbffe89786fb1d8cde22715f93c2f3aff5 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 26 Aug 2021 12:53:34 -0700 Subject: [PATCH] Add option to send raw output data in events (#304) --- go.mod | 2 +- go.sum | 4 +- pkg/controller/config/config.go | 19 ++ pkg/controller/config/config_flags.go | 2 + pkg/controller/config/config_flags_test.go | 28 +++ pkg/controller/controller.go | 4 +- .../events/mocks/node_event_recorder.go | 50 +++++ .../events/mocks/task_event_recorder.go | 50 +++++ .../events/mocks/workflow_event_recorder.go | 50 +++++ pkg/controller/events/node_execution.go | 86 +++++++++ pkg/controller/events/node_execution_test.go | 158 +++++++++++++++ pkg/controller/events/task_execution.go | 86 +++++++++ pkg/controller/events/task_execution_test.go | 180 ++++++++++++++++++ pkg/controller/events/test_utils.go | 50 +++++ pkg/controller/events/workflow_execution.go | 86 +++++++++ .../events/workflow_execution_test.go | 158 +++++++++++++++ pkg/controller/nodes/branch/evaluator.go | 1 + pkg/controller/nodes/branch/handler.go | 6 +- pkg/controller/nodes/branch/handler_test.go | 16 +- pkg/controller/nodes/dynamic/handler.go | 6 +- pkg/controller/nodes/dynamic/handler_test.go | 24 ++- pkg/controller/nodes/executor.go | 19 +- pkg/controller/nodes/executor_test.go | 86 +++++---- .../handler/mocks/node_execution_context.go | 2 +- .../nodes/handler/node_exec_context.go | 2 +- pkg/controller/nodes/handler_factory.go | 13 +- pkg/controller/nodes/node_exec_context.go | 3 +- pkg/controller/nodes/subworkflow/handler.go | 7 +- .../nodes/subworkflow/handler_test.go | 20 +- .../nodes/subworkflow/launchplan.go | 3 + .../nodes/subworkflow/launchplan_test.go | 30 ++- .../nodes/subworkflow/subworkflow.go | 6 +- .../nodes/subworkflow/subworkflow_test.go | 6 +- pkg/controller/nodes/task/handler.go | 15 +- pkg/controller/nodes/task/handler_test.go | 18 +- pkg/controller/nodes/task_event_recorder.go | 8 +- .../nodes/task_event_recorder_test.go | 10 +- 
pkg/controller/workflow/executor.go | 15 +- pkg/controller/workflow/executor_test.go | 83 ++++---- 39 files changed, 1261 insertions(+), 151 deletions(-) create mode 100644 pkg/controller/events/mocks/node_event_recorder.go create mode 100644 pkg/controller/events/mocks/task_event_recorder.go create mode 100644 pkg/controller/events/mocks/workflow_event_recorder.go create mode 100644 pkg/controller/events/node_execution.go create mode 100644 pkg/controller/events/node_execution_test.go create mode 100644 pkg/controller/events/task_execution.go create mode 100644 pkg/controller/events/task_execution_test.go create mode 100644 pkg/controller/events/test_utils.go create mode 100644 pkg/controller/events/workflow_execution.go create mode 100644 pkg/controller/events/workflow_execution_test.go diff --git a/go.mod b/go.mod index 8d01f06645..59b8ef9d5e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 - github.com/flyteorg/flyteidl v0.19.19 + github.com/flyteorg/flyteidl v0.19.22 github.com/flyteorg/flyteplugins v0.5.69 github.com/flyteorg/flytestdlib v0.3.34 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index faf5c5d174..dd8480db83 100644 --- a/go.sum +++ b/go.sum @@ -230,8 +230,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.19.19 h1:jv93YLz0Bq++sH9r0AOhdNaHFdXSCWjsXJoLOIduA2o= -github.com/flyteorg/flyteidl v0.19.19/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.19.22 h1:e3M0Dob/r5n+AJfAByzad/svMAVes7XfZVxUNCi6AeQ= 
+github.com/flyteorg/flyteidl v0.19.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.5.69 h1:i1V1n+uoI5TrBG/UWD6vzJ/fFAtru9FSYbjCnYBttUc= github.com/flyteorg/flyteplugins v0.5.69/go.mod h1:YjahYP+i4/Qn+dFvxMOGbhDtkQT4EiH4Kb88KNK505A= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 478b36b855..156a74786c 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -105,6 +105,9 @@ var ( MetadataPrefix: "metadata/propeller", EnableAdminLauncher: true, MetricsPrefix: "flyte", + EventConfig: EventConfig{ + RawOutputPolicy: RawOutputPolicyReference, + }, } ) @@ -133,6 +136,7 @@ type Config struct { KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."` + EventConfig EventConfig `json:"event-config,omitempty" pflag:",Configures execution event behavior."` } // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client. @@ -215,6 +219,21 @@ type LeaderElectionConfig struct { RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."` } +// Defines how output data should be passed along in execution events. +type RawOutputPolicy = string + +const ( + // Only send output data as a URI referencing where outputs have been uploaded + RawOutputPolicyReference RawOutputPolicy = "reference" + // Send raw output data in events. 
+ RawOutputPolicyInline RawOutputPolicy = "inline" +) + +type EventConfig struct { + RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."` + FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` +} + // GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object. func GetConfig() *Config { return configSection.GetConfig().(*Config) diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index 9fae047b63..d0590612bc 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -93,5 +93,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.max-node-retries-system-failures"), defaultConfig.NodeConfig.MaxNodeRetriesOnSystemFailures, "Maximum number of retries per node for node failure due to infra issues") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible'") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution 
events.") return cmdFlags } diff --git a/pkg/controller/config/config_flags_test.go b/pkg/controller/config/config_flags_test.go index 5ff467b335..7b1ca36d97 100755 --- a/pkg/controller/config/config_flags_test.go +++ b/pkg/controller/config/config_flags_test.go @@ -701,4 +701,32 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_event-config.raw-output-policy", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("event-config.raw-output-policy", testValue) + if vString, err := cmdFlags.GetString("event-config.raw-output-policy"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.EventConfig.RawOutputPolicy) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_event-config.fallback-to-output-reference", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("event-config.fallback-to-output-reference", testValue) + if vBool, err := cmdFlags.GetBool("event-config.fallback-to-output-reference"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.EventConfig.FallbackToOutputReference) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e56898bcea..fb59aec92f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -421,12 +421,12 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes, - storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), scope) + storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), &cfg.EventConfig, scope) if err != nil { return nil, 
errors.Wrapf(err, "Failed to create Controller.") } - workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, scope) + workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, scope) if err != nil { return nil, err } diff --git a/pkg/controller/events/mocks/node_event_recorder.go b/pkg/controller/events/mocks/node_event_recorder.go new file mode 100644 index 0000000000..bdb42cb0d4 --- /dev/null +++ b/pkg/controller/events/mocks/node_event_recorder.go @@ -0,0 +1,50 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + config "github.com/flyteorg/flytepropeller/pkg/controller/config" + + event "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + + mock "github.com/stretchr/testify/mock" +) + +// NodeEventRecorder is an autogenerated mock type for the NodeEventRecorder type +type NodeEventRecorder struct { + mock.Mock +} + +type NodeEventRecorder_RecordNodeEvent struct { + *mock.Call +} + +func (_m NodeEventRecorder_RecordNodeEvent) Return(_a0 error) *NodeEventRecorder_RecordNodeEvent { + return &NodeEventRecorder_RecordNodeEvent{Call: _m.Call.Return(_a0)} +} + +func (_m *NodeEventRecorder) OnRecordNodeEvent(ctx context.Context, _a1 *event.NodeExecutionEvent, eventConfig *config.EventConfig) *NodeEventRecorder_RecordNodeEvent { + c := _m.On("RecordNodeEvent", ctx, _a1, eventConfig) + return &NodeEventRecorder_RecordNodeEvent{Call: c} +} + +func (_m *NodeEventRecorder) OnRecordNodeEventMatch(matchers ...interface{}) *NodeEventRecorder_RecordNodeEvent { + c := _m.On("RecordNodeEvent", matchers...) 
+ return &NodeEventRecorder_RecordNodeEvent{Call: c} +} + +// RecordNodeEvent provides a mock function with given fields: ctx, _a1, eventConfig +func (_m *NodeEventRecorder) RecordNodeEvent(ctx context.Context, _a1 *event.NodeExecutionEvent, eventConfig *config.EventConfig) error { + ret := _m.Called(ctx, _a1, eventConfig) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *event.NodeExecutionEvent, *config.EventConfig) error); ok { + r0 = rf(ctx, _a1, eventConfig) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/controller/events/mocks/task_event_recorder.go b/pkg/controller/events/mocks/task_event_recorder.go new file mode 100644 index 0000000000..84140ed07b --- /dev/null +++ b/pkg/controller/events/mocks/task_event_recorder.go @@ -0,0 +1,50 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + config "github.com/flyteorg/flytepropeller/pkg/controller/config" + + event "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + + mock "github.com/stretchr/testify/mock" +) + +// TaskEventRecorder is an autogenerated mock type for the TaskEventRecorder type +type TaskEventRecorder struct { + mock.Mock +} + +type TaskEventRecorder_RecordTaskEvent struct { + *mock.Call +} + +func (_m TaskEventRecorder_RecordTaskEvent) Return(_a0 error) *TaskEventRecorder_RecordTaskEvent { + return &TaskEventRecorder_RecordTaskEvent{Call: _m.Call.Return(_a0)} +} + +func (_m *TaskEventRecorder) OnRecordTaskEvent(ctx context.Context, _a1 *event.TaskExecutionEvent, eventConfig *config.EventConfig) *TaskEventRecorder_RecordTaskEvent { + c := _m.On("RecordTaskEvent", ctx, _a1, eventConfig) + return &TaskEventRecorder_RecordTaskEvent{Call: c} +} + +func (_m *TaskEventRecorder) OnRecordTaskEventMatch(matchers ...interface{}) *TaskEventRecorder_RecordTaskEvent { + c := _m.On("RecordTaskEvent", matchers...) 
+ return &TaskEventRecorder_RecordTaskEvent{Call: c} +} + +// RecordTaskEvent provides a mock function with given fields: ctx, _a1, eventConfig +func (_m *TaskEventRecorder) RecordTaskEvent(ctx context.Context, _a1 *event.TaskExecutionEvent, eventConfig *config.EventConfig) error { + ret := _m.Called(ctx, _a1, eventConfig) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *event.TaskExecutionEvent, *config.EventConfig) error); ok { + r0 = rf(ctx, _a1, eventConfig) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/controller/events/mocks/workflow_event_recorder.go b/pkg/controller/events/mocks/workflow_event_recorder.go new file mode 100644 index 0000000000..c55c0aadb4 --- /dev/null +++ b/pkg/controller/events/mocks/workflow_event_recorder.go @@ -0,0 +1,50 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + config "github.com/flyteorg/flytepropeller/pkg/controller/config" + + event "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + + mock "github.com/stretchr/testify/mock" +) + +// WorkflowEventRecorder is an autogenerated mock type for the WorkflowEventRecorder type +type WorkflowEventRecorder struct { + mock.Mock +} + +type WorkflowEventRecorder_RecordWorkflowEvent struct { + *mock.Call +} + +func (_m WorkflowEventRecorder_RecordWorkflowEvent) Return(_a0 error) *WorkflowEventRecorder_RecordWorkflowEvent { + return &WorkflowEventRecorder_RecordWorkflowEvent{Call: _m.Call.Return(_a0)} +} + +func (_m *WorkflowEventRecorder) OnRecordWorkflowEvent(ctx context.Context, _a1 *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) *WorkflowEventRecorder_RecordWorkflowEvent { + c := _m.On("RecordWorkflowEvent", ctx, _a1, eventConfig) + return &WorkflowEventRecorder_RecordWorkflowEvent{Call: c} +} + +func (_m *WorkflowEventRecorder) OnRecordWorkflowEventMatch(matchers ...interface{}) *WorkflowEventRecorder_RecordWorkflowEvent { + c := _m.On("RecordWorkflowEvent", 
matchers...) + return &WorkflowEventRecorder_RecordWorkflowEvent{Call: c} +} + +// RecordWorkflowEvent provides a mock function with given fields: ctx, _a1, eventConfig +func (_m *WorkflowEventRecorder) RecordWorkflowEvent(ctx context.Context, _a1 *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) error { + ret := _m.Called(ctx, _a1, eventConfig) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *event.WorkflowExecutionEvent, *config.EventConfig) error); ok { + r0 = rf(ctx, _a1, eventConfig) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/controller/events/node_execution.go b/pkg/controller/events/node_execution.go new file mode 100644 index 0000000000..ef398dee11 --- /dev/null +++ b/pkg/controller/events/node_execution.go @@ -0,0 +1,86 @@ +package events + +import ( + "context" + "strings" + + "github.com/flyteorg/flytestdlib/logger" + + "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +//go:generate mockery -all -output=mocks -case=underscore + +// Recorder for Node events +type NodeEventRecorder interface { + // Records node execution events indicating the node has undergone a phase change and additional metadata. + RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error +} + +type nodeEventRecorder struct { + eventRecorder events.NodeEventRecorder + store *storage.DataStore +} + +// In certain cases, a successful node execution event can be configured to include raw output data inline. 
However, +// for large outputs these events may exceed the event recipient's message size limit, so we fall back to passing +// the offloaded output URI instead. +func (r *nodeEventRecorder) handleFailure(ctx context.Context, ev *event.NodeExecutionEvent, err error) error { + st, ok := status.FromError(err) + if !ok || st.Code() != codes.ResourceExhausted { + // Error was not a gRPC status error with code ResourceExhausted, so it is returned unchanged without a retry. + return err + } + if !strings.HasPrefix(st.Message(), "message too large") { // NOTE(review): depends on the recipient's error text starting with this exact prefix - confirm against the event sink. + return err + } + + // Retry once with the event passed in by the caller, which is expected to still carry the output URI rather than inline data. + return r.eventRecorder.RecordNodeEvent(ctx, ev) +} + +func (r *nodeEventRecorder) RecordNodeEvent(ctx context.Context, ev *event.NodeExecutionEvent, eventConfig *config.EventConfig) error { + var origEvent = ev + var rawOutputPolicy = eventConfig.RawOutputPolicy + if rawOutputPolicy == config.RawOutputPolicyInline && len(ev.GetOutputUri()) > 0 { + outputs := &core.LiteralMap{} + err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs) + if err != nil { + // Fall back to forwarding along outputs by reference when we can't fetch them. 
+ logger.Warnf(ctx, "failed to fetch outputs by ref [%s] to send inline with err: %v", ev.GetOutputUri(), err) + rawOutputPolicy = config.RawOutputPolicyReference + } else { + origEvent = proto.Clone(ev).(*event.NodeExecutionEvent) + ev.OutputResult = &event.NodeExecutionEvent_OutputData{ + OutputData: outputs, + } + } + } + + err := r.eventRecorder.RecordNodeEvent(ctx, ev) + if err != nil { + logger.Infof(ctx, "Failed to record node event [%+v] with err: %v", ev, err) + // Only attempt to retry sending an event in the case we tried to send raw output data inline + if eventConfig.FallbackToOutputReference && rawOutputPolicy == config.RawOutputPolicyInline { + logger.Infof(ctx, "Falling back to sending node event outputs by reference for [%+v]", ev.Id) + return r.handleFailure(ctx, origEvent, err) + } + return err + } + return nil +} + +func NewNodeEventRecorder(eventSink events.EventSink, scope promutils.Scope, store *storage.DataStore) NodeEventRecorder { + return &nodeEventRecorder{ + eventRecorder: events.NewNodeEventRecorder(eventSink, scope), + store: store, + } +} diff --git a/pkg/controller/events/node_execution_test.go b/pkg/controller/events/node_execution_test.go new file mode 100644 index 0000000000..69538780e5 --- /dev/null +++ b/pkg/controller/events/node_execution_test.go @@ -0,0 +1,158 @@ +package events + +import ( + "context" + "testing" + + "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytestdlib/storage" + storageMocks "github.com/flyteorg/flytestdlib/storage/mocks" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func getReferenceNodeEv() *event.NodeExecutionEvent { + return &event.NodeExecutionEvent{ + Id: nodeExecID, + OutputResult: 
&event.NodeExecutionEvent_OutputUri{ + OutputUri: referenceURI, + }, + } +} + +func getRawOutputNodeEv() *event.NodeExecutionEvent { + return &event.NodeExecutionEvent{ + Id: nodeExecID, + OutputResult: &event.NodeExecutionEvent_OutputData{ + OutputData: outputData, + }, + } +} + +func TestRecordNodeEvent_Success_ReferenceOutputs(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordNodeEventCb = func(ctx context.Context, event *event.NodeExecutionEvent) error { + assert.True(t, proto.Equal(event, getReferenceNodeEv())) + return nil + } + mockStore := &storage.DataStore{ + ComposedProtobufStore: &storageMocks.ComposedProtobufStore{}, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &nodeEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordNodeEvent(context.TODO(), getReferenceNodeEv(), referenceEventConfig) + assert.NoError(t, err) +} + +func TestRecordNodeEvent_Success_InlineOutputs(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordNodeEventCb = func(ctx context.Context, event *event.NodeExecutionEvent) error { + assert.True(t, proto.Equal(event, getRawOutputNodeEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &nodeEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordNodeEvent(context.TODO(), getReferenceNodeEv(), inlineEventConfig) + assert.NoError(t, err) +} + +func TestRecordNodeEvent_Failure_FetchInlineOutputs(t *testing.T) { + eventRecorder := 
events.MockRecorder{} + eventRecorder.RecordNodeEventCb = func(ctx context.Context, event *event.NodeExecutionEvent) error { + assert.True(t, proto.Equal(event, getReferenceNodeEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(errors.New("foo")) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &nodeEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordNodeEvent(context.TODO(), getReferenceNodeEv(), inlineEventConfig) + assert.NoError(t, err) +} + +func TestRecordNodeEvent_Failure_FallbackReference_Retry(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordNodeEventCb = func(ctx context.Context, event *event.NodeExecutionEvent) error { + if event.GetOutputData() != nil { + return status.Error(codes.ResourceExhausted, "message too large") + } + assert.True(t, proto.Equal(event, getReferenceNodeEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &nodeEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordNodeEvent(context.TODO(), getReferenceNodeEv(), inlineEventConfigFallback) + assert.NoError(t, err) +} + +func TestRecordNodeEvent_Failure_FallbackReference_Unretriable(t *testing.T) { + eventRecorder := events.MockRecorder{} + 
eventRecorder.RecordNodeEventCb = func(ctx context.Context, event *event.NodeExecutionEvent) error { + return errors.New("foo") + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &nodeEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordNodeEvent(context.TODO(), getReferenceNodeEv(), inlineEventConfigFallback) + assert.EqualError(t, err, "foo") +} diff --git a/pkg/controller/events/task_execution.go b/pkg/controller/events/task_execution.go new file mode 100644 index 0000000000..edfe477c76 --- /dev/null +++ b/pkg/controller/events/task_execution.go @@ -0,0 +1,86 @@ +package events + +import ( + "context" + "strings" + + "github.com/flyteorg/flytestdlib/logger" + + "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +//go:generate mockery -all -output=mocks -case=underscore + +// Recorder for Task events +type TaskEventRecorder interface { + // Records task execution events indicating the task has undergone a phase change and additional metadata. 
+ RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent, eventConfig *config.EventConfig) error +} + +type taskEventRecorder struct { + eventRecorder events.TaskEventRecorder + store *storage.DataStore +} + +// In certain cases, a successful task execution event can be configured to include raw output data inline. However, +// for large outputs these events may exceed the event recipient's message size limit, so we fall back to passing +// the offloaded output URI instead. +func (r *taskEventRecorder) handleFailure(ctx context.Context, ev *event.TaskExecutionEvent, err error) error { + st, ok := status.FromError(err) + if !ok || st.Code() != codes.ResourceExhausted { + // Error was not a gRPC status error with code ResourceExhausted, so it is returned unchanged without a retry. + return err + } + if !strings.HasPrefix(st.Message(), "message too large") { // NOTE(review): depends on the recipient's error text starting with this exact prefix - confirm against the event sink. + return err + } + + // Retry once with the event passed in by the caller, which is expected to still carry the output URI rather than inline data. + return r.eventRecorder.RecordTaskEvent(ctx, ev) +} + +func (r *taskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskExecutionEvent, eventConfig *config.EventConfig) error { + var origEvent = ev + var rawOutputPolicy = eventConfig.RawOutputPolicy + if rawOutputPolicy == config.RawOutputPolicyInline && len(ev.GetOutputUri()) > 0 { + outputs := &core.LiteralMap{} + err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs) + if err != nil { + // Fall back to forwarding along outputs by reference when we can't fetch them. 
+ logger.Warnf(ctx, "failed to fetch outputs by ref [%s] to send inline with err: %v", ev.GetOutputUri(), err) + rawOutputPolicy = config.RawOutputPolicyReference + } else { + origEvent = proto.Clone(ev).(*event.TaskExecutionEvent) + ev.OutputResult = &event.TaskExecutionEvent_OutputData{ + OutputData: outputs, + } + } + } + + err := r.eventRecorder.RecordTaskEvent(ctx, ev) + if err != nil { + logger.Infof(ctx, "Failed to record task event [%+v] with err: %v", ev, err) + // Only attempt to retry sending an event in the case we tried to send raw output data inline + if eventConfig.FallbackToOutputReference && rawOutputPolicy == config.RawOutputPolicyInline { + logger.Infof(ctx, "Falling back to sending task event outputs by reference for [%+v]", ev.TaskId) + return r.handleFailure(ctx, origEvent, err) + } + return err + } + return nil +} + +func NewTaskEventRecorder(eventSink events.EventSink, scope promutils.Scope, store *storage.DataStore) TaskEventRecorder { + return &taskEventRecorder{ + eventRecorder: events.NewTaskEventRecorder(eventSink, scope), + store: store, + } +} diff --git a/pkg/controller/events/task_execution_test.go b/pkg/controller/events/task_execution_test.go new file mode 100644 index 0000000000..441036dc15 --- /dev/null +++ b/pkg/controller/events/task_execution_test.go @@ -0,0 +1,180 @@ +package events + +import ( + "context" + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytestdlib/storage" + storageMocks "github.com/flyteorg/flytestdlib/storage/mocks" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var taskID = &core.Identifier{ + Project: "p", + Domain: "d", + Name: "n", + Version: "v", +} + +func getReferenceTaskEv() *event.TaskExecutionEvent { + return 
&event.TaskExecutionEvent{ + TaskId: taskID, + RetryAttempt: 1, + ParentNodeExecutionId: nodeExecID, + OutputResult: &event.TaskExecutionEvent_OutputUri{ + OutputUri: referenceURI, + }, + } +} + +func getRawOutputTaskEv() *event.TaskExecutionEvent { + return &event.TaskExecutionEvent{ + TaskId: taskID, + RetryAttempt: 1, + ParentNodeExecutionId: nodeExecID, + OutputResult: &event.TaskExecutionEvent_OutputData{ + OutputData: outputData, + }, + } +} + +// TODO: move this mock definition to flyteidl/events +type mockTaskEventRecorder struct { + RecordTaskEventCb func(ctx context.Context, event *event.TaskExecutionEvent) error +} + +func (r *mockTaskEventRecorder) RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent) error { + if r.RecordTaskEventCb != nil { + return r.RecordTaskEventCb(ctx, event) + } + return nil +} + +func TestRecordTaskEvent_Success_ReferenceOutputs(t *testing.T) { + eventRecorder := mockTaskEventRecorder{} + eventRecorder.RecordTaskEventCb = func(ctx context.Context, event *event.TaskExecutionEvent) error { + assert.True(t, proto.Equal(event, getReferenceTaskEv())) + return nil + } + mockStore := &storage.DataStore{ + ComposedProtobufStore: &storageMocks.ComposedProtobufStore{}, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &taskEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordTaskEvent(context.TODO(), getReferenceTaskEv(), referenceEventConfig) + assert.NoError(t, err) +} + +func TestRecordTaskEvent_Success_InlineOutputs(t *testing.T) { + eventRecorder := mockTaskEventRecorder{} + eventRecorder.RecordTaskEventCb = func(ctx context.Context, event *event.TaskExecutionEvent) error { + assert.True(t, proto.Equal(event, getRawOutputTaskEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), 
mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &taskEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordTaskEvent(context.TODO(), getReferenceTaskEv(), inlineEventConfig) + assert.NoError(t, err) +} + +func TestRecordTaskEvent_Failure_FetchInlineOutputs(t *testing.T) { + eventRecorder := mockTaskEventRecorder{} + eventRecorder.RecordTaskEventCb = func(ctx context.Context, event *event.TaskExecutionEvent) error { + assert.True(t, proto.Equal(event, getReferenceTaskEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(errors.New("foo")) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &taskEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordTaskEvent(context.TODO(), getReferenceTaskEv(), inlineEventConfig) + assert.NoError(t, err) +} + +func TestRecordTaskEvent_Failure_FallbackReference_Retry(t *testing.T) { + eventRecorder := mockTaskEventRecorder{} + eventRecorder.RecordTaskEventCb = func(ctx context.Context, event *event.TaskExecutionEvent) error { + if event.GetOutputData() != nil { + return status.Error(codes.ResourceExhausted, "message too large") + } + assert.True(t, proto.Equal(event, getReferenceTaskEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args 
mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &taskEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordTaskEvent(context.TODO(), getReferenceTaskEv(), inlineEventConfigFallback) + assert.NoError(t, err) +} + +func TestRecordTaskEvent_Failure_FallbackReference_Unretriable(t *testing.T) { + eventRecorder := mockTaskEventRecorder{} + eventRecorder.RecordTaskEventCb = func(ctx context.Context, event *event.TaskExecutionEvent) error { + return errors.New("foo") + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &taskEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordTaskEvent(context.TODO(), getReferenceTaskEv(), inlineEventConfigFallback) + assert.EqualError(t, err, "foo") +} diff --git a/pkg/controller/events/test_utils.go b/pkg/controller/events/test_utils.go new file mode 100644 index 0000000000..cc7399b8ad --- /dev/null +++ b/pkg/controller/events/test_utils.go @@ -0,0 +1,50 @@ +package events + +import ( + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytepropeller/pkg/controller/config" +) + +var inlineEventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyInline, +} + +var inlineEventConfigFallback = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyInline, + FallbackToOutputReference: true, +} + +var 
referenceEventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, +} + +var referenceURI = "s3://foo/bar/outputs.pb" + +var outputData = &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 4, + }, + }, + }, + }, + }, + }, + }, +} + +var workflowExecID = &core.WorkflowExecutionIdentifier{ + Project: "p", + Domain: "d", + Name: "n", +} + +var nodeExecID = &core.NodeExecutionIdentifier{ + ExecutionId: workflowExecID, + NodeId: "node_id", +} diff --git a/pkg/controller/events/workflow_execution.go b/pkg/controller/events/workflow_execution.go new file mode 100644 index 0000000000..9ac874a881 --- /dev/null +++ b/pkg/controller/events/workflow_execution.go @@ -0,0 +1,86 @@ +package events + +import ( + "context" + "strings" + + "github.com/flyteorg/flytestdlib/logger" + + "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +//go:generate mockery -all -output=mocks -case=underscore + +// Recorder for Workflow events +type WorkflowEventRecorder interface { + // Records workflow execution events indicating the workflow has undergone a phase change and additional metadata. 
+ RecordWorkflowEvent(ctx context.Context, event *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) error +} + +type workflowEventRecorder struct { + eventRecorder events.WorkflowEventRecorder + store *storage.DataStore +} + +// In certain cases, a successful workflow execution event can be configured to include raw output data inline. However, +// for large outputs these events may exceed the event recipient's message size limit, so we fallback to passing +// the offloaded output URI instead. +func (r *workflowEventRecorder) handleFailure(ctx context.Context, ev *event.WorkflowExecutionEvent, err error) error { + st, ok := status.FromError(err) + if !ok || st.Code() != codes.ResourceExhausted { + // Error was not a status error + return err + } + if !strings.HasPrefix(st.Message(), "message too large") { + return err + } + + // This time, we attempt to record the event with the output URI set. + return r.eventRecorder.RecordWorkflowEvent(ctx, ev) +} + +func (r *workflowEventRecorder) RecordWorkflowEvent(ctx context.Context, ev *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) error { + var origEvent = ev + var rawOutputPolicy = eventConfig.RawOutputPolicy + if rawOutputPolicy == config.RawOutputPolicyInline && len(ev.GetOutputUri()) > 0 { + outputs := &core.LiteralMap{} + err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs) + if err != nil { + // Fall back to forwarding along outputs by reference when we can't fetch them. 
+ logger.Warnf(ctx, "failed to fetch outputs by ref [%s] to send inline with err: %v", ev.GetOutputUri(), err) + rawOutputPolicy = config.RawOutputPolicyReference + } else { + origEvent = proto.Clone(ev).(*event.WorkflowExecutionEvent) + ev.OutputResult = &event.WorkflowExecutionEvent_OutputData{ + OutputData: outputs, + } + } + } + + err := r.eventRecorder.RecordWorkflowEvent(ctx, ev) + if err != nil { + logger.Infof(ctx, "Failed to record workflow event [%+v] with err: %v", ev, err) + // Only attempt to retry sending an event in the case we tried to send raw output data inline + if eventConfig.FallbackToOutputReference && rawOutputPolicy == config.RawOutputPolicyInline { + logger.Infof(ctx, "Falling back to sending workflow event outputs by reference for [%+v]", ev.ExecutionId) + return r.handleFailure(ctx, origEvent, err) + } + return err + } + return nil +} + +func NewWorkflowEventRecorder(eventSink events.EventSink, scope promutils.Scope, store *storage.DataStore) WorkflowEventRecorder { + return &workflowEventRecorder{ + eventRecorder: events.NewWorkflowEventRecorder(eventSink, scope), + store: store, + } +} diff --git a/pkg/controller/events/workflow_execution_test.go b/pkg/controller/events/workflow_execution_test.go new file mode 100644 index 0000000000..629c9b67b1 --- /dev/null +++ b/pkg/controller/events/workflow_execution_test.go @@ -0,0 +1,158 @@ +package events + +import ( + "context" + "testing" + + "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytestdlib/storage" + storageMocks "github.com/flyteorg/flytestdlib/storage/mocks" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func getReferenceWorkflowEv() *event.WorkflowExecutionEvent { + return 
&event.WorkflowExecutionEvent{ + ExecutionId: workflowExecID, + OutputResult: &event.WorkflowExecutionEvent_OutputUri{ + OutputUri: referenceURI, + }, + } +} + +func getRawOutputWorkflowEv() *event.WorkflowExecutionEvent { + return &event.WorkflowExecutionEvent{ + ExecutionId: workflowExecID, + OutputResult: &event.WorkflowExecutionEvent_OutputData{ + OutputData: outputData, + }, + } +} + +func TestRecordWorkflowEvent_Success_ReferenceOutputs(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordWorkflowEventCb = func(ctx context.Context, event *event.WorkflowExecutionEvent) error { + assert.True(t, proto.Equal(event, getReferenceWorkflowEv())) + return nil + } + mockStore := &storage.DataStore{ + ComposedProtobufStore: &storageMocks.ComposedProtobufStore{}, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &workflowEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordWorkflowEvent(context.TODO(), getReferenceWorkflowEv(), referenceEventConfig) + assert.NoError(t, err) +} + +func TestRecordWorkflowEvent_Success_InlineOutputs(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordWorkflowEventCb = func(ctx context.Context, event *event.WorkflowExecutionEvent) error { + assert.True(t, proto.Equal(event, getRawOutputWorkflowEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &workflowEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordWorkflowEvent(context.TODO(), 
getReferenceWorkflowEv(), inlineEventConfig) + assert.NoError(t, err) +} + +func TestRecordWorkflowEvent_Failure_FetchInlineOutputs(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordWorkflowEventCb = func(ctx context.Context, event *event.WorkflowExecutionEvent) error { + assert.True(t, proto.Equal(event, getReferenceWorkflowEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(errors.New("foo")) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &workflowEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordWorkflowEvent(context.TODO(), getReferenceWorkflowEv(), inlineEventConfig) + assert.NoError(t, err) +} + +func TestRecordWorkflowEvent_Failure_FallbackReference_Retry(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordWorkflowEventCb = func(ctx context.Context, event *event.WorkflowExecutionEvent) error { + if event.GetOutputData() != nil { + return status.Error(codes.ResourceExhausted, "message too large") + } + assert.True(t, proto.Equal(event, getReferenceWorkflowEv())) + return nil + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &workflowEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordWorkflowEvent(context.TODO(), 
getReferenceWorkflowEv(), inlineEventConfigFallback) + assert.NoError(t, err) +} + +func TestRecordWorkflowEvent_Failure_FallbackReference_Unretriable(t *testing.T) { + eventRecorder := events.MockRecorder{} + eventRecorder.RecordWorkflowEventCb = func(ctx context.Context, event *event.WorkflowExecutionEvent) error { + return errors.New("foo") + } + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.OnReadProtobufMatch(mock.Anything, mock.MatchedBy(func(ref storage.DataReference) bool { + return ref.String() == referenceURI + }), mock.Anything).Return(nil).Run(func(args mock.Arguments) { + arg := args.Get(2).(*core.LiteralMap) + *arg = *outputData + }) + mockStore := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + recorder := &workflowEventRecorder{ + eventRecorder: &eventRecorder, + store: mockStore, + } + err := recorder.RecordWorkflowEvent(context.TODO(), getReferenceWorkflowEv(), inlineEventConfigFallback) + assert.EqualError(t, err, "foo") +} diff --git a/pkg/controller/nodes/branch/evaluator.go b/pkg/controller/nodes/branch/evaluator.go index c8f411176b..fe6d7edac5 100644 --- a/pkg/controller/nodes/branch/evaluator.go +++ b/pkg/controller/nodes/branch/evaluator.go @@ -15,6 +15,7 @@ import ( const ErrorCodeUserProvidedError = "UserProvidedError" const ErrorCodeMalformedBranch = "MalformedBranchUserError" const ErrorCodeCompilerError = "CompilerError" +const ErrorCodeFailedFetchOutputs = "FailedFetchOutputs" func EvaluateComparison(expr *core.ComparisonExpression, nodeInputs *core.LiteralMap) (bool, error) { var lValue *core.Literal diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go index 3d02761a3e..7c9549f71d 100644 --- a/pkg/controller/nodes/branch/handler.go +++ b/pkg/controller/nodes/branch/handler.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + 
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" stdErrors "github.com/flyteorg/flytestdlib/errors" @@ -23,6 +25,7 @@ type metrics struct { type branchHandler struct { nodeExecutor executors.Node m metrics + eventConfig *config.EventConfig } func (b *branchHandler) FinalizeRequired() bool { @@ -241,9 +244,10 @@ func (b *branchHandler) Finalize(ctx context.Context, nCtx handler.NodeExecution return b.nodeExecutor.FinalizeHandler(ctx, execContext, dag, nCtx.ContextualNodeLookup(), branchTakenNode) } -func New(executor executors.Node, scope promutils.Scope) handler.Node { +func New(executor executors.Node, eventConfig *config.EventConfig, scope promutils.Scope) handler.Node { return &branchHandler{ nodeExecutor: executor, m: metrics{scope: scope}, + eventConfig: eventConfig, } } diff --git a/pkg/controller/nodes/branch/handler_test.go b/pkg/controller/nodes/branch/handler_test.go index c40bd679b0..b6f4ab0d66 100644 --- a/pkg/controller/nodes/branch/handler_test.go +++ b/pkg/controller/nodes/branch/handler_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -27,6 +29,10 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler/mocks" ) +var eventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, +} + type branchNodeStateHolder struct { s handler.BranchNodeState } @@ -197,7 +203,7 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) { childNodeStatus.On("SetDataDir", storage.DataReference("parent-data-dir")).Once() childNodeStatus.On("SetOutputDir", storage.DataReference("parent-output-dir")).Once() } - branch := New(mockNodeExecutor, promutils.NewTestScope()).(*branchHandler) + branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope()).(*branchHandler) 
h, err := branch.recurseDownstream(ctx, nCtx, test.nodeStatus, test.branchTakenNode) if test.isErr { assert.Error(t, err) @@ -278,7 +284,7 @@ func TestBranchHandler_AbortNode(t *testing.T) { eCtx := &execMocks.ExecutionContext{} eCtx.OnGetParentInfo().Return(nil) nCtx, _ := createNodeContext(v1alpha1.BranchNodeError, nil, n, nil, nil, eCtx) - branch := New(mockNodeExecutor, promutils.NewTestScope()) + branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope()) err := branch.Abort(ctx, nCtx, "") assert.NoError(t, err) }) @@ -296,7 +302,7 @@ func TestBranchHandler_AbortNode(t *testing.T) { mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) nl.OnGetNode(*s.s.FinalizedNodeID).Return(n, true) - branch := New(mockNodeExecutor, promutils.NewTestScope()) + branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope()) err := branch.Abort(ctx, nCtx, "") assert.NoError(t, err) }) @@ -305,7 +311,7 @@ func TestBranchHandler_AbortNode(t *testing.T) { func TestBranchHandler_Initialize(t *testing.T) { ctx := context.TODO() mockNodeExecutor := &execMocks.Node{} - branch := New(mockNodeExecutor, promutils.NewTestScope()) + branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope()) assert.NoError(t, branch.Setup(ctx, nil)) } @@ -313,7 +319,7 @@ func TestBranchHandler_Initialize(t *testing.T) { func TestBranchHandler_HandleNode(t *testing.T) { ctx := context.TODO() mockNodeExecutor := &execMocks.Node{} - branch := New(mockNodeExecutor, promutils.NewTestScope()) + branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope()) childNodeID := "child" childDatadir := v1alpha1.DataReference("test") w := &v1alpha1.FlyteWorkflow{ diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 4a7261e8e6..80aeffc550 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -4,6 +4,8 @@ import ( "context" "time" + 
"github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" @@ -60,6 +62,7 @@ type dynamicNodeTaskNodeHandler struct { metrics metrics nodeExecutor executors.Node lpReader launchplan.Reader + eventConfig *config.EventConfig } func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevState handler.DynamicNodeState, nCtx handler.NodeExecutionContext) (handler.Transition, handler.DynamicNodeState, error) { @@ -297,12 +300,13 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N return nil } -func New(underlying TaskNodeHandler, nodeExecutor executors.Node, launchPlanReader launchplan.Reader, scope promutils.Scope) handler.Node { +func New(underlying TaskNodeHandler, nodeExecutor executors.Node, launchPlanReader launchplan.Reader, eventConfig *config.EventConfig, scope promutils.Scope) handler.Node { return &dynamicNodeTaskNodeHandler{ TaskNodeHandler: underlying, metrics: newMetrics(scope), nodeExecutor: nodeExecutor, lpReader: launchPlanReader, + eventConfig: eventConfig, } } diff --git a/pkg/controller/nodes/dynamic/handler_test.go b/pkg/controller/nodes/dynamic/handler_test.go index 4ccd228b22..6c048ec178 100644 --- a/pkg/controller/nodes/dynamic/handler_test.go +++ b/pkg/controller/nodes/dynamic/handler_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/promutils/labeled" @@ -55,6 +57,10 @@ func (t *dynamicNodeStateHolder) PutDynamicNodeState(s handler.DynamicNodeState) var tID = "task-1" +var eventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, +} + func 
Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { createNodeContext := func(ttype string) *nodeMocks.NodeExecutionContext { wfExecID := &core.WorkflowExecutionIdentifier{ @@ -181,7 +187,7 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { } else { h.OnHandleMatch(mock.Anything, mock.Anything).Return(tt.args.trns, nil) } - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) if (err != nil) != tt.want.isErr { t.Errorf("Handle() error = %v, wantErr %v", err, tt.want.isErr) @@ -293,7 +299,7 @@ func Test_dynamicNodeHandler_Handle_ParentFinalize(t *testing.T) { assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) h := &mocks.TaskNodeHandler{} h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(nil) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning.String(), got.Info().GetPhase().String()) @@ -313,7 +319,7 @@ func Test_dynamicNodeHandler_Handle_ParentFinalize(t *testing.T) { assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) h := &mocks.TaskNodeHandler{} h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(fmt.Errorf("err")) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) _, err = d.Handle(context.TODO(), nCtx) assert.Error(t, err) }) @@ -566,7 +572,7 @@ func Test_dynamicNodeHandler_Handle_SubTaskV1(t *testing.T) { execContext.OnGetParentInfo().Return(&immutableParentInfo) execContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{}) nCtx.OnExecutionContext().Return(&execContext) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, 
promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) if tt.want.isErr { assert.Error(t, err) @@ -753,7 +759,7 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { execContext.OnGetParentInfo().Return(nil) execContext.OnGetExecutionConfig().Return(v1alpha1.ExecutionConfig{}) nCtx.OnExecutionContext().Return(&execContext) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) if tt.want.isErr { assert.Error(t, err) @@ -823,7 +829,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { h := &mocks.TaskNodeHandler{} h.OnFinalize(ctx, nCtx).Return(nil) n := &executorMocks.Node{} - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) assert.NoError(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) @@ -955,7 +961,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { h.OnFinalize(ctx, nCtx).Return(nil) n := &executorMocks.Node{} n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) assert.NoError(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) @@ -976,7 +982,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { h.OnFinalize(ctx, nCtx).Return(fmt.Errorf("err")) n := &executorMocks.Node{} n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) assert.Error(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) 
assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) @@ -997,7 +1003,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { h.OnFinalize(ctx, nCtx).Return(nil) n := &executorMocks.Node{} n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("err")) - d := New(h, n, mockLPLauncher, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, eventConfig, promutils.NewTestScope()) assert.Error(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index d6a146b539..f05cb3d87f 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -34,6 +34,7 @@ import ( eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + controllerEvents "github.com/flyteorg/flytepropeller/pkg/controller/events" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" @@ -88,8 +89,8 @@ type nodeExecutor struct { nodeHandlerFactory HandlerFactory enqueueWorkflow v1alpha1.EnqueueWorkflow store *storage.DataStore - nodeRecorder events.NodeEventRecorder - taskRecorder events.TaskEventRecorder + nodeRecorder controllerEvents.NodeEventRecorder + taskRecorder controllerEvents.TaskEventRecorder metrics *nodeMetrics maxDatasetSizeBytes int64 outputResolver OutputResolver @@ -100,6 +101,7 @@ type nodeExecutor struct { defaultDataSandbox storage.DataReference shardSelector ioutils.ShardSelector recoveryClient recovery.Client + eventConfig *config.EventConfig } func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) { @@ -128,7 +130,7 @@ 
func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve } logger.Infof(ctx, "Recording event p[%+v]", nodeEvent) - err := c.nodeRecorder.RecordNodeEvent(ctx, nodeEvent) + err := c.nodeRecorder.RecordNodeEvent(ctx, nodeEvent, c.eventConfig) if err != nil { if nodeEvent.GetId().NodeId == v1alpha1.EndNodeID { return nil @@ -224,6 +226,8 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe var outputs = &core.LiteralMap{} if recoveredData.FullOutputs != nil { outputs = recoveredData.FullOutputs + } else if recovered.Closure.GetOutputData() != nil { + outputs = recovered.Closure.GetOutputData() } else if len(recovered.Closure.GetOutputUri()) > 0 { if err := c.store.ReadProtobuf(ctx, storage.DataReference(recovered.Closure.GetOutputUri()), outputs); err != nil { return handler.PhaseInfoUndefined, errors.Wrapf(errors.InputsNotFoundError, nCtx.NodeID(), err, "failed to read output data [%v].", recovered.Closure.GetOutputUri()) @@ -1047,7 +1051,7 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error { func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, - catalogClient catalog.Client, recoveryClient recovery.Client, scope promutils.Scope) (executors.Node, error) { + catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, scope promutils.Scope) (executors.Node, error) { // TODO we may want to make this configurable. 
shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx) @@ -1059,8 +1063,8 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora exec := &nodeExecutor{ store: store, enqueueWorkflow: enQWorkflow, - nodeRecorder: events.NewNodeEventRecorder(eventSink, nodeScope), - taskRecorder: events.NewTaskEventRecorder(eventSink, scope.NewSubScope("task")), + nodeRecorder: controllerEvents.NewNodeEventRecorder(eventSink, nodeScope, store), + taskRecorder: controllerEvents.NewTaskEventRecorder(eventSink, scope.NewSubScope("task"), store), maxDatasetSizeBytes: maxDatasetSize, metrics: &nodeMetrics{ Scope: nodeScope, @@ -1092,8 +1096,9 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultDataSandbox: defaultRawOutputPrefix, shardSelector: shardSelector, recoveryClient: recoveryClient, + eventConfig: eventConfig, } - nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, launchPlanReader, kubeClient, catalogClient, recoveryClient, nodeScope) + nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, launchPlanReader, kubeClient, catalogClient, recoveryClient, eventConfig, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory return exec, err } diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index 17bfa96eac..dc92910c10 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + controllerEvents "github.com/flyteorg/flytepropeller/pkg/controller/events" + "github.com/golang/protobuf/proto" mocks3 "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" @@ -26,6 +28,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + eventMocks "github.com/flyteorg/flytepropeller/pkg/controller/events/mocks" mocks4 
"github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" nodeHandlerMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler/mocks" @@ -53,6 +56,10 @@ const taskID = "tID" const inputsPath = "inputs.pb" const outputsPath = "out/outputs.pb" +var eventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, +} + func TestSetInputsForStartNode(t *testing.T) { ctx := context.Background() mockStorage := createInmemoryDataStore(t, testScope.NewSubScope("f")) @@ -60,7 +67,7 @@ func TestSetInputsForStartNode(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), adminClient, - adminClient, 10, "s3://bucket/", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket/", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) inputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -107,7 +114,7 @@ func TestSetInputsForStartNode(t *testing.T) { failStorage := createFailingDatastore(t, testScope.NewSubScope("failing")) execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) t.Run("StorageFailure", func(t *testing.T) { w := createDummyBaseWorkflow(mockStorage) @@ -133,7 +140,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { t.Run("happy", func(t *testing.T) { execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", 
fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -147,7 +154,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { t.Run("error", func(t *testing.T) { execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -170,7 +177,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -274,7 +281,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -677,7 +684,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, 
config.GetConfig().NodeConfig, store, enQWf, mockEventSink, - adminClient, adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -752,20 +759,21 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf called := false - exec.nodeRecorder = &events.MockRecorder{ - RecordNodeEventCb: func(ctx context.Context, ev *event.NodeExecutionEvent) error { - assert.NotNil(t, ev) - assert.Equal(t, test.eventPhase, ev.Phase) - called = true - return nil - }, - } + evRecorder := &eventMocks.NodeEventRecorder{} + evRecorder.OnRecordNodeEventMatch(mock.Anything, mock.MatchedBy(func(ev *event.NodeExecutionEvent) bool { + assert.NotNil(t, ev) + assert.Equal(t, test.eventPhase, ev.Phase) + called = true + return true + }), mock.Anything).Return(nil) + + exec.nodeRecorder = evRecorder h := &nodeHandlerMocks.Node{} h.On("Handle", @@ -862,20 +870,20 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", 
fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf called := false - exec.nodeRecorder = &events.MockRecorder{ - RecordNodeEventCb: func(ctx context.Context, ev *event.NodeExecutionEvent) error { - assert.NotNil(t, ev) - assert.Equal(t, test.eventPhase.String(), ev.Phase.String()) - called = true - return nil - }, - } + evRecorder := &eventMocks.NodeEventRecorder{} + evRecorder.OnRecordNodeEventMatch(mock.Anything, mock.MatchedBy(func(ev *event.NodeExecutionEvent) bool { + assert.NotNil(t, ev) + assert.Equal(t, test.eventPhase, ev.Phase) + called = true + return true + }), mock.Anything).Return(nil) + exec.nodeRecorder = evRecorder h := &nodeHandlerMocks.Node{} h.On("Handle", @@ -926,7 +934,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -957,7 +965,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", 
fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -991,7 +999,7 @@ func TestNodeExecutor_RecursiveNodeHandler_NoDownstream(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -1102,7 +1110,7 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -1218,7 +1226,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) // Node not yet started @@ -1329,7 +1337,7 @@ func Test_nodeExecutor_RecordTransitionLatency(t *testing.T) { nodeHandlerFactory HandlerFactory enqueueWorkflow 
v1alpha1.EnqueueWorkflow store *storage.DataStore - nodeRecorder events.NodeEventRecorder + nodeRecorder controllerEvents.NodeEventRecorder metrics *nodeMetrics } type args struct { @@ -1725,7 +1733,7 @@ func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -1856,7 +1864,7 @@ type fakeNodeEventRecorder struct { err error } -func (f fakeNodeEventRecorder) RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent) error { +func (f fakeNodeEventRecorder) RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error { if f.err != nil { return f.err } @@ -1871,7 +1879,7 @@ func Test_nodeExecutor_IdempotentRecordEvent(t *testing.T) { tests := []struct { name string - rec events.NodeEventRecorder + rec controllerEvents.NodeEventRecorder p core.NodeExecution_Phase wantErr bool }{ @@ -1885,6 +1893,9 @@ func Test_nodeExecutor_IdempotentRecordEvent(t *testing.T) { t.Run(tt.name, func(t *testing.T) { c := &nodeExecutor{ nodeRecorder: tt.rec, + eventConfig: &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, + }, } ev := &event.NodeExecutionEvent{ Id: &core.NodeExecutionIdentifier{}, @@ -2005,6 +2016,9 @@ func TestRecover(t *testing.T) { executor := nodeExecutor{ recoveryClient: recoveryClient, store: storageClient, + eventConfig: &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, + }, } phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx) @@ -2071,6 +2085,7 @@ func TestRecover(t *testing.T) { executor := 
nodeExecutor{ recoveryClient: recoveryClient, store: storageClient, + eventConfig: eventConfig, } phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx) @@ -2129,6 +2144,7 @@ func TestRecover(t *testing.T) { executor := nodeExecutor{ recoveryClient: recoveryClient, store: storageClient, + eventConfig: eventConfig, } phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx) @@ -2192,6 +2208,7 @@ func TestRecover(t *testing.T) { executor := nodeExecutor{ recoveryClient: recoveryClient, store: storageClient, + eventConfig: eventConfig, } phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx) @@ -2231,6 +2248,7 @@ func TestRecover(t *testing.T) { executor := nodeExecutor{ recoveryClient: recoveryClient, store: storageClient, + eventConfig: eventConfig, } phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx) diff --git a/pkg/controller/nodes/handler/mocks/node_execution_context.go b/pkg/controller/nodes/handler/mocks/node_execution_context.go index 5fc8cbff8a..97ed93bcfb 100644 --- a/pkg/controller/nodes/handler/mocks/node_execution_context.go +++ b/pkg/controller/nodes/handler/mocks/node_execution_context.go @@ -3,7 +3,7 @@ package mocks import ( - events "github.com/flyteorg/flyteidl/clients/go/events" + events "github.com/flyteorg/flytepropeller/pkg/controller/events" executors "github.com/flyteorg/flytepropeller/pkg/controller/executors" handler "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index a32c8c056f..419920534f 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -3,9 +3,9 @@ package handler import ( "context" - "github.com/flyteorg/flyteidl/clients/go/events" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + 
"github.com/flyteorg/flytepropeller/pkg/controller/events" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/controller/nodes/handler_factory.go b/pkg/controller/nodes/handler_factory.go index cef3c3cf1b..b42b2385f6 100644 --- a/pkg/controller/nodes/handler_factory.go +++ b/pkg/controller/nodes/handler_factory.go @@ -3,6 +3,8 @@ package nodes import ( "context" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" @@ -53,18 +55,19 @@ func (f handlerFactory) Setup(ctx context.Context, setup handler.SetupContext) e } func NewHandlerFactory(ctx context.Context, executor executors.Node, workflowLauncher launchplan.Executor, - launchPlanReader launchplan.Reader, kubeClient executors.Client, client catalog.Client, recoveryClient recovery.Client, scope promutils.Scope) (HandlerFactory, error) { + launchPlanReader launchplan.Reader, kubeClient executors.Client, client catalog.Client, recoveryClient recovery.Client, + eventConfig *config.EventConfig, scope promutils.Scope) (HandlerFactory, error) { - t, err := task.New(ctx, kubeClient, client, recoveryClient, scope) + t, err := task.New(ctx, kubeClient, client, eventConfig, scope) if err != nil { return nil, err } f := &handlerFactory{ handlers: map[v1alpha1.NodeKind]handler.Node{ - v1alpha1.NodeKindBranch: branch.New(executor, scope), - v1alpha1.NodeKindTask: dynamic.New(t, executor, launchPlanReader, scope), - v1alpha1.NodeKindWorkflow: subworkflow.New(executor, workflowLauncher, recoveryClient, scope), + v1alpha1.NodeKindBranch: branch.New(executor, eventConfig, scope), + v1alpha1.NodeKindTask: dynamic.New(t, executor, launchPlanReader, eventConfig, scope), + v1alpha1.NodeKindWorkflow: subworkflow.New(executor, workflowLauncher, recoveryClient, eventConfig, scope), 
v1alpha1.NodeKindStart: start.New(), v1alpha1.NodeKindEnd: end.New(), }, diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 78c7ff6339..86faeb0443 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -5,7 +5,8 @@ import ( "fmt" "strconv" - "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flytepropeller/pkg/controller/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/storage" "k8s.io/apimachinery/pkg/types" diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index b11892e9f3..bf7b5c393b 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -3,6 +3,8 @@ package subworkflow import ( "context" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -127,13 +129,14 @@ func (w *workflowNodeHandler) Finalize(ctx context.Context, _ handler.NodeExecut return nil } -func New(executor executors.Node, workflowLauncher launchplan.Executor, recoveryClient recovery.Client, scope promutils.Scope) handler.Node { +func New(executor executors.Node, workflowLauncher launchplan.Executor, recoveryClient recovery.Client, eventConfig *config.EventConfig, scope promutils.Scope) handler.Node { workflowScope := scope.NewSubScope("workflow") return &workflowNodeHandler{ - subWfHandler: newSubworkflowHandler(executor), + subWfHandler: newSubworkflowHandler(executor, eventConfig), lpHandler: launchPlanHandler{ launchPlan: workflowLauncher, recoveryClient: recoveryClient, + eventConfig: eventConfig, }, metrics: newMetrics(workflowScope), } diff --git a/pkg/controller/nodes/subworkflow/handler_test.go b/pkg/controller/nodes/subworkflow/handler_test.go index 784df59343..049572ce00 100644 
--- a/pkg/controller/nodes/subworkflow/handler_test.go +++ b/pkg/controller/nodes/subworkflow/handler_test.go @@ -6,6 +6,8 @@ import ( "reflect" "testing" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + mocks5 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -33,6 +35,10 @@ type workflowNodeStateHolder struct { s handler.WorkflowNodeState } +var eventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, +} + func (t *workflowNodeStateHolder) PutTaskNodeState(s handler.TaskNodeState) error { panic("not implemented") } @@ -147,7 +153,7 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { t.Run("happy v0", func(t *testing.T) { mockLPExec := &mocks.Executor{} - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnLaunchMatch( ctx, mock.MatchedBy(func(o launchplan.LaunchContext) bool { @@ -170,7 +176,7 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { t.Run("happy v1", func(t *testing.T) { mockLPExec := &mocks.Executor{} - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnLaunchMatch( ctx, mock.MatchedBy(func(o launchplan.LaunchContext) bool { @@ -223,7 +229,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) { mockLPExec := &mocks.Executor{} - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnGetStatusMatch( ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { @@ -242,7 +248,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) { mockLPExec := &mocks.Executor{} - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := 
New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnGetStatusMatch( ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { @@ -292,7 +298,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { mockLPExec := &mocks.Executor{} nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnKillMatch( ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { @@ -313,7 +319,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { mockLPExec := &mocks.Executor{} nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnKillMatch( ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { @@ -332,7 +338,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { mockLPExec := &mocks.Executor{} expectedErr := fmt.Errorf("fail") - h := New(nil, mockLPExec, recoveryClient, promutils.NewTestScope()) + h := New(nil, mockLPExec, recoveryClient, eventConfig, promutils.NewTestScope()) mockLPExec.OnKillMatch( ctx, mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index dbff0fb6b2..9a7bef6cbf 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,6 +25,7 @@ import ( type launchPlanHandler struct { 
launchPlan launchplan.Executor recoveryClient recovery.Client + eventConfig *config.EventConfig } func getParentNodeExecutionID(nCtx handler.NodeExecutionContext) (*core.NodeExecutionIdentifier, error) { diff --git a/pkg/controller/nodes/subworkflow/launchplan_test.go b/pkg/controller/nodes/subworkflow/launchplan_test.go index 7de30efdf1..eb0c82f3e2 100644 --- a/pkg/controller/nodes/subworkflow/launchplan_test.go +++ b/pkg/controller/nodes/subworkflow/launchplan_test.go @@ -347,7 +347,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { // tODO ssingh: do we need mockStore h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } op := &core.LiteralMap{ @@ -391,7 +392,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockStore := createInmemoryStore(t) mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } op := &core.LiteralMap{ @@ -432,7 +434,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } mockLPExec.On("GetStatus", @@ -461,7 +464,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } mockLPExec.On("GetStatus", @@ -484,7 +488,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } mockLPExec.On("GetStatus", @@ -507,7 +512,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } 
mockLPExec.On("GetStatus", @@ -528,7 +534,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } mockLPExec.On("GetStatus", @@ -550,7 +557,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { mockLPExec := &mocks.Executor{} h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } op := &core.LiteralMap{ @@ -588,7 +596,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { uri := storage.DataReference("uri") h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } mockLPExec.On("GetStatus", @@ -621,7 +630,8 @@ func TestSubWorkflowHandler_CheckLaunchPlanStatus(t *testing.T) { uri := storage.DataReference("uri") h := launchPlanHandler{ - launchPlan: mockLPExec, + launchPlan: mockLPExec, + eventConfig: eventConfig, } mockLPExec.On("GetStatus", diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go index 7d17ee0a94..95939d1399 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/pkg/controller/nodes/subworkflow/subworkflow.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flytestdlib/logger" @@ -19,6 +21,7 @@ import ( // Subworkflow handler handles inline subWorkflows type subworkflowHandler struct { nodeExecutor executors.Node + eventConfig *config.EventConfig } // Helper method that extracts the SubWorkflow from the ExecutionContext @@ -236,8 +239,9 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE return s.nodeExecutor.AbortHandler(ctx, execContext, subWorkflow, nodeLookup, subWorkflow.StartNode(), reason) } -func 
newSubworkflowHandler(nodeExecutor executors.Node) subworkflowHandler { +func newSubworkflowHandler(nodeExecutor executors.Node, eventConfig *config.EventConfig) subworkflowHandler { return subworkflowHandler{ nodeExecutor: nodeExecutor, + eventConfig: eventConfig, } } diff --git a/pkg/controller/nodes/subworkflow/subworkflow_test.go b/pkg/controller/nodes/subworkflow/subworkflow_test.go index 3e438fef5a..50840776f7 100644 --- a/pkg/controller/nodes/subworkflow/subworkflow_test.go +++ b/pkg/controller/nodes/subworkflow/subworkflow_test.go @@ -88,7 +88,7 @@ func Test_subworkflowHandler_HandleAbort(t *testing.T) { nCtx.OnNodeID().Return("n1") nodeExec := &execMocks.Node{} - s := newSubworkflowHandler(nodeExec) + s := newSubworkflowHandler(nodeExec, eventConfig) n := &coreMocks.ExecutableNode{} swf.OnGetID().Return("swf") nodeExec.OnAbortHandlerMatch(mock.Anything, ectx, swf, mock.Anything, n, "reason").Return(nil) @@ -121,7 +121,7 @@ func Test_subworkflowHandler_HandleAbort(t *testing.T) { nCtx.OnCurrentAttempt().Return(uint32(1)) nodeExec := &execMocks.Node{} - s := newSubworkflowHandler(nodeExec) + s := newSubworkflowHandler(nodeExec, eventConfig) n := &coreMocks.ExecutableNode{} swf.OnGetID().Return("swf") newParentInfo, _ := common.CreateParentInfo(nil, nCtx.NodeID(), nCtx.CurrentAttempt()) @@ -155,7 +155,7 @@ func Test_subworkflowHandler_HandleAbort(t *testing.T) { nCtx.OnCurrentAttempt().Return(uint32(1)) nodeExec := &execMocks.Node{} - s := newSubworkflowHandler(nodeExec) + s := newSubworkflowHandler(nodeExec, eventConfig) n := &coreMocks.ExecutableNode{} swf.OnGetID().Return("swf") newParentInfo, _ := common.CreateParentInfo(nil, nCtx.NodeID(), nCtx.CurrentAttempt()) diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 2af6269ea5..b5cb30ce28 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -6,8 +6,6 @@ import ( "runtime/debug" "time" - 
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -22,6 +20,7 @@ import ( pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" pluginK8s "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" + controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" @@ -172,6 +171,7 @@ type Handler struct { barrierCache *barrier cfg *config.Config pluginScope promutils.Scope + eventConfig *controllerConfig.EventConfig } func (t *Handler) FinalizeRequired() bool { @@ -429,7 +429,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute // Until then we have to check if the Handler executed resulted in a dynamic node being generated, if so, then // we will not check for outputs or call onTaskSuccess. The reason is that outputs have not yet been materialized. - // Outputs for the parent node will only get generated after the subtasks complete. We have to wait for the completion + // Output for the parent node will only get generated after the subtasks complete. 
We have to wait for the completion // the dynamic.handler will call onTaskSuccess for the parent node f, err := NewRemoteFutureFileReader(ctx, tCtx.ow.GetOutputPrefixPath(), tCtx.DataStore()) @@ -593,7 +593,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) if err != nil { return handler.UnknownTransition, err } - if err := nCtx.EventsRecorder().RecordTaskEvent(ctx, evInfo); err != nil { + if err := nCtx.EventsRecorder().RecordTaskEvent(ctx, evInfo, t.eventConfig); err != nil { logger.Errorf(ctx, "Event recording failed for Plugin [%s], eventPhase [%s], error :%s", p.GetID(), evInfo.Phase.String(), err.Error()) // Check for idempotency // Check for terminate state error @@ -618,7 +618,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) return handler.UnknownTransition, err } if evInfo != nil { - if err := nCtx.EventsRecorder().RecordTaskEvent(ctx, evInfo); err != nil { + if err := nCtx.EventsRecorder().RecordTaskEvent(ctx, evInfo, t.eventConfig); err != nil { // Check for idempotency // Check for terminate state error logger.Errorf(ctx, "failed to send event to Admin. error: %s", err.Error()) @@ -705,7 +705,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r Code: "Task Aborted", Message: reason, }}, - }); err != nil { + }, t.eventConfig); err != nil { logger.Errorf(ctx, "failed to send event to Admin. 
error: %s", err.Error()) return err } @@ -740,7 +740,7 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext }() } -func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, recoveryClient recovery.Client, scope promutils.Scope) (*Handler, error) { +func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, eventConfig *controllerConfig.EventConfig, scope promutils.Scope) (*Handler, error) { // TODO New should take a pointer async, err := catalog.NewAsyncClient(client, *catalog.GetConfig(), scope.NewSubScope("async_catalog")) if err != nil { @@ -777,5 +777,6 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client secretManager: secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()), barrierCache: newLRUBarrier(ctx, cfg.BarrierConfig), cfg: cfg, + eventConfig: eventConfig, }, nil } diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index b8c1958736..ce5425fdf1 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - mocks2 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/events" pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" @@ -22,7 +22,6 @@ import ( "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/promutils/labeled" - "github.com/flyteorg/flyteidl/clients/go/events" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" @@ -32,6 +31,7 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" pluginK8s "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" + 
controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/storage" "github.com/stretchr/testify/assert" @@ -50,6 +50,10 @@ import ( rmConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager/config" ) +var eventConfig = &controllerConfig.EventConfig{ + RawOutputPolicy: controllerConfig.RawOutputPolicyReference, +} + func Test_task_setDefault(t *testing.T) { type fields struct { defaultPlugin pluginCore.Plugin @@ -239,7 +243,7 @@ func Test_task_Setup(t *testing.T) { sCtx.On("EnqueueOwner").Return(pluginCore.EnqueueOwner(func(name types.NamespacedName) error { return nil })) sCtx.On("MetricsScope").Return(promutils.NewTestScope()) - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, &mocks2.RecoveryClient{}, promutils.NewTestScope()) + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, promutils.NewTestScope()) tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes assert.NoError(t, err) @@ -344,7 +348,7 @@ type fakeBufferedTaskEventRecorder struct { evs []*event.TaskExecutionEvent } -func (f *fakeBufferedTaskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskExecutionEvent) error { +func (f *fakeBufferedTaskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskExecutionEvent, eventConfig *controllerConfig.EventConfig) error { f.evs = append(f.evs, ev) return nil } @@ -898,7 +902,7 @@ func Test_task_Handle_Catalog(t *testing.T) { } else { c.OnPutMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) } - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, &mocks2.RecoveryClient{}, promutils.NewTestScope()) + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, 
eventConfig, promutils.NewTestScope()) assert.NoError(t, err) tk.defaultPlugins = map[pluginCore.TaskType]pluginCore.Plugin{ "test": fakeplugins.NewPhaseBasedPlugin(), @@ -1189,7 +1193,7 @@ func Test_task_Handle_Barrier(t *testing.T) { nCtx := createNodeContext(ev, "test", state, tt.args.prevTick) c := &pluginCatalogMocks.Client{} - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, &mocks2.RecoveryClient{}, promutils.NewTestScope()) + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) tk.resourceManager = noopRm @@ -1664,7 +1668,7 @@ func Test_task_Finalize(t *testing.T) { } func TestNew(t *testing.T) { - got, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, &mocks2.RecoveryClient{}, promutils.NewTestScope()) + got, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) assert.NotNil(t, got) assert.NotNil(t, got.defaultPlugins) diff --git a/pkg/controller/nodes/task_event_recorder.go b/pkg/controller/nodes/task_event_recorder.go index 3a26f48437..bc1d59d749 100644 --- a/pkg/controller/nodes/task_event_recorder.go +++ b/pkg/controller/nodes/task_event_recorder.go @@ -3,7 +3,9 @@ package nodes import ( "context" - "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytestdlib/logger" "github.com/pkg/errors" @@ -15,8 +17,8 @@ type taskEventRecorder struct { events.TaskEventRecorder } -func (t taskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskExecutionEvent) error { - if err := t.TaskEventRecorder.RecordTaskEvent(ctx, ev); err != nil { +func (t taskEventRecorder) RecordTaskEvent(ctx context.Context, ev *event.TaskExecutionEvent, eventConfig 
*config.EventConfig) error { + if err := t.TaskEventRecorder.RecordTaskEvent(ctx, ev, eventConfig); err != nil { if eventsErr.IsAlreadyExists(err) { logger.Warningf(ctx, "Failed to record taskEvent, error [%s]. Trying to record state: %s. Ignoring this error!", err.Error(), ev.Phase) return nil diff --git a/pkg/controller/nodes/task_event_recorder_test.go b/pkg/controller/nodes/task_event_recorder_test.go index e2558db350..617b37ddd4 100644 --- a/pkg/controller/nodes/task_event_recorder_test.go +++ b/pkg/controller/nodes/task_event_recorder_test.go @@ -5,7 +5,9 @@ import ( "fmt" "testing" - "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/events" + eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" @@ -15,7 +17,7 @@ type fakeTaskEventsRecorder struct { err error } -func (f fakeTaskEventsRecorder) RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent) error { +func (f fakeTaskEventsRecorder) RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent, eventConfig *config.EventConfig) error { if f.err != nil { return f.err } @@ -48,7 +50,9 @@ func Test_taskEventRecorder_RecordTaskEvent(t1 *testing.T) { ev := &event.TaskExecutionEvent{ Phase: tt.p, } - if err := t.RecordTaskEvent(context.TODO(), ev); (err != nil) != tt.wantErr { + if err := t.RecordTaskEvent(context.TODO(), ev, &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, + }); (err != nil) != tt.wantErr { t1.Errorf("RecordTaskEvent() error = %v, wantErr %v", err, tt.wantErr) } }) diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index bc1dfb668c..ccb7e4bd3a 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -5,10 +5,13 @@ import ( "fmt" "time" + 
"github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyteidl/clients/go/events" eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + controllerEvents "github.com/flyteorg/flytepropeller/pkg/controller/events" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" @@ -59,11 +62,12 @@ func StatusFailed(err *core.ExecutionError) Status { type workflowExecutor struct { enqueueWorkflow v1alpha1.EnqueueWorkflow store *storage.DataStore - wfRecorder events.WorkflowEventRecorder + wfRecorder controllerEvents.WorkflowEventRecorder k8sRecorder record.EventRecorder metadataPrefix storage.DataReference nodeExecutor executors.Node metrics *workflowMetrics + eventConfig *config.EventConfig } func (c *workflowExecutor) constructWorkflowMetadataPrefix(ctx context.Context, w *v1alpha1.FlyteWorkflow) (storage.DataReference, error) { @@ -252,7 +256,7 @@ func convertToExecutionError(err *core.ExecutionError, alternateErr *core.Execut } func (c *workflowExecutor) IdempotentReportEvent(ctx context.Context, e *event.WorkflowExecutionEvent) error { - err := c.wfRecorder.RecordWorkflowEvent(ctx, e) + err := c.wfRecorder.RecordWorkflowEvent(ctx, e, c.eventConfig) if err != nil && eventsErr.IsAlreadyExists(err) { logger.Infof(ctx, "Workflow event phase: %s, executionId %s already exist", e.Phase.String(), e.ExecutionId) @@ -480,7 +484,9 @@ func (c *workflowExecutor) cleanupRunningNodes(ctx context.Context, w v1alpha1.E return nil } -func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, k8sEventRecorder record.EventRecorder, metadataPrefix string, nodeExecutor executors.Node, scope promutils.Scope) (executors.Workflow, error) { +func NewExecutor(ctx context.Context, store 
*storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, + k8sEventRecorder record.EventRecorder, metadataPrefix string, nodeExecutor executors.Node, eventConfig *config.EventConfig, + scope promutils.Scope) (executors.Workflow, error) { basePrefix := store.GetBaseContainerFQN(ctx) if metadataPrefix != "" { var err error @@ -497,10 +503,11 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al nodeExecutor: nodeExecutor, store: store, enqueueWorkflow: enQWorkflow, - wfRecorder: events.NewWorkflowEventRecorder(eventSink, workflowScope), + wfRecorder: controllerEvents.NewWorkflowEventRecorder(eventSink, workflowScope, store), k8sRecorder: k8sEventRecorder, metadataPrefix: basePrefix, metrics: newMetrics(workflowScope), + eventConfig: eventConfig, }, nil } diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go index b6f5fec2ab..17054e0ffd 100644 --- a/pkg/controller/workflow/executor_test.go +++ b/pkg/controller/workflow/executor_test.go @@ -9,6 +9,9 @@ import ( "testing" "time" + "github.com/flyteorg/flyteidl/clients/go/events" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" @@ -32,9 +35,8 @@ import ( wfErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors" - "github.com/flyteorg/flyteidl/clients/go/events" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + eventMocks "github.com/flyteorg/flytepropeller/pkg/controller/events/mocks" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/storage" "github.com/flyteorg/flytestdlib/yamlutils" @@ -59,6 +61,10 @@ const ( maxOutputSize = 10 * 1024 ) +var eventConfig = &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, +} + type fakeRemoteWritePlugin struct { pluginCore.Plugin 
enableAsserts bool @@ -235,9 +241,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, - adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -314,10 +320,10 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, - adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -377,10 +383,10 @@ func BenchmarkWorkflowExecutor(b *testing.B) { recoveryClient := &recoveryMocks.RecoveryClient{} adminClient := launchplan.NewFailFastLaunchPlanExecutor() nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, 
enqueueWorkflow, eventSink, adminClient, - adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, scope) + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, scope) assert.NoError(b, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(b, err) assert.NoError(b, executor.Initialize(ctx)) @@ -465,9 +471,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { recoveryClient := &recoveryMocks.RecoveryClient{} adminClient := launchplan.NewFailFastLaunchPlanExecutor() nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, - adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -558,9 +564,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() recoveryClient := &recoveryMocks.RecoveryClient{} nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, - adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, maxOutputSize, "s3://bucket", 
fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) assert.NoError(t, executor.Initialize(ctx)) @@ -615,7 +621,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, adminClient, - adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, promutils.NewTestScope()) + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) t.Run("EventAlreadyInTerminalStateError", func(t *testing.T) { @@ -627,7 +633,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("already exists"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, mockSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, mockSink, recorder, "metadata", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) w := &v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) @@ -647,7 +653,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("already exists"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) w := 
&v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) @@ -664,7 +670,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { Cause: errors.New("generic exists"), } } - executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) + executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, promutils.NewTestScope()) assert.NoError(t, err) w := &v1alpha1.FlyteWorkflow{} assert.NoError(t, json.Unmarshal(wJSON, w)) @@ -711,15 +717,18 @@ func TestWorkflowExecutor_HandleAbortedWorkflow(t *testing.T) { var evs []*event.WorkflowExecutionEvent nodeExec := &mocks2.Node{} + wfRecorder := &eventMocks.WorkflowEventRecorder{} + wfRecorder.On("RecordWorkflowEvent", mock.Anything, mock.MatchedBy(func(ev *event.WorkflowExecutionEvent) bool { + evs = append(evs, ev) + return true + }), mock.Anything).Return(nil) wExec := &workflowExecutor{ nodeExecutor: nodeExec, - wfRecorder: &events.MockRecorder{ - RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error { - evs = append(evs, event) - return nil - }, + wfRecorder: wfRecorder, + metrics: newMetrics(promutils.NewTestScope()), + eventConfig: &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, }, - metrics: newMetrics(promutils.NewTestScope()), } nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -748,15 +757,18 @@ func TestWorkflowExecutor_HandleAbortedWorkflow(t *testing.T) { var evs []*event.WorkflowExecutionEvent nodeExec := &mocks2.Node{} + wfRecorder := &eventMocks.WorkflowEventRecorder{} + wfRecorder.OnRecordWorkflowEventMatch(mock.Anything, mock.MatchedBy(func(ev *event.WorkflowExecutionEvent) bool { + evs = append(evs, ev) + return true + }), mock.Anything).Return(nil) wExec := &workflowExecutor{ nodeExecutor: nodeExec, - wfRecorder: 
&events.MockRecorder{ - RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error { - evs = append(evs, event) - return nil - }, + wfRecorder: wfRecorder, + metrics: newMetrics(promutils.NewTestScope()), + eventConfig: &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, }, - metrics: newMetrics(promutils.NewTestScope()), } nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -784,15 +796,18 @@ func TestWorkflowExecutor_HandleAbortedWorkflow(t *testing.T) { t.Run("failure-abort-success", func(t *testing.T) { var evs []*event.WorkflowExecutionEvent nodeExec := &mocks2.Node{} + wfRecorder := &eventMocks.WorkflowEventRecorder{} + wfRecorder.OnRecordWorkflowEventMatch(mock.Anything, mock.MatchedBy(func(ev *event.WorkflowExecutionEvent) bool { + evs = append(evs, ev) + return true + }), mock.Anything).Return(nil) wExec := &workflowExecutor{ nodeExecutor: nodeExec, - wfRecorder: &events.MockRecorder{ - RecordWorkflowEventCb: func(ctx context.Context, event *event.WorkflowExecutionEvent) error { - evs = append(evs, event) - return nil - }, + wfRecorder: wfRecorder, + metrics: newMetrics(promutils.NewTestScope()), + eventConfig: &config.EventConfig{ + RawOutputPolicy: config.RawOutputPolicyReference, }, - metrics: newMetrics(promutils.NewTestScope()), } nodeExec.OnAbortHandlerMatch(ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)