This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
config.go
245 lines (219 loc) · 12.5 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
// Package config contains the core configuration for FlytePropeller. This configuration can be added under the ``propeller`` section.
// Example config:
// ----------------
//
//	propeller:
//	  rawoutput-prefix: s3://my-container/test/
//	  metadata-prefix: metadata/propeller/sandbox
//	  workers: 4
//	  workflow-reeval-duration: 10s
//	  downstream-eval-duration: 5s
//	  limit-namespace: "all"
//	  prof-port: 11254
//	  metrics-prefix: flyte
//	  enable-admin-launcher: true
//	  max-ttl-hours: 1
//	  gc-interval: 500m
//	  queue:
//	    type: batch
//	    queue:
//	      type: bucket
//	      rate: 20
//	      capacity: 100
//	    sub-queue:
//	      type: bucket
//	      rate: 100
//	      capacity: 1000
//	  # This config assumes using `make start` in the flytesnacks repo to start up a DinD k3s container
//	  kube-config: "$HOME/kubeconfig/k3s/k3s.yaml"
//	  publish-k8s-events: true
//	  workflowStore:
//	    policy: "ResourceVersionCache"
package config
import (
"time"
"github.com/flyteorg/flytestdlib/config"
"k8s.io/apimachinery/pkg/types"
)
//go:generate pflags Config --default-var=defaultConfig

const (
	// configSectionKey is the top-level key ("propeller") under which this
	// package registers its section with the flytestdlib config module.
	configSectionKey = "propeller"
)
var (
	// configSection is the "propeller" section registered with the flytestdlib
	// config module; GetConfig and MustRegisterSubSection both go through it.
	configSection = config.MustRegisterSection(configSectionKey, defaultConfig)

	// defaultConfig is the initial Config registered for the propeller section.
	// It is also named as the --default-var of the pflags go:generate directive.
	// Fields are listed in the same order they are declared on Config.
	defaultConfig = &Config{
		Workers:        20,
		WorkflowReEval: config.Duration{Duration: 10 * time.Second},
		DownstreamEval: config.Duration{Duration: 30 * time.Second},
		LimitNamespace: "all",
		ProfilerPort:   config.Port{Port: 10254},
		MetadataPrefix: "metadata/propeller",
		Queue: CompositeQueueConfig{
			Type: CompositeQueueBatch,
			Queue: WorkqueueConfig{
				Type:      WorkqueueTypeMaxOfRateLimiter,
				BaseDelay: config.Duration{Duration: 5 * time.Second},
				MaxDelay:  config.Duration{Duration: 60 * time.Second},
				Rate:      100,
				Capacity:  1000,
			},
			Sub: WorkqueueConfig{
				Type:     WorkqueueTypeBucketRateLimiter,
				Rate:     100,
				Capacity: 1000,
			},
			BatchingInterval: config.Duration{Duration: 1 * time.Second},
			// -1 re-enqueues every available downstream-triggered object per interval.
			BatchSize: -1,
		},
		MetricsPrefix:       "flyte",
		EnableAdminLauncher: true,
		MaxWorkflowRetries:  10,
		MaxTTLInHours:       23,
		GCInterval:          config.Duration{Duration: 30 * time.Minute},
		LeaderElection: LeaderElectionConfig{
			Enabled:       false,
			LeaseDuration: config.Duration{Duration: 15 * time.Second},
			RenewDeadline: config.Duration{Duration: 10 * time.Second},
			RetryPeriod:   config.Duration{Duration: 2 * time.Second},
		},
		MaxDatasetSizeBytes: 10 * 1024 * 1024, // 10 MiB
		KubeConfig: KubeClientConfig{
			QPS:     100,
			Burst:   25,
			Timeout: config.Duration{Duration: 30 * time.Second},
		},
		NodeConfig: NodeConfig{
			DefaultDeadlines: DefaultDeadlines{
				DefaultNodeExecutionDeadline:  config.Duration{Duration: 48 * time.Hour},
				DefaultNodeActiveDeadline:     config.Duration{Duration: 48 * time.Hour},
				DefaultWorkflowActiveDeadline: config.Duration{Duration: 72 * time.Hour},
			},
			MaxNodeRetriesOnSystemFailures: 3,
			InterruptibleFailureThreshold:  1,
		},
		// A streak length greater than 1 turns on turbo mode, so it is on by default.
		MaxStreakLength: 8,
		EventConfig: EventConfig{
			RawOutputPolicy: RawOutputPolicyReference,
		},
	}
)
// Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is
// the base configuration to start propeller
// NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables.
// NOTE: pflag tags in this file are written as "<default>,<usage>" (e.g. BatchSize uses "-1,..."). When there is no
// explicit default, keep the leading comma so the usage text is not placed in the default slot.
type Config struct {
	KubeConfigPath         string               `json:"kube-config" pflag:",Path to kubernetes client config file."`
	MasterURL              string               `json:"master"`
	Workers                int                  `json:"workers" pflag:",Number of threads to process workflows"`
	WorkflowReEval         config.Duration      `json:"workflow-reeval-duration" pflag:",Frequency of re-evaluating workflows"`
	DownstreamEval         config.Duration      `json:"downstream-eval-duration" pflag:",Frequency of re-evaluating downstream tasks"`
	LimitNamespace         string               `json:"limit-namespace" pflag:",Namespaces to watch for this propeller"`
	ProfilerPort           config.Port          `json:"prof-port" pflag:",Profiler port"`
	MetadataPrefix         string               `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
	DefaultRawOutputPrefix string               `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."`
	Queue                  CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
	MetricsPrefix          string               `json:"metrics-prefix" pflag:",An optional prefix for all published metrics."`
	EnableAdminLauncher    bool                 `json:"enable-admin-launcher" pflag:",Enable remote Workflow launcher to Admin"`
	MaxWorkflowRetries     int                  `json:"max-workflow-retries" pflag:",Maximum number of retries per workflow"`
	MaxTTLInHours          int                  `json:"max-ttl-hours" pflag:",Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"`
	GCInterval             config.Duration      `json:"gc-interval" pflag:",Run periodic GC every 30 minutes"`
	LeaderElection         LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."`
	PublishK8sEvents       bool                 `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
	MaxDatasetSizeBytes    int64                `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
	KubeConfig             KubeClientConfig     `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
	NodeConfig             NodeConfig           `json:"node-config,omitempty" pflag:",config for a workflow node"`
	MaxStreakLength        int                  `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."`
	EventConfig            EventConfig          `json:"event-config,omitempty" pflag:",Configures execution event behavior."`
}
// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
type KubeClientConfig struct {
	// QPS indicates the maximum QPS to the master from this client.
	// If it's zero, the created RESTClient will use DefaultQPS: 5
	// NOTE(review): unlike the other fields, this pflag tag starts with "-" instead of ","; presumably that
	// excludes QPS from the generated command-line flags (float32) — confirm against the pflags generator.
	QPS float32 `json:"qps" pflag:"-,Max QPS to the master for requests to KubeAPI. 0 defaults to 5."`
	// Maximum burst for throttle.
	// If it's zero, the created RESTClient will use DefaultBurst: 10.
	Burst int `json:"burst" pflag:",Max burst rate for throttle. 0 defaults to 10"`
	// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
	Timeout config.Duration `json:"timeout" pflag:",Max duration allowed for every request to KubeAPI before giving up. 0 implies no timeout."`
}
// CompositeQueueType names which composite work-queue implementation to build.
// It is a type alias for string, so any string value is assignable to it.
type CompositeQueueType = string

