Skip to content

Commit

Permalink
Add option to send raw output data in events (flyteorg#304)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Aug 26, 2021
1 parent ec1865b commit 6a14e7f
Show file tree
Hide file tree
Showing 39 changed files with 1,261 additions and 151 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.19.19
github.com/flyteorg/flyteidl v0.19.22
github.com/flyteorg/flyteplugins v0.5.69
github.com/flyteorg/flytestdlib v0.3.34
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.19 h1:jv93YLz0Bq++sH9r0AOhdNaHFdXSCWjsXJoLOIduA2o=
github.com/flyteorg/flyteidl v0.19.19/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.22 h1:e3M0Dob/r5n+AJfAByzad/svMAVes7XfZVxUNCi6AeQ=
github.com/flyteorg/flyteidl v0.19.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.69 h1:i1V1n+uoI5TrBG/UWD6vzJ/fFAtru9FSYbjCnYBttUc=
github.com/flyteorg/flyteplugins v0.5.69/go.mod h1:YjahYP+i4/Qn+dFvxMOGbhDtkQT4EiH4Kb88KNK505A=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
Expand Down
19 changes: 19 additions & 0 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ var (
MetadataPrefix: "metadata/propeller",
EnableAdminLauncher: true,
MetricsPrefix: "flyte",
EventConfig: EventConfig{
RawOutputPolicy: RawOutputPolicyReference,
},
}
)

Expand Down Expand Up @@ -133,6 +136,7 @@ type Config struct {
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."`
EventConfig EventConfig `json:"event-config,omitempty" pflag:",Configures execution event behavior."`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down Expand Up @@ -215,6 +219,21 @@ type LeaderElectionConfig struct {
RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."`
}

// RawOutputPolicy selects how output data is passed along in execution events.
// It is an alias for string, so plain string values are interchangeable with it.
type RawOutputPolicy = string

// Supported RawOutputPolicy values.
const (
	// RawOutputPolicyReference sends only a URI referencing where the outputs have been uploaded.
	RawOutputPolicyReference RawOutputPolicy = "reference"

	// RawOutputPolicyInline sends the raw output data itself inside the event.
	RawOutputPolicyInline RawOutputPolicy = "inline"
)

// EventConfig configures how execution events are populated before they are sent.
type EventConfig struct {
	// RawOutputPolicy chooses between sending outputs by reference (a URI) or inline (the raw data).
	RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`

	// FallbackToOutputReference enables retrying with outputs sent by reference when an event carrying
	// inline output data is too large to be sent.
	FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
}

// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,12 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), scope)
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recovery.NewClient(adminClient), &cfg.EventConfig, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, scope)
workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, scope)
if err != nil {
return nil, err
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/controller/events/mocks/node_event_recorder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions pkg/controller/events/mocks/task_event_recorder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions pkg/controller/events/mocks/workflow_event_recorder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions pkg/controller/events/node_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package events

import (
"context"
"strings"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteidl/clients/go/events"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

//go:generate mockery -all -output=mocks -case=underscore

// NodeEventRecorder records node execution events.
type NodeEventRecorder interface {
	// RecordNodeEvent records a node execution event indicating the node has undergone a phase change,
	// along with any additional metadata. eventConfig controls how output data is attached to the event.
	//
	// The event parameter is named ev (not event) so it does not shadow the imported event package.
	RecordNodeEvent(ctx context.Context, ev *event.NodeExecutionEvent, eventConfig *config.EventConfig) error
}

type nodeEventRecorder struct {
eventRecorder events.NodeEventRecorder
store *storage.DataStore
}

// handleFailure decides whether a failed attempt to record a node event should be retried with ev.
//
// A successful node execution event can be configured to carry raw output data inline, and for large
// outputs such an event may exceed the recipient's message size limit. The retry therefore happens only
// when err is a gRPC status error with code ResourceExhausted whose message starts with
// "message too large"; ev is expected to be the form of the event that references outputs by URI.
// Any other error is returned unchanged.
func (r *nodeEventRecorder) handleFailure(ctx context.Context, ev *event.NodeExecutionEvent, err error) error {
	grpcStatus, isStatus := status.FromError(err)
	tooLarge := isStatus &&
		grpcStatus.Code() == codes.ResourceExhausted &&
		strings.HasPrefix(grpcStatus.Message(), "message too large")
	if tooLarge {
		// Second attempt: record the event as handed to us, i.e. with the output URI set.
		return r.eventRecorder.RecordNodeEvent(ctx, ev)
	}

	// Not a size-limit rejection, so surface the original failure.
	return err
}

// RecordNodeEvent records ev through the wrapped recorder, optionally sending raw output data inline.
//
// When eventConfig.RawOutputPolicy is RawOutputPolicyInline and ev carries an output URI, the outputs
// are read from the data store and sent inline in place of the URI. If they cannot be read, the event
// is sent by reference as-is. If recording an inlined event fails and
// eventConfig.FallbackToOutputReference is set, the failure is passed to handleFailure, which may
// re-send the by-reference form of the event.
//
// A nil eventConfig is treated as the by-reference policy with no fallback. The caller's event is never
// modified: inlining is applied to a copy.
//
// Returns nil on success, otherwise the error from the (last) recording attempt.
func (r *nodeEventRecorder) RecordNodeEvent(ctx context.Context, ev *event.NodeExecutionEvent, eventConfig *config.EventConfig) error {
	// Guard against a missing config rather than panicking on a nil dereference.
	inlineRequested := eventConfig != nil && eventConfig.RawOutputPolicy == config.RawOutputPolicyInline

	// toSend only diverges from ev once outputs have actually been inlined; ev itself always stays in
	// its by-reference form so it can be re-sent on fallback and is left untouched for the caller.
	toSend := ev
	inlined := false
	if inlineRequested && len(ev.GetOutputUri()) > 0 {
		outputs := &core.LiteralMap{}
		if err := r.store.ReadProtobuf(ctx, storage.DataReference(ev.GetOutputUri()), outputs); err != nil {
			// Fall back to forwarding along outputs by reference when we can't fetch them.
			logger.Warnf(ctx, "failed to fetch outputs by ref [%s] to send inline with err: %v", ev.GetOutputUri(), err)
		} else {
			toSend = proto.Clone(ev).(*event.NodeExecutionEvent)
			toSend.OutputResult = &event.NodeExecutionEvent_OutputData{
				OutputData: outputs,
			}
			inlined = true
		}
	}

	err := r.eventRecorder.RecordNodeEvent(ctx, toSend)
	if err == nil {
		return nil
	}

	// Log the by-reference form so that raw output data is not dumped into the logs.
	logger.Infof(ctx, "Failed to record node event [%+v] with err: %v", ev, err)

	// Only retry when raw output data was really sent inline; otherwise the retry would re-send an
	// identical event. inlined implies eventConfig is non-nil.
	if inlined && eventConfig.FallbackToOutputReference {
		logger.Infof(ctx, "Falling back to sending node event outputs by reference for [%+v]", ev.Id)
		return r.handleFailure(ctx, ev, err)
	}
	return err
}

// NewNodeEventRecorder builds a NodeEventRecorder that records events through a recorder created from
// eventSink and scope, and reads offloaded outputs from store.
func NewNodeEventRecorder(eventSink events.EventSink, scope promutils.Scope, store *storage.DataStore) NodeEventRecorder {
	recorder := new(nodeEventRecorder)
	recorder.eventRecorder = events.NewNodeEventRecorder(eventSink, scope)
	recorder.store = store
	return recorder
}
Loading

0 comments on commit 6a14e7f

Please sign in to comment.