Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Scale out with propeller manager and workflow sharding #351

Merged
merged 66 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
89f25cf
added 'manager' command
hamersaw Oct 4, 2021
fead2fd
using go routine and timer for manager loop
hamersaw Oct 4, 2021
7b44857
moved manager loop out of cmd and into pkg directory
hamersaw Oct 4, 2021
314e32b
detecting missing replicas
hamersaw Oct 5, 2021
de8bc27
moved extracting replica from pod name to new function
hamersaw Oct 5, 2021
6ff2396
creating managed flytepropeller pods
hamersaw Oct 6, 2021
bbbd57b
refactored configuration
hamersaw Oct 6, 2021
d3be5a9
removed regex parsing for replica - checking for existance with fully…
hamersaw Oct 7, 2021
1d0b24c
mocked out shard strategy abstraction
hamersaw Oct 7, 2021
09e9efb
adding arguments to podspec for ConsistentHashingShardStrategy
hamersaw Oct 7, 2021
56cfc87
updated import naming
hamersaw Oct 12, 2021
05a8a6e
moved manager to a top-level package
hamersaw Oct 13, 2021
52a370a
added shard strategy to manager configuration
hamersaw Oct 13, 2021
fbc4815
setting shard key label selector on managed propeller instances
hamersaw Oct 14, 2021
fd97c0e
Merge branch 'feature/scaleout' into feature/sharding-scale-out
hamersaw Oct 21, 2021
8039b9e
fixed random lint issues
hamersaw Oct 22, 2021
feec6ef
split pod name generate to separate function to ease future auto-scal…
hamersaw Oct 26, 2021
0374a8d
cleaned up pod label selector
hamersaw Oct 26, 2021
6c2a1f2
delete pods on shutdown
hamersaw Oct 26, 2021
09fb2af
added prometheus metric reporting
hamersaw Oct 26, 2021
4f37b6a
updated manager run loop to use k8s wait.UntilWithContext
hamersaw Oct 26, 2021
d295292
moved getKubeConfig into a shared package
hamersaw Oct 26, 2021
176051a
assigning shard and namespace labels on FlyteWorkflow
hamersaw Oct 27, 2021
3a71c42
implement NamespaceShardStrategy
hamersaw Oct 27, 2021
d01fc0f
implemented NamespaceShardStrategy
hamersaw Oct 27, 2021
217f16e
fixed shard label
hamersaw Oct 27, 2021
a5f730c
added comments
hamersaw Oct 28, 2021
7b4036f
checking for existing pods on startup
hamersaw Oct 28, 2021
a4dc6ba
handling delete of non-existent pod
hamersaw Oct 28, 2021
044ba5f
changes ConsistentHashing name to Random - because that's what it rea…
hamersaw Oct 28, 2021
936fc27
implemented EnableUncoveredReplica configuration option
hamersaw Oct 28, 2021
acbfa44
added leader election to manager using existing propeller config
hamersaw Oct 29, 2021
79f8329
fixed disable leader election in managed propeller pods
hamersaw Oct 29, 2021
40d74c0
removed listPods function
hamersaw Oct 29, 2021
9c69d26
added leader election to mitigate concurrent modification issues
hamersaw Oct 31, 2021
2ea0732
enabled pprof to profile resource metrics
hamersaw Nov 1, 2021
6cc9be5
added 'manager' target to Makefile to start manager in development mo…
hamersaw Nov 1, 2021
e102617
added shard strategy test for computing key ranges
hamersaw Nov 1, 2021
2d125f4
fixed key range computation
hamersaw Nov 1, 2021
3d8b5cd
merged master to propogate FlyteWorkflow CRD label additions
hamersaw Nov 1, 2021
b37947a
implemented project and domain shard types
hamersaw Nov 1, 2021
117b02c
returning error on out of range podIndex during UpdatePodSpec call on…
hamersaw Nov 1, 2021
959b9b5
fixed random lint issues
hamersaw Nov 2, 2021
dacf014
added manager tests
hamersaw Nov 2, 2021
6f08c89
fixed lint issues
hamersaw Nov 2, 2021
b4370d5
added doc comments on exported types and functions
hamersaw Nov 16, 2021
786569b
exporting ComputeKeyRange function and changed adding addLabelSelecto…
hamersaw Nov 16, 2021
583353e
adding pod template resource version and shard config hash annotation…
hamersaw Nov 17, 2021
1e999ce
removed pod deletion on manager shutdown
hamersaw Nov 17, 2021
3055d24
cleaned up unit tests and lint
hamersaw Nov 17, 2021
1410487
updated getContainer function to retrive flytepropeller container fro…
hamersaw Nov 17, 2021
9d2adac
removed addLabelSelectorIfExists function call
hamersaw Nov 17, 2021
3c35712
changed bytes.Buffer from a var to declaring with new
hamersaw Nov 18, 2021
fd21aeb
created a new shardstrategy package
hamersaw Nov 29, 2021
cfa26a7
generating mocks for ShardStrategy to decouple manager package tests …
hamersaw Nov 29, 2021
608fa88
fixed lint issues
hamersaw Nov 29, 2021
a041eff
changed shard configuration defintions and added support for wildcard…
hamersaw Nov 29, 2021
61f1d2d
updated documentation
hamersaw Nov 30, 2021
5109a8d
fixed lint issues
hamersaw Nov 30, 2021
b04ea42
setting managed pod owner references
hamersaw Nov 30, 2021
37d8bfd
updated documentation
hamersaw Nov 30, 2021
77a3437
fixed a few nits
hamersaw Dec 3, 2021
1a80f88
delete pods with failed state
hamersaw Dec 3, 2021
e74fc1d
changed ShardType type to int instead of string
hamersaw Dec 3, 2021
85e3517
removed default values in manager config
hamersaw Dec 3, 2021
f794181
updated config_flags with pflags generation
hamersaw Dec 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@ update_boilerplate:
.PHONY: linux_compile
linux_compile:
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytepropeller ./cmd/controller/main.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytepropeller-manager ./cmd/manager/main.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/kubectl-flyte ./cmd/kubectl-flyte/main.go

