Skip to content

Commit

Permalink
Always return execution data inline when no remote data provider set (f…
Browse files Browse the repository at this point in the history
…lyteorg#178)

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
Katrina Rogan authored Apr 27, 2021
1 parent 8ec2d55 commit 4c387a5
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 6 deletions.
5 changes: 3 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,15 +1108,16 @@ func (m *ExecutionManager) GetExecutionData(
Inputs: &inputsURLBlob,
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || inputsURLBlob.Bytes < maxDataSize {
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if remoteDataScheme == common.Local || inputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", executionModel.InputsURI, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
if remoteDataScheme == common.Local || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
var fullOutputs core.LiteralMap
outputsURI := execution.Closure.GetOutputs().GetUri()
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(outputsURI), &fullOutputs)
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,16 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
Outputs: &signedOutputsURLBlob,
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || signedInputsURLBlob.Bytes < maxDataSize {
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if remoteDataScheme == common.Local || signedInputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", nodeExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && len(nodeExecution.Closure.GetOutputUri()) > 0) {
if remoteDataScheme == common.Local || (signedOutputsURLBlob.Bytes < maxDataSize && len(nodeExecution.Closure.GetOutputUri()) > 0) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,16 @@ func (m *TaskExecutionManager) GetTaskExecutionData(
Outputs: &signedOutputsURLBlob,
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || signedInputsURLBlob.Bytes < maxDataSize {
remoteDataScheme := m.config.ApplicationConfiguration().GetRemoteDataConfig().Scheme
if remoteDataScheme == common.Local || signedInputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", taskExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && len(taskExecution.Closure.GetOutputUri()) > 0) {
if remoteDataScheme == common.Local || (signedOutputsURLBlob.Bytes < maxDataSize && len(taskExecution.Closure.GetOutputUri()) > 0) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(taskExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/manager/impl/testutils/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package testutils

import (
"github.com/flyteorg/flyteadmin/pkg/common"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
runtimeMocks "github.com/flyteorg/flyteadmin/pkg/runtime/mocks"
)
Expand All @@ -25,5 +26,6 @@ func GetApplicationConfigWithDefaultDomains() runtimeInterfaces.ApplicationConfi
Name: "domain",
},
})
config.SetRemoteDataConfig(runtimeInterfaces.RemoteDataConfig{Scheme: common.Local})
return &config
}
1 change: 1 addition & 0 deletions flyteadmin/pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.Remote
SignedURL: interfaces.SignedURL{
DurationMinutes: 3,
},
MaxSizeInBytes: 1048576, // 1 Mib
})
var notificationsConfig = config.MustRegisterSection(notifications, &interfaces.NotificationsConfig{
Type: common.Local,
Expand Down

0 comments on commit 4c387a5

Please sign in to comment.