Skip to content

Commit

Permalink
[House keeping] remove setting max size bytes in node context (#5092)
Browse files Browse the repository at this point in the history
* remove setting max size bytes in node context

Signed-off-by: Paul Dittamo <[email protected]>

* add note for clarity

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Apr 19, 2024
1 parent 35797ec commit 2ca3111
Show file tree
Hide file tree
Showing 34 changed files with 67 additions and 261 deletions.
5 changes: 2 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

//go:generate mockery -all -case=underscore

// An interface to access a remote/sharable location that contains the serialized TaskTemplate
type TaskTemplatePath interface {
// Returns the path
Expand All @@ -34,9 +36,6 @@ type TaskExecutionContext interface {
// Returns a method that allows a plugin to indicate that the task has a new update and can be invoked again to check for updates
TaskRefreshIndicator() SignalAsync

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64

// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
DataStore() *storage.DataStore

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,15 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
}

func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
// Note: even though the data store retrieval checks against GetLimitMegabytes, there might be external
// storage implementations, so we keep this check here as well.
maxPayloadSize := maxDatasetSize
if maxPayloadSize == 0 {
maxPayloadSize = storage.GetConfig().Limits.GetLimitMegabytes * 1024 * 1024
}
return RemoteFileOutputReader{
outPath: outPaths,
store: store,
maxPayloadSize: maxDatasetSize,
maxPayloadSize: maxPayloadSize,
}
}
32 changes: 0 additions & 32 deletions flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ type PluginContext interface {
// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
DataStore() *storage.DataStore

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ type TaskExecutionContext interface {

// Provides the raw datastore to enable persisting outputs.
DataStore() *storage.DataStore

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64
}

type GetContext interface {
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC

switch w.Status() {
case workqueue.WorkStatusSucceeded:
or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes())
or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions flyteplugins/go/tasks/plugins/array/outputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func TestAssembleFinalOutputs(t *testing.T) {
tCtx := &mocks3.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnMaxDatasetSizeBytes().Return(10000)
tCtx.OnDataStore().Return(d)

_, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s)
Expand Down Expand Up @@ -368,7 +367,6 @@ func TestAssembleFinalOutputs(t *testing.T) {
tCtx.OnTaskReader().Return(tReader)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnDataStore().Return(ds)
tCtx.OnMaxDatasetSizeBytes().Return(10000)

_, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/testing/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext)
return core.UnknownTransition, err
}

or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes())
or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return core.UnknownTransition, err
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, outputs *fly
opReader = ioutils.NewInMemoryOutputReader(outputs, nil, nil)
} else {
logger.Debugf(ctx, "AgentDeployment didn't return any output, assuming file based outputs.")
opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes())
opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0)
}
return taskCtx.OutputWriter().Put(ctx, opReader)
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func writeOutput(ctx context.Context, taskCtx webapi.StatusContext) error {
return nil
}

outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0)
return taskCtx.OutputWriter().Put(ctx, outputReader)
}

Expand Down
1 change: 0 additions & 1 deletion flyteplugins/tests/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
tCtx.OnCatalog().Return(cat)
tCtx.OnEventsRecorder().Return(eRecorder)
tCtx.OnResourceManager().Return(resourceManager)
tCtx.OnMaxDatasetSizeBytes().Return(1000000)
tCtx.OnSecretManager().Return(secretManager)

trns := pluginCore.DoTransition(pluginCore.PhaseInfoQueued(time.Now(), 0, ""))
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, sCfg.Limits.GetLimitMegabytes*1024*1024, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
// checkpoint paths are not computed here because this function is only called when writing
// existing cached outputs. if this functionality changes this will need to be revisited.
outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "")
reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0)

gatherOutputsRequest.reader = &reader
a.gatherOutputsRequestChannel <- gatherOutputsRequest
Expand Down
3 changes: 1 addition & 2 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler inter

// create node executor
nodeExecutor, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, dataStore, enqueueWorkflowFunc, mockEventSink, adminClient,
adminClient, 10, "s3://bucket/", mockKubeClient, noopCatalogClient, mockRecoveryClient, eventConfig, "clusterID", mockSignalClient, mockHandlerFactory, scope)
adminClient, "s3://bucket/", mockKubeClient, noopCatalogClient, mockRecoveryClient, eventConfig, "clusterID", mockSignalClient, mockHandlerFactory, scope)
assert.NoError(t, err)

