From 7cec5c3fc0559f89a4ad36b537777313bbe567f4 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 25 Mar 2020 17:11:18 -0700 Subject: [PATCH] RawDataOutput directory for every task execution (#92) --- go.mod | 6 +- go.sum | 15 +++- pkg/controller/config/config.go | 39 +++++------ pkg/controller/config/config_flags.go | 3 +- pkg/controller/config/config_flags_test.go | 49 +++++++++++-- pkg/controller/controller.go | 6 +- pkg/controller/nodes/dynamic/handler.go | 4 +- pkg/controller/nodes/executor.go | 17 ++++- pkg/controller/nodes/executor_test.go | 26 +++---- .../handler/mocks/node_execution_context.go | 68 +++++++++++++++++++ .../nodes/handler/node_exec_context.go | 13 +++- pkg/controller/nodes/node_exec_context.go | 23 ++++++- .../nodes/node_exec_context_test.go | 8 ++- pkg/controller/nodes/task/handler_test.go | 17 +++++ pkg/controller/nodes/task/taskexec_context.go | 7 +- .../nodes/task/taskexec_context_test.go | 3 + pkg/controller/workflow/executor_test.go | 12 ++-- 17 files changed, 252 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 97e242a446..cf43cd47f1 100644 --- a/go.mod +++ b/go.mod @@ -21,9 +21,9 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.14.2 // indirect github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 - github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.12 - github.com/lyft/flytestdlib v0.3.2 + github.com/lyft/flyteidl v0.17.9 + github.com/lyft/flyteplugins v0.3.15 + github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect github.com/mitchellh/mapstructure v1.1.2 diff --git a/go.sum b/go.sum index 9d65e3fb29..25748404ef 100644 --- a/go.sum +++ b/go.sum @@ -382,11 +382,22 @@ github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ= github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.8 
h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0= github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteplugins v0.3.12 h1:ZfuHCwwZm5F6cII5X6Z/evBxJS+sZp9i/jkYySujIa0= -github.com/lyft/flyteplugins v0.3.12/go.mod h1:2fMH+Le0rlMlSOq5Z6utnkvDmw8AyYk7lxuAnYhlAI8= +github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= +github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w= +github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc= +github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 h1:hpyBrWo2HrEdNG5TNdU5+90D/T8wu6FmmLgpeyJRN30= +github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43/go.mod h1:heTJLryE8EE4Vcd+W3EkQ3fyF41YyquavCLQv1YfnEA= +github.com/lyft/flyteplugins v0.3.15 h1:chDrm8maK3dCSy7UM8ElfmzTUBn1fiF7UnmP4px4sVI= +github.com/lyft/flyteplugins v0.3.15/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= +github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE= +github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI= github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod 
h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 10dccecc02..400e7ede9a 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -54,25 +54,26 @@ var ( // Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is // the base configuration to start propeller type Config struct { - KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` - MasterURL string `json:"master"` - Workers int `json:"workers" pflag:"2,Number of threads to process workflows"` - WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"` - DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"` - LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"` - ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` - MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. 
If not specified, the data will be stored in the base container directly."` - Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` - MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."` - EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"` - MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"` - MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"` - GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"` - LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` - PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` - MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` - KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` - NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` + KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` + MasterURL string `json:"master"` + Workers int `json:"workers" pflag:"2,Number of threads to process workflows"` + WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"` + DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"` + LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"` + ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` + MetadataPrefix string 
`json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."` + DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."` + Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` + MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."` + EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"` + MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"` + MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. 
Number between 1-23 hours"` + GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"` + LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` + PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` + MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` + KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` + NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` } type KubeClientConfig struct { diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index d92c30ef9c..4b52f673b2 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -49,6 +49,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limit-namespace"), defaultConfig.LimitNamespace, "Namespaces to watch for this propeller") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "prof-port"), defaultConfig.ProfilerPort.String(), "Profiler port") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "metadata-prefix"), defaultConfig.MetadataPrefix, "MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. 
If not specified, the data will be stored in the base container directly.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "rawoutput-prefix"), defaultConfig.DefaultRawOutputPrefix, "a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.type"), defaultConfig.Queue.Type, "Type of composite queue to use for the WorkQueue") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.queue.type"), defaultConfig.Queue.Queue.Type, "Type of RateLimiter to use for the WorkQueue") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.queue.base-delay"), defaultConfig.Queue.Queue.BaseDelay.String(), "base backoff delay for failure") @@ -81,6 +82,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.node-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String(), "Default value of node timeout") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.workflow-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String(), "Default value of workflow timeout") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.max-node-retries-system-failures"), defaultConfig.NodeConfig.MaxNodeRetriesOnSystemFailures, "Maximum number of retries per node for node failure due to infra issues") - cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, " number of failures for a node to be still considered interruptible'") + cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible'") return cmdFlags } diff --git a/pkg/controller/config/config_flags_test.go 
b/pkg/controller/config/config_flags_test.go index c83f55de47..1d3db8cb0c 100755 --- a/pkg/controller/config/config_flags_test.go +++ b/pkg/controller/config/config_flags_test.go @@ -275,6 +275,28 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_rawoutput-prefix", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vString, err := cmdFlags.GetString("rawoutput-prefix"); err == nil { + assert.Equal(t, string(defaultConfig.DefaultRawOutputPrefix), vString) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("rawoutput-prefix", testValue) + if vString, err := cmdFlags.GetString("rawoutput-prefix"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.DefaultRawOutputPrefix) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_queue.type", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly @@ -979,13 +1001,26 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_node-config.interruptible-failure-threshold", func(t *testing.T) { - // Test that default value is set properly - if vInt64, err := cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { - assert.Equal(t, int64(defaultConfig.NodeConfig.InterruptibleFailureThreshold), vInt64) - } else { - assert.FailNow(t, err.Error()) - } + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vInt64, err := cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { + assert.Equal(t, int64(defaultConfig.NodeConfig.InterruptibleFailureThreshold), vInt64) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.interruptible-failure-threshold", testValue) + if vInt64, err := 
cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt64), &actual.NodeConfig.InterruptibleFailureThreshold) + + } else { + assert.FailNow(t, err.Error()) + } + }) }) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 81d46bdd7c..c3ec4d6046 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,9 +3,10 @@ package controller import ( "context" - errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" stdErrs "github.com/lyft/flytestdlib/errors" + errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" + "github.com/lyft/flytepropeller/pkg/controller/executors" "github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog" @@ -297,7 +298,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store") } - nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope) + nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, + storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope) if err != nil { return nil, errors.Wrapf(err, "Failed to create Controller.") } diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 8531d1543f..853e0c974d 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -106,7 +106,9 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n if trns.Info().GetPhase() == handler.EPhaseSuccess { logger.Infof(ctx, "dynamic workflow node has succeeded, will call on success handler for parent node [%s]", 
nCtx.NodeID()) - outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir()) + // These outputPaths are only used to read the output metadata, so the sandbox is completely optional here and is therefore nil. + // Sandbox creation uses hashing and can be expensive, so we skip that expense. + outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil) execID := task.GetTaskExecutionIdentifier(nCtx) outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{ diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 5d33d18124..a1d7d1a7b4 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" errors2 "github.com/lyft/flytestdlib/errors" "github.com/golang/protobuf/ptypes" @@ -12,7 +13,6 @@ import ( eventsErr "github.com/lyft/flyteidl/clients/go/events/errors" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/promutils" @@ -20,6 +20,8 @@ import ( "github.com/lyft/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" + "github.com/lyft/flytepropeller/pkg/controller/config" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -66,6 +68,8 @@ type nodeExecutor struct { defaultActiveDeadline time.Duration maxNodeRetriesForSystemFailures uint32 interruptibleFailureThreshold uint32 + defaultDataSandbox storage.DataReference + 
shardSelector ioutils.ShardSelector } func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) { @@ -735,7 +739,14 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error { return c.nodeHandlerFactory.Setup(ctx, s) } -func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, maxDatasetSize int64, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { +func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, + workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { + + // TODO we may want to make this configurable. 
+ shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx) + if err != nil { + return nil, err + } nodeScope := scope.NewSubScope("node") exec := &nodeExecutor{ @@ -765,6 +776,8 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures), interruptibleFailureThreshold: uint32(nodeConfig.InterruptibleFailureThreshold), + defaultDataSandbox: defaultRawOutputPrefix, + shardSelector: shardSelector, } nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index 468b9979e1..7c40431073 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -45,7 +45,7 @@ func TestSetInputsForStartNode(t *testing.T) { mockStorage := createInmemoryDataStore(t, testScope.NewSubScope("f")) enQWf := func(workflowID v1alpha1.WorkflowID) {} - exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket/", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) inputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -91,7 +91,7 @@ func TestSetInputsForStartNode(t *testing.T) { }) failStorage := createFailingDatastore(t, testScope.NewSubScope("failing")) - execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, 
fakeKubeClient, catalogClient, promutils.NewTestScope()) + execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("StorageFailure", func(t *testing.T) { w := createDummyBaseWorkflow(mockStorage) @@ -115,7 +115,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { assert.NoError(t, err) t.Run("happy", func(t *testing.T) { - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -128,7 +128,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { }) t.Run("error", func(t *testing.T) { - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -149,7 +149,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, 
config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -247,7 +247,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -603,7 +603,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { startNode := mockWf.StartNode() store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -675,7 +675,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, 
config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -773,7 +773,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run(test.name, func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -823,7 +823,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run("retries-exhausted", func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -850,7 +850,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run("retries-remaining", func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, 
config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -882,7 +882,7 @@ func TestNodeExecutor_RecursiveNodeHandler_NoDownstream(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -990,7 +990,7 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) diff --git a/pkg/controller/nodes/handler/mocks/node_execution_context.go b/pkg/controller/nodes/handler/mocks/node_execution_context.go index db5219d400..59456d50eb 100644 --- a/pkg/controller/nodes/handler/mocks/node_execution_context.go +++ 
b/pkg/controller/nodes/handler/mocks/node_execution_context.go @@ -7,6 +7,8 @@ import ( io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" handler "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" + ioutils "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" + mock "github.com/stretchr/testify/mock" storage "github.com/lyft/flytestdlib/storage" @@ -421,6 +423,72 @@ func (_m *NodeExecutionContext) NodeStatus() v1alpha1.ExecutableNodeStatus { return r0 } +type NodeExecutionContext_OutputShardSelector struct { + *mock.Call +} + +func (_m NodeExecutionContext_OutputShardSelector) Return(_a0 ioutils.ShardSelector) *NodeExecutionContext_OutputShardSelector { + return &NodeExecutionContext_OutputShardSelector{Call: _m.Call.Return(_a0)} +} + +func (_m *NodeExecutionContext) OnOutputShardSelector() *NodeExecutionContext_OutputShardSelector { + c := _m.On("OutputShardSelector") + return &NodeExecutionContext_OutputShardSelector{Call: c} +} + +func (_m *NodeExecutionContext) OnOutputShardSelectorMatch(matchers ...interface{}) *NodeExecutionContext_OutputShardSelector { + c := _m.On("OutputShardSelector", matchers...) 
+ return &NodeExecutionContext_OutputShardSelector{Call: c} +} + +// OutputShardSelector provides a mock function with given fields: +func (_m *NodeExecutionContext) OutputShardSelector() ioutils.ShardSelector { + ret := _m.Called() + + var r0 ioutils.ShardSelector + if rf, ok := ret.Get(0).(func() ioutils.ShardSelector); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ioutils.ShardSelector) + } + } + + return r0 +} + +type NodeExecutionContext_RawOutputPrefix struct { + *mock.Call +} + +func (_m NodeExecutionContext_RawOutputPrefix) Return(_a0 storage.DataReference) *NodeExecutionContext_RawOutputPrefix { + return &NodeExecutionContext_RawOutputPrefix{Call: _m.Call.Return(_a0)} +} + +func (_m *NodeExecutionContext) OnRawOutputPrefix() *NodeExecutionContext_RawOutputPrefix { + c := _m.On("RawOutputPrefix") + return &NodeExecutionContext_RawOutputPrefix{Call: c} +} + +func (_m *NodeExecutionContext) OnRawOutputPrefixMatch(matchers ...interface{}) *NodeExecutionContext_RawOutputPrefix { + c := _m.On("RawOutputPrefix", matchers...) 
+ return &NodeExecutionContext_RawOutputPrefix{Call: c} +} + +// RawOutputPrefix provides a mock function with given fields: +func (_m *NodeExecutionContext) RawOutputPrefix() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type NodeExecutionContext_TaskReader struct { *mock.Call } diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index c761664b0a..dd9224b8c9 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -5,12 +5,14 @@ import ( "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) @@ -39,6 +41,15 @@ type NodeExecutionMetadata interface { } type NodeExecutionContext interface { + // This path is never read by propeller, but it allows using a container, or a prefix within a specific container, for all output from tasks. + // Sandboxes provide exactly-once execution semantics, and only the successful sandbox wins. Ideally a sandbox should be a path that is + // available to the task at high bandwidth (for example, the base path of a sharded S3 bucket). + // Combined with a prefix-based sharding strategy, this could improve the throughput from S3 manifold. + RawOutputPrefix() storage.DataReference + + // Sharding strategy for the output data for this node execution. 
+ OutputShardSelector() ioutils.ShardSelector + DataStore() *storage.DataStore InputReader() io.InputReader EventsRecorder() events.TaskEventRecorder diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index e7b39a7144..6099573f80 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -6,11 +6,12 @@ import ( "strconv" "github.com/lyft/flyteidl/clients/go/events" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/storage" "k8s.io/apimachinery/pkg/types" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" "github.com/lyft/flytepropeller/pkg/utils" @@ -54,6 +55,16 @@ type execContext struct { nsm *nodeStateManager enqueueOwner func() error w v1alpha1.ExecutableWorkflow + rawOutputPrefix storage.DataReference + shardSelector ioutils.ShardSelector +} + +func (e execContext) OutputShardSelector() ioutils.ShardSelector { + return e.shardSelector +} + +func (e execContext) RawOutputPrefix() storage.DataReference { + return e.rawOutputPrefix } func (e execContext) EnqueueOwnerFunc() func() error { @@ -112,7 +123,7 @@ func (e execContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error) *execContext { +func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, 
nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *execContext { md := execMetadata{WorkflowMeta: w, interrutptible: interruptible} // Copy the wf labels before adding node specific labels. @@ -139,6 +150,8 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1. nsm: nsm, enqueueOwner: enqueueOwner, w: w, + rawOutputPrefix: rawOutputPrefix, + shardSelector: outputShardSelector, } } @@ -190,5 +203,9 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, w v1alpha1 tr, newNodeStateManager(ctx, s), workflowEnqueuer, + // Eventually we want to replace this with per workflow sandboxes + // https://github.com/lyft/flyte/issues/211 + c.defaultDataSandbox, + c.shardSelector, ), nil } diff --git a/pkg/controller/nodes/node_exec_context_test.go b/pkg/controller/nodes/node_exec_context_test.go index 571a201439..dc42eeedb2 100644 --- a/pkg/controller/nodes/node_exec_context_test.go +++ b/pkg/controller/nodes/node_exec_context_test.go @@ -5,11 +5,13 @@ import ( "testing" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" + + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" ) type TaskReader struct{} @@ -45,7 +47,7 @@ func Test_NodeContext(t *testing.T) { Kind: v1alpha1.NodeKindTask, } s, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) - nCtx := 
newNodeExecContext(context.TODO(), s, w1, n, nil, nil, false, 0, nil, TaskReader{}, nil, nil) + nCtx := newNodeExecContext(context.TODO(), s, w1, n, nil, nil, false, 0, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"})) assert.Equal(t, "id", nCtx.NodeExecutionMetadata().GetLabels()["node-id"]) assert.Equal(t, "false", nCtx.NodeExecutionMetadata().GetLabels()["interruptible"]) assert.Equal(t, "task-name", nCtx.NodeExecutionMetadata().GetLabels()["task-name"]) diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 06e2a32210..4fe39d4c0f 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/resourcemanager" "github.com/lyft/flytestdlib/contextutils" @@ -381,6 +383,9 @@ func Test_task_Handle_NoCatalog(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} assert.NoError(t, cod.Encode(pluginResp, st)) @@ -682,6 +687,9 @@ func Test_task_Handle_Catalog(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} assert.NoError(t, cod.Encode(&fakeplugins.NextPhaseState{ @@ -884,6 +892,9 @@ func Test_task_Handle_Barrier(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") + 
nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} assert.NoError(t, cod.Encode(&fakeplugins.NextPhaseState{ @@ -1137,6 +1148,9 @@ func Test_task_Abort(t *testing.T) { nCtx.On("EnqueueOwner").Return(nil) nCtx.On("EventsRecorder").Return(ev) + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) a := 45 type test struct { @@ -1258,6 +1272,9 @@ func Test_task_Finalize(t *testing.T) { nCtx.On("EventsRecorder").Return(nil) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope()) st := bytes.NewBuffer([]byte{}) diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index cee86ad56c..a9d7d54eb0 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -10,6 +10,7 @@ import ( "github.com/lyft/flytestdlib/logger" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + pluginCatalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" @@ -123,7 +124,11 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node return nil, err } - ow := ioutils.NewBufferedOutputWriter(ctx, ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())) + outputSandbox, err := ioutils.NewShardedRawOutputPath(ctx, nCtx.OutputShardSelector(), nCtx.RawOutputPrefix(), uniqueID, nCtx.DataStore()) + if err != nil { + return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "failed to create 
output sandbox for node execution") + } + ow := ioutils.NewBufferedOutputWriter(ctx, ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), outputSandbox)) ts := nCtx.NodeStateReader().GetTaskNodeState() var b *bytes.Buffer if ts.PluginState != nil { diff --git a/pkg/controller/nodes/task/taskexec_context_test.go b/pkg/controller/nodes/task/taskexec_context_test.go index ed0c4bae76..356bdfe134 100644 --- a/pkg/controller/nodes/task/taskexec_context_test.go +++ b/pkg/controller/nodes/task/taskexec_context_test.go @@ -8,6 +8,7 @@ import ( "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" @@ -82,6 +83,8 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { PluginState: st.Bytes(), }) nCtx.On("NodeStateReader").Return(nr) + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope()) diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go index c7a968f0b2..90bdff4a16 100644 --- a/pkg/controller/workflow/executor_test.go +++ b/pkg/controller/workflow/executor_test.go @@ -226,7 +226,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, 
store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -301,7 +301,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -359,7 +359,7 @@ func BenchmarkWorkflowExecutor(b *testing.B) { eventSink := events.NewMockEventSink() catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(b, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, scope) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, scope) assert.NoError(b, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -444,7 +444,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { } catalogClient, err := catalog.NewCatalogClient(ctx) 
assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -532,7 +532,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { } catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -584,7 +584,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, 
launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("EventAlreadyInTerminalStateError", func(t *testing.T) {