// Recognised CompositeQueueType values.
const (
	// CompositeQueueBatch selects the "batch" composite queue.
	CompositeQueueBatch CompositeQueueType = "batch"

	// CompositeQueueSimple selects the "simple" composite queue.
	CompositeQueueSimple CompositeQueueType = "simple"
)
// CompositeQueueConfig contains configuration for the controller queue and the downstream resource queue
type CompositeQueueConfig struct {
	// Type selects the composite queue implementation; expected values are the CompositeQueue* constants.
	Type CompositeQueueType `json:"type" pflag:",Type of composite queue to use for the WorkQueue"`
	// Queue configures the top-level workflow workqueue.
	Queue WorkqueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
	// Sub configures the queue through which node-level updates cause the top-level work to be re-evaluated.
	Sub WorkqueueConfig `json:"sub-queue,omitempty" pflag:",SubQueue configuration, affects the way the nodes cause the top-level Work to be re-evaluated."`
	// BatchingInterval is how long downstream updates are buffered before being flushed.
	BatchingInterval config.Duration `json:"batching-interval" pflag:",Duration for which downstream updates are buffered"`
	// BatchSize caps how many downstream-triggered objects are re-enqueued per interval; -1 means all available.
	BatchSize int `json:"batch-size" pflag:"-1,Number of downstream triggered top-level objects to re-enqueue every duration. -1 indicates all available."`
}
// WorkqueueType names the rate limiter used by a work queue (see WorkqueueConfig).
// It is a type alias for string, so any string value is assignable to it.
type WorkqueueType = string