// return ArrayNodeHandler
Expand All @@ -79,7 +79,6 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
currentParallelism uint32, maxParallelism uint32) interfaces.NodeExecutionContext {

nCtx := &mocks.NodeExecutionContext{}
nCtx.OnMaxDatasetSizeBytes().Return(9999999)
nCtx.OnCurrentAttempt().Return(uint32(0))

// ContextualNodeLookup
Expand Down
1 change: 0 additions & 1 deletion flytepropeller/pkg/controller/nodes/branch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func createNodeContext(phase v1alpha1.BranchNodePhase, childNodeID *v1alpha1.Nod
tmpDataStore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
nCtx.OnDataStore().Return(tmpDataStore)
nCtx.OnCurrentAttempt().Return(uint32(1))
nCtx.OnMaxDatasetSizeBytes().Return(int64(1))
nCtx.OnNodeStatus().Return(ns)

nCtx.OnNodeID().Return("n1")
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (n *nodeExecutor) WriteCatalogCache(ctx context.Context, nCtx interfaces.No
catalogKey.Identifier.Domain, catalogKey.Identifier.Name, catalogKey.Identifier.Version)

outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0)
metadata := catalog.Metadata{
TaskExecutionIdentifier: task.GetTaskExecutionIdentifier(nCtx),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
nCtx.OnInputReader().Return(ir)
nCtx.OnCurrentAttempt().Return(uint32(1))
nCtx.OnTaskReader().Return(tr)
nCtx.OnMaxDatasetSizeBytes().Return(int64(1))
nCtx.OnNodeID().Return("n1")
nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil })
nCtx.OnDataStore().Return(dataStore)
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
// These outputPaths only reads the output metadata. So the sandbox is completely optional here and hence it is nil.
// The sandbox creation as it uses hashing can be expensive and we skip that expense.
outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0)
ee, err := d.TaskNodeHandler.ValidateOutput(ctx, nCtx.NodeID(), nCtx.InputReader(),
outputReader, nil, nCtx.ExecutionContext().GetExecutionConfig(), nCtx.TaskReader())

Expand Down
4 changes: 0 additions & 4 deletions flytepropeller/pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) {
nCtx.OnInputReader().Return(ir)
nCtx.OnCurrentAttempt().Return(uint32(1))
nCtx.OnTaskReader().Return(tr)
nCtx.OnMaxDatasetSizeBytes().Return(int64(1))
nCtx.OnNodeStatus().Return(ns)
nCtx.OnNodeID().Return("n1")
nCtx.OnEnqueueOwnerFunc().Return(nil)
Expand Down Expand Up @@ -467,7 +466,6 @@ func Test_dynamicNodeHandler_Handle_SubTaskV1(t *testing.T) {
nCtx.OnInputReader().Return(ir)
nCtx.OnCurrentAttempt().Return(uint32(1))
nCtx.OnTaskReader().Return(tr)
nCtx.OnMaxDatasetSizeBytes().Return(int64(1))
nCtx.OnNodeID().Return(nodeID)
nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil })
nCtx.OnDataStore().Return(dataStore)
Expand Down Expand Up @@ -665,7 +663,6 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
nCtx.OnInputReader().Return(ir)
nCtx.OnCurrentAttempt().Return(uint32(1))
nCtx.OnTaskReader().Return(tr)
nCtx.OnMaxDatasetSizeBytes().Return(int64(1))
nCtx.OnNodeID().Return(nodeID)
nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil })
nCtx.OnDataStore().Return(dataStore)
Expand Down Expand Up @@ -912,7 +909,6 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
nCtx.OnInputReader().Return(ir)
nCtx.OnCurrentAttempt().Return(uint32(1))
nCtx.OnTaskReader().Return(tr)
nCtx.OnMaxDatasetSizeBytes().Return(int64(1))
nCtx.OnNodeID().Return(nodeID)
nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil })
nCtx.OnDataStore().Return(dataStore)
Expand Down
4 changes: 1 addition & 3 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,6 @@ type nodeExecutor struct {
enqueueWorkflow v1alpha1.EnqueueWorkflow
eventConfig *config.EventConfig
interruptibleFailureThreshold int32
maxDatasetSizeBytes int64
maxNodeRetriesForSystemFailures uint32
metrics *nodeMetrics
nodeRecorder events.NodeEventRecorder
Expand Down Expand Up @@ -1399,7 +1398,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
}

func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client,
workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client,
catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, signalClient service.SignalServiceClient,
nodeHandlerFactory interfaces.HandlerFactory, scope promutils.Scope) (interfaces.Node, error) {

Expand Down Expand Up @@ -1453,7 +1452,6 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
enqueueWorkflow: enQWorkflow,
eventConfig: eventConfig,
interruptibleFailureThreshold: nodeConfig.InterruptibleFailureThreshold,
maxDatasetSizeBytes: maxDatasetSize,
maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures),
metrics: metrics,
nodeRecorder: events.NewNodeEventRecorder(eventSink, nodeScope, store),
Expand Down
Loading

0 comments on commit 2ca3111

Please sign in to comment.