.PHONY: compile
compile:
mkdir -p ./bin
go build -o bin/flytepropeller ./cmd/controller/main.go
go build -o bin/flytepropeller-manager ./cmd/manager/main.go
go build -o bin/kubectl-flyte ./cmd/kubectl-flyte/main.go && cp bin/kubectl-flyte ${GOPATH}/bin

cross_compile:
@glide install
@mkdir -p ./bin/cross
GOOS=linux GOARCH=amd64 go build -o bin/cross/flytepropeller ./cmd/controller/main.go
GOOS=linux GOARCH=amd64 go build -o bin/cross/flytepropeller-manager ./cmd/manager/main.go
GOOS=linux GOARCH=amd64 go build -o bin/cross/kubectl-flyte ./cmd/kubectl-flyte/main.go

op_code_generate:
Expand All @@ -38,6 +41,11 @@ benchmark:
server:
@go run ./cmd/controller/main.go --alsologtostderr --propeller.kube-config=$(HOME)/.kube/config

# manager starts the manager service in development mode
.PHONY: manager
manager:
@go run ./cmd/manager/main.go --alsologtostderr --propeller.kube-config=$(HOME)/.kube/config

clean:
rm -rf bin

Expand Down
3 changes: 2 additions & 1 deletion cmd/controller/cmd/init_certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
kubeErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/utils"

corev1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -84,7 +85,7 @@ func runCertsCmd(ctx context.Context, propellerCfg *config.Config, cfg *webhookC
return err
}

kubeClient, _, err := getKubeConfig(ctx, propellerCfg)
kubeClient, _, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
return err
}
Expand Down
58 changes: 27 additions & 31 deletions cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/flyteorg/flytestdlib/contextutils"

transformers "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"k8s.io/klog"

Expand All @@ -27,20 +28,15 @@ import (
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/pkg/errors"
"github.com/spf13/pflag"

"github.com/spf13/cobra"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

restclient "k8s.io/client-go/rest"

clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions"
"github.com/flyteorg/flytepropeller/pkg/controller"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"
)

