Scale out with propeller manager and workflow sharding (#351)
* added 'manager' command
* using go routine and timer for manager loop
* moved manager loop out of cmd and into pkg directory
* detecting missing replicas
* moved extracting replica from pod name to new function
* creating managed flytepropeller pods
* refactored configuration
* removed regex parsing for replica - checking for existence with fully qualified pod name
* mocked out shard strategy abstraction
* adding arguments to podspec for ConsistentHashingShardStrategy
* updated import naming
* moved manager to a top-level package
* added shard strategy to manager configuration
* setting shard key label selector on managed propeller instances
* fixed random lint issues
* split pod name generation into a separate function to ease future auto-scaler implementation
* cleaned up pod label selector
* delete pods on shutdown
* added prometheus metric reporting
* updated manager run loop to use k8s wait.UntilWithContext
* moved getKubeConfig into a shared package
* assigning shard and namespace labels on FlyteWorkflow
* implemented NamespaceShardStrategy
* fixed shard label
* added comments
* checking for existing pods on startup
* handling delete of non-existent pod
* changed ConsistentHashing name to Random - because that's what it really is
* implemented EnableUncoveredReplica configuration option
* added leader election to manager using existing propeller config
* fixed disabling of leader election in managed propeller pods
* removed listPods function
* added leader election to mitigate concurrent modification issues
* enabled pprof to profile resource metrics
* added 'manager' target to Makefile to start manager in development mode (similar to existing server)
* added shard strategy test for computing key ranges
* fixed key range computation
* implemented project and domain shard types
* returning error on out-of-range podIndex during UpdatePodSpec call on shard strategy
* fixed random lint issues
* added manager tests
* fixed lint issues
* added doc comments on exported types and functions
* exported ComputeKeyRange function and renamed addLabelSelector to addLabelSelectorIfExists to better reflect its functionality
* adding pod template resource version and shard config hash annotations to fuel automatic pod management on updates
* removed pod deletion on manager shutdown
* cleaned up unit tests and lint
* updated getContainer function to retrieve flytepropeller container from pod spec using container name instead of command
* removed addLabelSelectorIfExists function call
* changed bytes.Buffer from a var to declaring with new
* created a new shardstrategy package
* generating mocks for ShardStrategy to decouple manager package tests from shardstrategy package tests
* fixed lint issues
* changed shard configuration definitions and added support for wildcard id in EnvironmentShardStrategy
* updated documentation
* fixed lint issues
* setting managed pod owner references
* updated documentation
* fixed a few nits
* delete pods with failed state
* changed ShardType to int instead of string
* removed default values in manager config
* updated config_flags with pflags generation

Signed-off-by: Daniel Rammer <[email protected]>
hamersaw authored Dec 3, 2021
1 parent fa18267 commit c485750
Showing 27 changed files with 1,914 additions and 59 deletions.
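The commits above repeatedly reference a ShardStrategy abstraction (mocked out, later moved into a shardstrategy package, and exercised through UpdatePodSpec with a pod index). The sketch below is inferred only from those commit messages and is illustrative rather than the actual flytepropeller API; the interface shape, the evenStrategy type, and the flag it appends are all assumptions.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// ShardStrategy is an inferred sketch of the abstraction the manager consults
// when specializing each managed FlytePropeller replica (the real interface may differ).
type ShardStrategy interface {
	GetPodCount() int
	UpdatePodSpec(spec *v1.PodSpec, podIndex int) error
}

// evenStrategy is a toy implementation that tags the propeller container with its replica index.
type evenStrategy struct {
	podCount int
}

func (e evenStrategy) GetPodCount() int { return e.podCount }

func (e evenStrategy) UpdatePodSpec(spec *v1.PodSpec, podIndex int) error {
	if podIndex < 0 || podIndex >= e.podCount {
		return fmt.Errorf("podIndex %d out of range [0,%d)", podIndex, e.podCount)
	}
	// illustrative flag only, not an actual flytepropeller option
	spec.Containers[0].Args = append(spec.Containers[0].Args, fmt.Sprintf("--shard-index=%d", podIndex))
	return nil
}

func main() {
	var strategy ShardStrategy = evenStrategy{podCount: 3}
	spec := &v1.PodSpec{Containers: []v1.Container{{Name: "flytepropeller"}}}
	if err := strategy.UpdatePodSpec(spec, 1); err != nil {
		panic(err)
	}
	fmt.Println(spec.Containers[0].Args) // [--shard-index=1]
}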
8 changes: 8 additions & 0 deletions Makefile
@@ -12,18 +12,21 @@ update_boilerplate:
.PHONY: linux_compile
linux_compile:
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytepropeller ./cmd/controller/main.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/flytepropeller-manager ./cmd/manager/main.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /artifacts/kubectl-flyte ./cmd/kubectl-flyte/main.go

.PHONY: compile
compile:
mkdir -p ./bin
go build -o bin/flytepropeller ./cmd/controller/main.go
go build -o bin/flytepropeller-manager ./cmd/manager/main.go
go build -o bin/kubectl-flyte ./cmd/kubectl-flyte/main.go && cp bin/kubectl-flyte ${GOPATH}/bin

cross_compile:
@glide install
@mkdir -p ./bin/cross
GOOS=linux GOARCH=amd64 go build -o bin/cross/flytepropeller ./cmd/controller/main.go
GOOS=linux GOARCH=amd64 go build -o bin/cross/flytepropeller-manager ./cmd/manager/main.go
GOOS=linux GOARCH=amd64 go build -o bin/cross/kubectl-flyte ./cmd/kubectl-flyte/main.go

op_code_generate:
@@ -38,6 +41,11 @@ benchmark:
server:
@go run ./cmd/controller/main.go --alsologtostderr --propeller.kube-config=$(HOME)/.kube/config

# manager starts the manager service in development mode
.PHONY: manager
manager:
@go run ./cmd/manager/main.go --alsologtostderr --propeller.kube-config=$(HOME)/.kube/config

clean:
rm -rf bin

3 changes: 2 additions & 1 deletion cmd/controller/cmd/init_certs.go
@@ -11,6 +11,7 @@ import (
kubeErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/utils"

corev1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -84,7 +85,7 @@ func runCertsCmd(ctx context.Context, propellerCfg *config.Config, cfg *webhookC
return err
}

kubeClient, _, err := getKubeConfig(ctx, propellerCfg)
kubeClient, _, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
return err
}
58 changes: 27 additions & 31 deletions cmd/controller/cmd/root.go
@@ -11,6 +11,7 @@ import (

"github.com/flyteorg/flytestdlib/contextutils"

transformers "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"k8s.io/klog"

@@ -27,20 +28,15 @@ import (
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/pkg/errors"
"github.com/spf13/pflag"

"github.com/spf13/cobra"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

restclient "k8s.io/client-go/rest"

clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions"
"github.com/flyteorg/flytepropeller/pkg/controller"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"
)

const (
@@ -116,39 +112,39 @@ func logAndExit(err error) {
os.Exit(-1)
}

func getKubeConfig(_ context.Context, cfg *config2.Config) (*kubernetes.Clientset, *restclient.Config, error) {
var kubecfg *restclient.Config
var err error
if cfg.KubeConfigPath != "" {
kubeConfigPath := os.ExpandEnv(cfg.KubeConfigPath)
kubecfg, err = clientcmd.BuildConfigFromFlags(cfg.MasterURL, kubeConfigPath)
if err != nil {
return nil, nil, errors.Wrapf(err, "Error building kubeconfig")
}
} else {
kubecfg, err = restclient.InClusterConfig()
if err != nil {
return nil, nil, errors.Wrapf(err, "Cannot get InCluster kubeconfig")
}
func sharedInformerOptions(cfg *config2.Config) []informers.SharedInformerOption {
selectors := []struct {
label string
operation v1.LabelSelectorOperator
values []string
}{
{transformers.ShardKeyLabel, v1.LabelSelectorOpIn, cfg.IncludeShardKeyLabel},
{transformers.ShardKeyLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeShardKeyLabel},
{transformers.ProjectLabel, v1.LabelSelectorOpIn, cfg.IncludeProjectLabel},
{transformers.ProjectLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeProjectLabel},
{transformers.DomainLabel, v1.LabelSelectorOpIn, cfg.IncludeDomainLabel},
{transformers.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel},
}

kubecfg.QPS = cfg.KubeConfig.QPS
kubecfg.Burst = cfg.KubeConfig.Burst
kubecfg.Timeout = cfg.KubeConfig.Timeout.Duration
labelSelector := controller.IgnoreCompletedWorkflowsLabelSelector()
for _, selector := range selectors {
if len(selector.values) > 0 {
labelSelectorRequirement := v1.LabelSelectorRequirement{
Key: selector.label,
Operator: selector.operation,
Values: selector.values,
}

kubeClient, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return nil, nil, errors.Wrapf(err, "Error building kubernetes clientset")
labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement)
}
}
return kubeClient, kubecfg, err
}

func sharedInformerOptions(cfg *config2.Config) []informers.SharedInformerOption {
opts := []informers.SharedInformerOption{
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.LabelSelector = v1.FormatLabelSelector(controller.IgnoreCompletedWorkflowsLabelSelector())
options.LabelSelector = v1.FormatLabelSelector(labelSelector)
}),
}

if cfg.LimitNamespace != defaultNamespace {
opts = append(opts, informers.WithNamespace(cfg.LimitNamespace))
}
@@ -166,7 +162,7 @@ func executeRootCmd(cfg *config2.Config) {
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(baseCtx)

kubeClient, kubecfg, err := getKubeConfig(ctx, cfg)
kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg)
if err != nil {
logger.Fatalf(ctx, "Error building kubernetes clientset: %s", err.Error())
}
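The reworked sharedInformerOptions above appends In/NotIn label-selector requirements for the shard, project, and domain labels so that each propeller instance only watches the FlyteWorkflows assigned to it. A minimal, self-contained sketch of the same pattern follows; the "shard-key" label and the addRequirementIfSet helper are illustrative names, not the actual flytepropeller identifiers.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// addRequirementIfSet appends an In/NotIn requirement only when values were configured,
// mirroring how the include/exclude shard, project, and domain options are applied above.
func addRequirementIfSet(sel *metav1.LabelSelector, key string, op metav1.LabelSelectorOperator, values []string) {
	if len(values) > 0 {
		sel.MatchExpressions = append(sel.MatchExpressions, metav1.LabelSelectorRequirement{
			Key:      key,
			Operator: op,
			Values:   values,
		})
	}
}

func main() {
	selector := &metav1.LabelSelector{}
	addRequirementIfSet(selector, "shard-key", metav1.LabelSelectorOpIn, []string{"0", "1"})
	addRequirementIfSet(selector, "project", metav1.LabelSelectorOpNotIn, nil) // no values configured: no-op

	// Formats to something like: shard-key in (0,1)
	fmt.Println(metav1.FormatLabelSelector(selector))
}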
3 changes: 2 additions & 1 deletion cmd/controller/cmd/webhook.go
@@ -17,6 +17,7 @@ import (

"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"
"github.com/flyteorg/flytepropeller/pkg/webhook"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
@@ -105,7 +106,7 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w

fmt.Println(string(raw))

kubeClient, kubecfg, err := getKubeConfig(ctx, propellerCfg)
kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
return err
}
202 changes: 202 additions & 0 deletions cmd/manager/cmd/root.go
@@ -0,0 +1,202 @@
// Commands for FlytePropeller manager.
package cmd

import (
"context"
"flag"
"os"
"runtime"

"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/config/viper"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/version"

"github.com/flyteorg/flytepropeller/manager"
managerConfig "github.com/flyteorg/flytepropeller/manager/config"
propellerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)

const (
appName = "flytepropeller-manager"
podDefaultNamespace = "flyte"
podNameEnvVar = "POD_NAME"
podNamespaceEnvVar = "POD_NAMESPACE"
)

var (
cfgFile string
configAccessor = viper.NewAccessor(config.Options{StrictMode: true})
)

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: appName,
Short: "Runs FlytePropeller Manager to scale out FlytePropeller by executing multiple instances configured according to the defined sharding scheme.",
Long: `
FlytePropeller Manager is used to effectively scale out FlyteWorkflow processing among a collection of FlytePropeller instances. Users configure a sharding mechanism (e.g. 'hash', 'project', or 'domain') to define the sharding environment.
The FlytePropeller Manager uses a Kubernetes PodTemplate to construct the base FlytePropeller PodSpec. This means that, apart from the configured sharding scheme, all managed FlytePropeller instances will be identical.
The Manager ensures liveness and correctness by periodically scanning Kubernetes pods and recovering state (i.e. starting missing pods, etc.). Live configuration updates are currently unsupported, meaning configuration changes require an application restart.
Sample configuration, illustrating 3 separate sharding techniques, is provided below:
manager:
pod-application: "flytepropeller"
pod-namespace: "flyte"
pod-template-name: "flytepropeller-template"
pod-template-namespace: "flyte"
scan-interval: 10s
shard:
# distribute FlyteWorkflow processing over 3 machines evenly
type: hash
pod-count: 3
# process the specified projects on defined replicas and all uncovered projects on another
type: project
enableUncoveredReplica: true
replicas:
- entities:
- flytesnacks
- entities:
- flyteexamples
- flytelab
# process the 'production' domain on a single instance and all other domains on another
type: domain
enableUncoveredReplica: true
replicas:
- entities:
- production
`,
PersistentPreRunE: initConfig,
Run: func(cmd *cobra.Command, args []string) {
executeRootCmd(propellerConfig.GetConfig(), managerConfig.GetConfig())
},
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
version.LogBuildInformation(appName)
logger.Infof(context.TODO(), "detected %d CPUs\n", runtime.NumCPU())
if err := rootCmd.Execute(); err != nil {
logger.Error(context.TODO(), err)
os.Exit(1)
}
}

func init() {
// allows `$ flytepropeller-manager --logtostderr` to work
klog.InitFlags(flag.CommandLine)
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
err := flag.CommandLine.Parse([]string{})
if err != nil {
logAndExit(err)
}

// Here you will define your flags and configuration settings. Cobra supports persistent flags, which, if defined
// here, will be global for your application.
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "",
"config file (default is $HOME/config.yaml)")

configAccessor.InitializePflags(rootCmd.PersistentFlags())

rootCmd.AddCommand(viper.GetConfigCommand())
}

func initConfig(cmd *cobra.Command, _ []string) error {
configAccessor = viper.NewAccessor(config.Options{
StrictMode: false,
SearchPaths: []string{cfgFile},
})

configAccessor.InitializePflags(cmd.PersistentFlags())

err := configAccessor.UpdateConfig(context.TODO())
if err != nil {
return err
}

return nil
}

func logAndExit(err error) {
logger.Error(context.Background(), err)
os.Exit(-1)
}

func executeRootCmd(propellerCfg *propellerConfig.Config, cfg *managerConfig.Config) {
baseCtx := context.Background()

// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(baseCtx)

// lookup owner reference
kubeClient, _, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
logger.Fatalf(ctx, "error building kubernetes clientset [%v]", err)
}

ownerReferences := make([]metav1.OwnerReference, 0)
lookupOwnerReferences := true
podName, found := os.LookupEnv(podNameEnvVar)
if !found {
lookupOwnerReferences = false
}

podNamespace, found := os.LookupEnv(podNamespaceEnvVar)
if !found {
lookupOwnerReferences = false
podNamespace = podDefaultNamespace
}

if lookupOwnerReferences {
p, err := kubeClient.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
logger.Fatalf(ctx, "failed to get pod '%v' in namespace '%v' [%v]", podName, podNamespace, err)
}

for _, ownerReference := range p.OwnerReferences {
// must set owner reference controller to false because k8s does not allow setting pod
// owner references to a controller that does not acknowledge ownership. in this case
// the owner is technically the FlytePropeller Manager pod and not that pod's owner.
*ownerReference.BlockOwnerDeletion = false
*ownerReference.Controller = false

ownerReferences = append(ownerReferences, ownerReference)
}
}

// Add the propeller_manager subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
scope := promutils.NewScope(propellerCfg.MetricsPrefix).NewSubScope("propeller_manager")

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, propellerCfg.ProfilerPort.Port, nil)
if err != nil {
logger.Panicf(ctx, "failed to start profiling and metrics server [%v]", err)
}
}()

m, err := manager.New(ctx, propellerCfg, cfg, podNamespace, ownerReferences, kubeClient, scope)
if err != nil {
logger.Fatalf(ctx, "failed to start manager [%v]", err)
} else if m == nil {
logger.Fatalf(ctx, "failed to start manager, nil manager received")
}

if err = m.Run(ctx); err != nil {
logger.Fatalf(ctx, "error running manager [%v]", err)
}
}
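The 'hash' shard type described in the help text above splits a fixed shard-key space evenly across the configured pod count, and the commit list mentions an exported ComputeKeyRange helper plus an out-of-range podIndex check. The sketch below illustrates one way such an even split can be computed; the names and the contiguous integer key space are assumptions, not the actual shardstrategy API.

package main

import "fmt"

// keyRange is a hypothetical stand-in for the shard strategy's key range type.
type keyRange struct {
	start int // inclusive
	end   int // exclusive
}

// computeKeyRange splits keyspaceSize keys over podCount replicas so that the
// range sizes differ by at most one, returning an error for an out-of-range index.
func computeKeyRange(keyspaceSize, podCount, podIndex int) (keyRange, error) {
	if podIndex < 0 || podIndex >= podCount {
		return keyRange{}, fmt.Errorf("podIndex %d out of range [0,%d)", podIndex, podCount)
	}
	base := keyspaceSize / podCount
	remainder := keyspaceSize % podCount
	start := podIndex*base + min(podIndex, remainder)
	size := base
	if podIndex < remainder {
		size++
	}
	return keyRange{start: start, end: start + size}, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	// 32 shard keys over 3 replicas: [0,11) [11,22) [22,32)
	for i := 0; i < 3; i++ {
		r, _ := computeKeyRange(32, 3, i)
		fmt.Printf("pod %d -> keys [%d,%d)\n", i, r.start, r.end)
	}
}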
9 changes: 9 additions & 0 deletions cmd/manager/main.go
@@ -0,0 +1,9 @@
package main

import (
"github.com/flyteorg/flytepropeller/cmd/manager/cmd"
)

func main() {
cmd.Execute()
}
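executeRootCmd in the manager only copies owner references when it can resolve its own pod through the POD_NAME and POD_NAMESPACE environment variables. In a typical deployment those are injected with the Kubernetes downward API; the snippet below is an illustrative sketch of that wiring using the standard corev1 types, not something added by this commit.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Downward API wiring so the manager can look up its own Pod and copy its
	// owner references onto the FlytePropeller replicas it creates.
	env := []corev1.EnvVar{
		{Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{
			FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}},
		{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{
			FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
	}

	for _, e := range env {
		fmt.Printf("%s <- fieldRef %s\n", e.Name, e.ValueFrom.FieldRef.FieldPath)
	}
}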
