Skip to content

Commit

Permalink
RawDataOutput directory for every task execution (flyteorg#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Mar 26, 2020
1 parent 0c4673b commit 7cec5c3
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 64 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.2 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.8
github.com/lyft/flyteplugins v0.3.12
github.com/lyft/flytestdlib v0.3.2
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flyteplugins v0.3.15
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mitchellh/mapstructure v1.1.2
Expand Down
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,22 @@ github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.8 h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0=
github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.12 h1:ZfuHCwwZm5F6cII5X6Z/evBxJS+sZp9i/jkYySujIa0=
github.com/lyft/flyteplugins v0.3.12/go.mod h1:2fMH+Le0rlMlSOq5Z6utnkvDmw8AyYk7lxuAnYhlAI8=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w=
github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc=
github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 h1:hpyBrWo2HrEdNG5TNdU5+90D/T8wu6FmmLgpeyJRN30=
github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43/go.mod h1:heTJLryE8EE4Vcd+W3EkQ3fyF41YyquavCLQv1YfnEA=
github.com/lyft/flyteplugins v0.3.15 h1:chDrm8maK3dCSy7UM8ElfmzTUBn1fiF7UnmP4px4sVI=
github.com/lyft/flyteplugins v0.3.15/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
39 changes: 20 additions & 19 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,26 @@ var (
// Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is
// the base configuration to start propeller
type Config struct {
KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."`
MasterURL string `json:"master"`
Workers int `json:"workers" pflag:"2,Number of threads to process workflows"`
WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"`
DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"`
LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"`
ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"`
MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."`
EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"`
MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"`
MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"`
GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"`
LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."`
PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."`
MasterURL string `json:"master"`
Workers int `json:"workers" pflag:"2,Number of threads to process workflows"`
WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"`
DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"`
LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"`
ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"`
MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."`
Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."`
EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"`
MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"`
MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"`
GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"`
LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."`
PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
}

type KubeClientConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 42 additions & 7 deletions pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package controller
import (
"context"

errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
stdErrs "github.com/lyft/flytestdlib/errors"

errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"

"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog"

Expand Down Expand Up @@ -297,7 +298,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n

if trns.Info().GetPhase() == handler.EPhaseSuccess {
logger.Infof(ctx, "dynamic workflow node has succeeded, will call on success handler for parent node [%s]", nCtx.NodeID())
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
// These outputPaths only reads the output metadata. So the sandbox is completely optional here and hence it is nil.
// The sandbox creation as it uses hashing can be expensive and we skip that expense.
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil)
execID := task.GetTaskExecutionIdentifier(nCtx)
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
Expand Down
17 changes: 15 additions & 2 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ import (
"fmt"
"time"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
errors2 "github.com/lyft/flytestdlib/errors"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/clients/go/events"
eventsErr "github.com/lyft/flyteidl/clients/go/events/errors"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"

"github.com/lyft/flytepropeller/pkg/controller/config"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -66,6 +68,8 @@ type nodeExecutor struct {
defaultActiveDeadline time.Duration
maxNodeRetriesForSystemFailures uint32
interruptibleFailureThreshold uint32
defaultDataSandbox storage.DataReference
shardSelector ioutils.ShardSelector
}

func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) {
Expand Down Expand Up @@ -735,7 +739,14 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error {
return c.nodeHandlerFactory.Setup(ctx, s)
}

func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, maxDatasetSize int64, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) {
func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) {

// TODO we may want to make this configurable.
shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx)
if err != nil {
return nil, err
}

nodeScope := scope.NewSubScope("node")
exec := &nodeExecutor{
Expand Down Expand Up @@ -765,6 +776,8 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration,
maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures),
interruptibleFailureThreshold: uint32(nodeConfig.InterruptibleFailureThreshold),
defaultDataSandbox: defaultRawOutputPrefix,
shardSelector: shardSelector,
}
nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope)
exec.nodeHandlerFactory = nodeHandlerFactory
Expand Down
Loading

0 comments on commit 7cec5c3

Please sign in to comment.