const (
Expand Down Expand Up @@ -116,39 +112,39 @@ func logAndExit(err error) {
os.Exit(-1)
}

func getKubeConfig(_ context.Context, cfg *config2.Config) (*kubernetes.Clientset, *restclient.Config, error) {
var kubecfg *restclient.Config
var err error
if cfg.KubeConfigPath != "" {
kubeConfigPath := os.ExpandEnv(cfg.KubeConfigPath)
kubecfg, err = clientcmd.BuildConfigFromFlags(cfg.MasterURL, kubeConfigPath)
if err != nil {
return nil, nil, errors.Wrapf(err, "Error building kubeconfig")
}
} else {
kubecfg, err = restclient.InClusterConfig()
if err != nil {
return nil, nil, errors.Wrapf(err, "Cannot get InCluster kubeconfig")
}
func sharedInformerOptions(cfg *config2.Config) []informers.SharedInformerOption {
selectors := []struct {
label string
operation v1.LabelSelectorOperator
values []string
}{
{transformers.ShardKeyLabel, v1.LabelSelectorOpIn, cfg.IncludeShardKeyLabel},
{transformers.ShardKeyLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeShardKeyLabel},
{transformers.ProjectLabel, v1.LabelSelectorOpIn, cfg.IncludeProjectLabel},
{transformers.ProjectLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeProjectLabel},
{transformers.DomainLabel, v1.LabelSelectorOpIn, cfg.IncludeDomainLabel},
{transformers.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel},
}

kubecfg.QPS = cfg.KubeConfig.QPS
kubecfg.Burst = cfg.KubeConfig.Burst
kubecfg.Timeout = cfg.KubeConfig.Timeout.Duration
labelSelector := controller.IgnoreCompletedWorkflowsLabelSelector()
for _, selector := range selectors {
if len(selector.values) > 0 {
labelSelectorRequirement := v1.LabelSelectorRequirement{
Key: selector.label,
Operator: selector.operation,
Values: selector.values,
}

kubeClient, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return nil, nil, errors.Wrapf(err, "Error building kubernetes clientset")
labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement)
}
}
return kubeClient, kubecfg, err
}

func sharedInformerOptions(cfg *config2.Config) []informers.SharedInformerOption {
opts := []informers.SharedInformerOption{
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.LabelSelector = v1.FormatLabelSelector(controller.IgnoreCompletedWorkflowsLabelSelector())
options.LabelSelector = v1.FormatLabelSelector(labelSelector)
}),
}

if cfg.LimitNamespace != defaultNamespace {
opts = append(opts, informers.WithNamespace(cfg.LimitNamespace))
}
Expand All @@ -166,7 +162,7 @@ func executeRootCmd(cfg *config2.Config) {
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(baseCtx)

kubeClient, kubecfg, err := getKubeConfig(ctx, cfg)
kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg)
if err != nil {
logger.Fatalf(ctx, "Error building kubernetes clientset: %s", err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"
"github.com/flyteorg/flytepropeller/pkg/webhook"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
Expand Down Expand Up @@ -105,7 +106,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w

fmt.Println(string(raw))

kubeClient, kubecfg, err := getKubeConfig(ctx, propellerCfg)
kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
return err
}
Expand Down
202 changes: 202 additions & 0 deletions cmd/manager/cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Commands for FlytePropeller manager.
package cmd

import (
"context"
"flag"
"os"
"runtime"

"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/config/viper"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/version"

"github.com/flyteorg/flytepropeller/manager"
managerConfig "github.com/flyteorg/flytepropeller/manager/config"
propellerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)

const (
appName = "flytepropeller-manager"
podDefaultNamespace = "flyte"
podNameEnvVar = "POD_NAME"
podNamespaceEnvVar = "POD_NAMESPACE"
)

var (
cfgFile string
configAccessor = viper.NewAccessor(config.Options{StrictMode: true})
)

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: appName,
Short: "Runs FlytePropeller Manager to scale out FlytePropeller by executing multiple instances configured according to the defined sharding scheme.",
Long: `
FlytePropeller Manager is used to effectively scale out FlyteWorkflow processing among a collection of FlytePropeller instances. Users configure a sharding mechanism (ex. 'hash', 'project', or 'domain') to define the sharding environment.

The FlytePropeller Manager uses a kubernetes PodTemplate to construct the base FlytePropeller PodSpec. This means, apart from the configured sharding scheme, all managed FlytePropeller instances will be identical.

The Manager ensures liveness and correctness by periodically scanning kubernets pods and recovering state (ie. starting missing pods, etc). Live configuration updates are currently unsupported, meaning configuration changes require an application restart.

Sample configuration, illustrating 3 separate sharding techniques, is provided below:

manager:
pod-application: "flytepropeller"
pod-namespace: "flyte"
pod-template-name: "flytepropeller-template"
pod-template-namespace: "flyte"
scan-interval: 10s
shard:
# distribute FlyteWorkflow processing over 3 machines evenly
type: hash
pod-count: 3

# process the specified projects on defined replicas and all uncovered projects on another
type: project
enableUncoveredReplica: true
replicas:
- entities:
- flytesnacks
- entities:
- flyteexamples
- flytelab

# process the 'production' domain on a single instace and all other domains on another
type: domain
enableUncoveredReplica: true
replicas:
- entities:
- production
`,
PersistentPreRunE: initConfig,
Run: func(cmd *cobra.Command, args []string) {
executeRootCmd(propellerConfig.GetConfig(), managerConfig.GetConfig())
},
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
version.LogBuildInformation(appName)
logger.Infof(context.TODO(), "detected %d CPU's\n", runtime.NumCPU())
if err := rootCmd.Execute(); err != nil {
logger.Error(context.TODO(), err)
os.Exit(1)
}
}

func init() {
// allows `$ flytepropeller-manager --logtostderr` to work
klog.InitFlags(flag.CommandLine)
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
err := flag.CommandLine.Parse([]string{})
if err != nil {
logAndExit(err)
}

// Here you will define your flags and configuration settings. Cobra supports persistent flags, which, if defined
// here, will be global for your application.
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "",
"config file (default is $HOME/config.yaml)")

configAccessor.InitializePflags(rootCmd.PersistentFlags())

rootCmd.AddCommand(viper.GetConfigCommand())
}

