-
Notifications
You must be signed in to change notification settings - Fork 674
/
data.go
131 lines (112 loc) · 4.95 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package util
import (
"context"
"github.com/golang/protobuf/proto"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
// File names exported for use by other packages; neither constant is referenced in this file.
const (
// OutputsFile is the literal file name "outputs.pb".
// NOTE(review): presumably the name of the serialized outputs file of an execution - confirm against callers.
OutputsFile = "outputs.pb"
// DeckFile is the literal file name "deck.html".
// NOTE(review): presumably the name of the rendered deck file of an execution - confirm against callers.
DeckFile = "deck.html"
)
// shouldFetchData reports whether the data described by urlBlob should be read and returned inline.
//
// It returns true when the configured scheme is common.Local or common.None, when no size limit is
// configured (MaxSizeInBytes == 0), or when the blob's reported size is strictly below MaxSizeInBytes.
func shouldFetchData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool {
	switch {
	case config.Scheme == common.Local:
		return true
	case config.Scheme == common.None:
		return true
	case config.MaxSizeInBytes == 0:
		// A zero limit means the size is not restricted.
		return true
	default:
		return urlBlob.Bytes < config.MaxSizeInBytes
	}
}
// shouldFetchOutputData reports whether output data should be read and returned inline.
// An empty outputURI always yields false; otherwise the decision is delegated to shouldFetchData.
func shouldFetchOutputData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool {
	if outputURI == "" {
		return false
	}
	return shouldFetchData(config, urlBlob)
}
// GetInputs resolves the inputs of an execution stored at inputURI.
//
// It returns the inline inputs (when the remote data config permits fetching them) together with a URL
// blob for the inputs (populated only when signed URLs are enabled). An empty inputURI yields an empty
// literal map and an empty blob. The only error returned is one produced while obtaining the URL blob;
// a failure to read the inputs from storage is logged and otherwise ignored.
func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
	remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, inputURI string) (
	*core.LiteralMap, *admin.UrlBlob, error) {
	var (
		literals core.LiteralMap
		blob     admin.UrlBlob
	)
	if inputURI == "" {
		return &literals, &blob, nil
	}

	if remoteDataConfig.SignedURL.Enabled {
		var err error
		if blob, err = urlData.Get(ctx, inputURI); err != nil {
			return nil, nil, err
		}
	}

	if !shouldFetchData(remoteDataConfig, blob) {
		return &literals, &blob, nil
	}

	if err := storageClient.ReadProtobuf(ctx, storage.DataReference(inputURI), &literals); err != nil {
		// A failed read from the remote store must not fail the whole request: the caller still
		// receives the URL blob and can use it to fetch the input data directly.
		logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err)
	}
	return &literals, &blob, nil
}
// ExecutionClosure defines common methods in NodeExecutionClosure and TaskExecutionClosure used to return output data.
// GetOutputs in this file consumes it to produce an outputs URL blob and, where permitted, inline output data.
type ExecutionClosure interface {
// GetOutputUri returns the URI of the execution outputs; an empty string is treated by GetOutputs as
// "no output URI" (no signed URL is requested and nothing is read from storage).
GetOutputUri() string //nolint
// GetOutputData returns output data carried by the closure itself, or nil when there is none;
// when non-nil, GetOutputs uses it in preference to reading from the output URI.
GetOutputData() *core.LiteralMap
}
// workflowExecutionClosure is a wrapper around an admin.ExecutionClosure object which conforms to the
// output interface used by admin.NodeExecutionClosure and admin.TaskExecutionClosure.
// Due to historical reasons, the workflow execution closure message is slightly different, so the
// wrapper supplies its own GetOutputUri and GetOutputData methods.
type workflowExecutionClosure struct {
// Embedded workflow execution closure; may be nil (GetOutputUri checks for that explicitly).
*admin.ExecutionClosure
}
// GetOutputUri returns the URI recorded under the wrapped closure's outputs, or the empty string when
// the wrapped closure is nil or has no outputs set.
func (c workflowExecutionClosure) GetOutputUri() string { //nolint
	if c.ExecutionClosure == nil {
		return ""
	}
	outputs := c.ExecutionClosure.GetOutputs()
	if outputs == nil {
		return ""
	}
	return outputs.GetUri()
}
// GetOutputData returns the inline output values of the wrapped closure. Values found under the
// closure's outputs take precedence; when those are absent it falls back to the closure's own
// output data field.
func (c workflowExecutionClosure) GetOutputData() *core.LiteralMap {
	if outputs := c.ExecutionClosure.GetOutputs(); outputs != nil {
		if values := outputs.GetValues(); values != nil {
			return values
		}
	}
	// Explicitly call the embedded message's getter, not this wrapper method.
	return c.ExecutionClosure.GetOutputData()
}
// ToExecutionClosureInterface wraps a workflow execution closure in a type that implements the
// ExecutionClosure interface, so it can be used when producing execution output data.
// The returned value is never nil, even when closure is nil.
func ToExecutionClosureInterface(closure *admin.ExecutionClosure) ExecutionClosure {
	wrapped := workflowExecutionClosure{ExecutionClosure: closure}
	return &wrapped
}
// GetOutputs returns an outputs URL blob and, if config settings permit, inline outputs data for an execution.
//
// Behavior:
//   - A nil closure yields an empty literal map and an empty URL blob.
//   - When the closure has an output URI and signed URLs are enabled, the URL blob is obtained from
//     urlData; an error from that call is the only error this function returns.
//   - When the closure carries inline output data, that data is returned if it fits the configured
//     size limit; otherwise an empty literal map is returned and a debug message is logged.
//   - Otherwise, when shouldFetchOutputData permits, the outputs are read from storage at the output
//     URI. A failed read is logged and ignored so that the caller can still use the URL blob.
//
// A MaxSizeInBytes of 0 is treated as "no limit" for inline data, consistent with shouldFetchData.
func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
	remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, closure ExecutionClosure) (
	*core.LiteralMap, *admin.UrlBlob, error) {
	var outputsURLBlob admin.UrlBlob
	fullOutputs := &core.LiteralMap{}
	if closure == nil {
		return fullOutputs, &outputsURLBlob, nil
	}

	outputURI := closure.GetOutputUri()
	if len(outputURI) > 0 && remoteDataConfig.SignedURL.Enabled {
		var err error
		outputsURLBlob, err = urlData.Get(ctx, outputURI)
		if err != nil {
			return nil, nil, err
		}
	}

	if inlineOutputs := closure.GetOutputData(); inlineOutputs != nil {
		// Zero means unlimited here as well; previously a zero limit silently dropped all inline
		// outputs even though remotely stored outputs of any size were still fetched.
		maxSize := remoteDataConfig.MaxSizeInBytes
		if maxSize == 0 || int64(proto.Size(inlineOutputs)) < maxSize {
			fullOutputs = inlineOutputs
		} else {
			logger.Debugf(ctx, "execution closure contains output data that exceeds max data size for responses")
		}
	} else if shouldFetchOutputData(remoteDataConfig, outputsURLBlob, outputURI) {
		err := storageClient.ReadProtobuf(ctx, storage.DataReference(outputURI), fullOutputs)
		if err != nil {
			// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
			// Instead we return the signed URL blob so that the client can use that to fetch the output data.
			logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputURI, err)
		}
	}
	return fullOutputs, &outputsURLBlob, nil
}