-
Notifications
You must be signed in to change notification settings - Fork 674
/
prepare_execution.go
158 lines (142 loc) · 5.66 KB
/
prepare_execution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package impl
import (
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)
// addMapValues merges overrides into defaultValues and returns the result.
//
// A nil defaultValues is replaced by a fresh empty map, so the returned map is
// never nil. A non-nil defaultValues is updated in place and returned. For keys
// present in both maps, the value from overrides wins.
func addMapValues(overrides map[string]string, defaultValues map[string]string) map[string]string {
	merged := defaultValues
	if merged == nil {
		merged = map[string]string{}
	}
	// Ranging over a nil map performs zero iterations, so nil overrides need
	// no dedicated branch.
	for key, val := range overrides {
		merged[key] = val
	}
	return merged
}
// addPermissions applies the identity described by securityCtx to flyteWf.
//
// Nothing is changed when securityCtx or its RunAs field is nil. Otherwise the
// security context is copied onto the workflow; a non-empty IAM role is
// recorded as an annotation under roleNameKey (creating the annotation map if
// needed), and a non-empty Kubernetes service account becomes the workflow's
// ServiceAccountName.
func addPermissions(securityCtx *core.SecurityContext, roleNameKey string, flyteWf *v1alpha1.FlyteWorkflow) {
	if securityCtx == nil {
		return
	}
	identity := securityCtx.RunAs
	if identity == nil {
		return
	}

	flyteWf.SecurityContext = *securityCtx

	if role := identity.IamRole; role != "" {
		if flyteWf.Annotations == nil {
			flyteWf.Annotations = map[string]string{}
		}
		flyteWf.Annotations[roleNameKey] = role
	}

	if serviceAccount := identity.K8SServiceAccount; serviceAccount != "" {
		flyteWf.ServiceAccountName = serviceAccount
	}
}
// addExecutionOverrides assembles a v1alpha1.ExecutionConfig from the supplied
// overrides and stores it on flyteWf, replacing any existing ExecutionConfig.
//
//   - taskPluginOverrides: one TaskPluginImpls entry is written per override,
//     keyed by its TaskType.
//   - workflowExecutionConfig: when non-nil, supplies max parallelism, the
//     optional interruptible flag, the overwrite-cache flag and environment
//     variables.
//   - recoveryExecution: wrapped and stored as RecoveryExecution.
//   - taskResources: when non-nil, its non-zero default and limit quantities
//     become the task resource requests and limits.
func addExecutionOverrides(taskPluginOverrides []*admin.PluginOverride,
	workflowExecutionConfig *admin.WorkflowExecutionConfig, recoveryExecution *core.WorkflowExecutionIdentifier,
	taskResources *interfaces.TaskResources, flyteWf *v1alpha1.FlyteWorkflow) {
	pluginImpls := make(map[string]v1alpha1.TaskPluginOverride, len(taskPluginOverrides))
	for _, po := range taskPluginOverrides {
		pluginImpls[po.TaskType] = v1alpha1.TaskPluginOverride{
			PluginIDs:             po.PluginId,
			MissingPluginBehavior: po.MissingPluginBehavior,
		}
	}

	cfg := v1alpha1.ExecutionConfig{
		TaskPluginImpls: pluginImpls,
		RecoveryExecution: v1alpha1.WorkflowExecutionIdentifier{
			WorkflowExecutionIdentifier: recoveryExecution,
		},
	}

	if workflowExecutionConfig != nil {
		// NOTE(review): this conversion would wrap a negative MaxParallelism
		// into a large uint32; presumably validated upstream; confirm.
		cfg.MaxParallelism = uint32(workflowExecutionConfig.MaxParallelism)

		// Interruptible stays nil unless the caller explicitly provided a value.
		if flag := workflowExecutionConfig.GetInterruptible(); flag != nil {
			interruptible := flag.GetValue()
			cfg.Interruptible = &interruptible
		}

		cfg.OverwriteCache = workflowExecutionConfig.GetOverwriteCache()

		if envList := workflowExecutionConfig.GetEnvs(); envList != nil {
			envVars := make(map[string]string, len(envList.Values))
			for _, kv := range envList.Values {
				envVars[kv.Key] = kv.Value
			}
			cfg.EnvironmentVariables = envVars
		}
	}

	if taskResources != nil {
		defaults, caps := taskResources.Defaults, taskResources.Limits
		var requests, limits v1alpha1.TaskResourceSpec

		// Only non-zero quantities are carried over; everything else keeps the
		// zero value of the spec field.
		if !defaults.CPU.IsZero() {
			requests.CPU = defaults.CPU
		}
		if !defaults.Memory.IsZero() {
			requests.Memory = defaults.Memory
		}
		if !defaults.EphemeralStorage.IsZero() {
			requests.EphemeralStorage = defaults.EphemeralStorage
		}
		if !defaults.Storage.IsZero() {
			requests.Storage = defaults.Storage
		}
		if !defaults.GPU.IsZero() {
			requests.GPU = defaults.GPU
		}

		if !caps.CPU.IsZero() {
			limits.CPU = caps.CPU
		}
		if !caps.Memory.IsZero() {
			limits.Memory = caps.Memory
		}
		if !caps.EphemeralStorage.IsZero() {
			limits.EphemeralStorage = caps.EphemeralStorage
		}
		if !caps.Storage.IsZero() {
			limits.Storage = caps.Storage
		}
		if !caps.GPU.IsZero() {
			limits.GPU = caps.GPU
		}

		cfg.TaskResources = v1alpha1.TaskResources{
			Requests: requests,
			Limits:   limits,
		}
	}

	flyteWf.ExecutionConfig = cfg
}
// PrepareFlyteWorkflow decorates flyteWorkflow in place with the
// execution-specific settings carried in data: the execution ID, the accepted-at
// timestamp, permissions from the security context, labels, annotations, the
// event version, execution overrides and, when present, the raw output data
// config.
//
// It returns an Internal-coded FlyteAdmin error when data.ExecutionID or
// flyteWorkflow is nil, and nil otherwise.
func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1.FlyteWorkflow) error {
	if data.ExecutionID == nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "invalid execution id")
	}
	if flyteWorkflow == nil {
		return errors.NewFlyteAdminErrorf(codes.Internal, "missing Flyte Workflow")
	}

	// Add the execution ID so Propeller can send events back that are
	// associated with the ID.
	flyteWorkflow.ExecutionID = v1alpha1.WorkflowExecutionIdentifier{
		WorkflowExecutionIdentifier: data.ExecutionID,
	}

	// Add the acceptedAt timestamp so Propeller can emit latency metrics.
	acceptAtWrapper := v1.NewTime(data.ExecutionParameters.AcceptedAt)
	flyteWorkflow.AcceptedAt = &acceptAtWrapper

	// Add permissions from the security context. The generated getter is used
	// instead of direct field access so a nil ExecutionConfig yields a nil
	// security context rather than a nil-pointer panic; addExecutionOverrides
	// below already tolerates a nil ExecutionConfig.
	addPermissions(data.ExecutionParameters.ExecutionConfig.GetSecurityContext(),
		data.ExecutionParameters.RoleNameKey, flyteWorkflow)

	// Execution-level labels and annotations override values already present
	// on the workflow.
	flyteWorkflow.Labels = addMapValues(data.ExecutionParameters.Labels, flyteWorkflow.Labels)
	flyteWorkflow.Annotations = addMapValues(data.ExecutionParameters.Annotations, flyteWorkflow.Annotations)

	if flyteWorkflow.WorkflowMeta == nil {
		flyteWorkflow.WorkflowMeta = &v1alpha1.WorkflowMeta{}
	}
	flyteWorkflow.WorkflowMeta.EventVersion = v1alpha1.EventVersion(data.ExecutionParameters.EventVersion)

	addExecutionOverrides(data.ExecutionParameters.TaskPluginOverrides, data.ExecutionParameters.ExecutionConfig,
		data.ExecutionParameters.RecoveryExecution, data.ExecutionParameters.TaskResources, flyteWorkflow)

	if data.ExecutionParameters.RawOutputDataConfig != nil {
		flyteWorkflow.RawOutputDataConfig = v1alpha1.RawOutputDataConfig{
			RawOutputDataConfig: data.ExecutionParameters.RawOutputDataConfig,
		}
	}
	return nil
}