func initConfig(cmd *cobra.Command, _ []string) error {
configAccessor = viper.NewAccessor(config.Options{
StrictMode: false,
SearchPaths: []string{cfgFile},
})

configAccessor.InitializePflags(cmd.PersistentFlags())

err := configAccessor.UpdateConfig(context.TODO())
if err != nil {
return err
}

return nil
}

func logAndExit(err error) {
logger.Error(context.Background(), err)
os.Exit(-1)
}

func executeRootCmd(propellerCfg *propellerConfig.Config, cfg *managerConfig.Config) {
baseCtx := context.Background()

// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(baseCtx)

// lookup owner reference
kubeClient, _, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
logger.Fatalf(ctx, "error building kubernetes clientset [%v]", err)
}

ownerReferences := make([]metav1.OwnerReference, 0)
lookupOwnerReferences := true
podName, found := os.LookupEnv(podNameEnvVar)
if !found {
lookupOwnerReferences = false
}

podNamespace, found := os.LookupEnv(podNamespaceEnvVar)
if !found {
lookupOwnerReferences = false
podNamespace = podDefaultNamespace
}

if lookupOwnerReferences {
p, err := kubeClient.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
logger.Fatalf(ctx, "failed to get pod '%v' in namespace '%v' [%v]", podName, podNamespace, err)
}

for _, ownerReference := range p.OwnerReferences {
// must set owner reference controller to false because k8s does not allow setting pod
// owner references to a controller that does not acknowledge ownership. in this case
// the owner is technically the FlytePropeller Manager pod and not that pods owner.
*ownerReference.BlockOwnerDeletion = false
*ownerReference.Controller = false

ownerReferences = append(ownerReferences, ownerReference)
}
}

// Add the propeller_manager subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
scope := promutils.NewScope(propellerCfg.MetricsPrefix).NewSubScope("propeller_manager")

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, propellerCfg.ProfilerPort.Port, nil)
if err != nil {
logger.Panicf(ctx, "failed to start profiling and metrics server [%v]", err)
}
}()

m, err := manager.New(ctx, propellerCfg, cfg, podNamespace, ownerReferences, kubeClient, scope)
if err != nil {
logger.Fatalf(ctx, "failed to start manager [%v]", err)
} else if m == nil {
logger.Fatalf(ctx, "failed to start manager, nil manager received")
}

if err = m.Run(ctx); err != nil {
logger.Fatalf(ctx, "error running manager [%v]", err)
}
}
9 changes: 9 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package main

import (
"github.com/flyteorg/flytepropeller/cmd/manager/cmd"
)

func main() {
cmd.Execute()
}
Loading