// Recognised WorkqueueType values.
const (
	// WorkqueueTypeMaxOfRateLimiter selects the "maxof" rate limiter.
	WorkqueueTypeMaxOfRateLimiter WorkqueueType = "maxof"

	// WorkqueueTypeExponentialFailureRateLimiter selects the "expfailure" rate limiter.
	WorkqueueTypeExponentialFailureRateLimiter WorkqueueType = "expfailure"

	// WorkqueueTypeBucketRateLimiter selects the "bucket" rate limiter.
	WorkqueueTypeBucketRateLimiter WorkqueueType = "bucket"

	// WorkqueueTypeDefault selects the "default" rate limiter.
	WorkqueueTypeDefault WorkqueueType = "default"
)
// WorkqueueConfig has the configuration to configure a workqueue. We may want to generalize this in a package like k8sutils
// NOTE(review): presumably BaseDelay/MaxDelay apply to the failure-backoff limiters and Rate/Capacity to the bucket
// limiter (with "maxof" using both) — confirm against the queue factory that consumes this struct.
type WorkqueueConfig struct {
	// Refer to https://github.com/kubernetes/client-go/tree/master/util/workqueue
	// Type selects the rate limiter; expected values are the WorkqueueType* constants.
	Type WorkqueueType `json:"type" pflag:",Type of RateLimiter to use for the WorkQueue"`
	// BaseDelay is the base backoff delay applied on failure.
	BaseDelay config.Duration `json:"base-delay" pflag:",base backoff delay for failure"`
	// MaxDelay is the upper bound on the failure backoff delay.
	MaxDelay config.Duration `json:"max-delay" pflag:",Max backoff delay for failure"`
	// Rate is the token-bucket refill rate, per second.
	Rate int64 `json:"rate" pflag:",Bucket Refill rate per second"`
	// Capacity is the token-bucket size, as a number of items.
	Capacity int `json:"capacity" pflag:",Bucket capacity as number of items"`
}
// NodeConfig contains configuration that is useful for every node execution.
// NOTE: the numeric prefixes in the pflag tags are kept in sync with the values set in defaultConfig.
type NodeConfig struct {
	// DefaultDeadlines holds the default timeout values for nodes and workflows.
	DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
	// MaxNodeRetriesOnSystemFailures is the maximum number of retries per node when it fails due to infra issues.
	MaxNodeRetriesOnSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"3,Maximum number of retries per node for node failure due to infra issues"`
	// InterruptibleFailureThreshold is the number of failures for which a node is still considered interruptible.
	InterruptibleFailureThreshold int64 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible"`
}
// DefaultDeadlines contains default values for timeouts
type DefaultDeadlines struct {
	// DefaultNodeExecutionDeadline is the default node execution timeout.
	DefaultNodeExecutionDeadline config.Duration `json:"node-execution-deadline" pflag:",Default value of node execution timeout"`
	// DefaultNodeActiveDeadline is the default node timeout.
	DefaultNodeActiveDeadline config.Duration `json:"node-active-deadline" pflag:",Default value of node timeout"`
	// DefaultWorkflowActiveDeadline is the default workflow timeout.
	DefaultWorkflowActiveDeadline config.Duration `json:"workflow-active-deadline" pflag:",Default value of workflow timeout"`
}
// LeaderElectionConfig Contains leader election configuration.
type LeaderElectionConfig struct {
	// Enable or disable leader election.
	Enabled bool `json:"enabled" pflag:",Enables/Disables leader election."`
	// Determines the name of the configmap that leader election will use for holding the leader lock.
	// Given as a Kubernetes namespace/name pair.
	LockConfigMap types.NamespacedName `json:"lock-config-map" pflag:",ConfigMap namespace/name to use for resource lock."`
	// Duration that non-leader candidates will wait to force acquire leadership. This is measured against time of last
	// observed ack
	LeaseDuration config.Duration `json:"lease-duration" pflag:",Duration that non-leader candidates will wait to force acquire leadership. This is measured against time of last observed ack."`
	// RenewDeadline is the duration that the acting master will retry refreshing leadership before giving up.
	RenewDeadline config.Duration `json:"renew-deadline" pflag:",Duration that the acting master will retry refreshing leadership before giving up."`
	// RetryPeriod is the duration the LeaderElector clients should wait between tries of actions.
	RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."`
}
// RawOutputPolicy names how output data should be passed along in execution
// events. It is a type alias for string.
type RawOutputPolicy = string

// Recognised RawOutputPolicy values.
const (
	// RawOutputPolicyInline sends the raw output data itself in events.
	RawOutputPolicyInline RawOutputPolicy = "inline"

	// RawOutputPolicyReference only sends a URI referencing where outputs have
	// been uploaded.
	RawOutputPolicyReference RawOutputPolicy = "reference"
)
// EventConfig configures execution event behavior.
type EventConfig struct {
	// RawOutputPolicy controls how output data is passed along in execution events; expected values are the
	// RawOutputPolicy* constants.
	RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
	// FallbackToOutputReference sends output data by reference when it is too large to be sent inline.
	FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
}
// GetConfig returns the propeller configuration currently held by the package's
// registered config section. The value is type-asserted to *Config; the
// single-value assertion panics if the section holds any other type.
func GetConfig() *Config {
	current := configSection.GetConfig().(*Config)
	return current
}
// MustRegisterSubSection registers section under subSectionKey as a subsection of
// the propeller configuration and returns the resulting config.Section.
// Following the Must* naming convention, it presumably panics if registration fails.
func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section {
	registered := configSection.MustRegisterSection(subSectionKey, section)
	return registered
}