Skip to content

Commit

Permalink
Refactor for single binary (flyteorg#399)
Browse files Browse the repository at this point in the history
* default config for events

Signed-off-by: Ketan Umare <[email protected]>

* Refactor flytepropeller to allow external start

Signed-off-by: Ketan Umare <[email protected]>

* refactor lint fix

Signed-off-by: Ketan Umare <[email protected]>

* entrypoint

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Feb 15, 2022
1 parent 40adcd1 commit 83f5d32
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 245 deletions.
4 changes: 3 additions & 1 deletion flytepropeller/cmd/controller/cmd/init_certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
cryptorand "crypto/rand"

"github.com/flyteorg/flytepropeller/pkg/webhook"

webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config"

"github.com/flyteorg/flytestdlib/logger"
Expand Down Expand Up @@ -74,7 +76,7 @@ func init() {
}

func runCertsCmd(ctx context.Context, propellerCfg *config.Config, cfg *webhookConfig.Config) error {
podNamespace, found := os.LookupEnv(PodNamespaceEnvVar)
podNamespace, found := os.LookupEnv(webhook.PodNamespaceEnvVar)
if !found {
podNamespace = podDefaultNamespace
}
Expand Down
125 changes: 8 additions & 117 deletions flytepropeller/cmd/controller/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,28 @@
// Commands for FlytePropeller controller.
// Package cmd contains commands for FlytePropeller controller.
package cmd

import (
"context"
"flag"
"os"
"runtime"
"runtime/pprof"

"github.com/flyteorg/flytestdlib/contextutils"

transformers "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytestdlib/profutils"
"k8s.io/klog"

"sigs.k8s.io/controller-runtime/pkg/manager"

config2 "github.com/flyteorg/flytepropeller/pkg/controller/config"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flytestdlib/config/viper"
"github.com/flyteorg/flytestdlib/version"

"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/spf13/pflag"

"github.com/spf13/cobra"

clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions"
"github.com/flyteorg/flytepropeller/pkg/controller"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"
)

const (
Expand All @@ -55,8 +42,9 @@ var rootCmd = &cobra.Command{
Long: `Flyte Propeller runs a workflow to completion by recursing through the nodes,
handling their tasks to completion and propagating their status upstream.`,
PersistentPreRunE: initConfig,
Run: func(cmd *cobra.Command, args []string) {
executeRootCmd(config2.GetConfig())
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
return executeRootCmd(ctx, config2.GetConfig())
},
}

Expand Down Expand Up @@ -111,113 +99,16 @@ func logAndExit(err error) {
os.Exit(-1)
}

func sharedInformerOptions(cfg *config2.Config) []informers.SharedInformerOption {
selectors := []struct {
label string
operation v1.LabelSelectorOperator
values []string
}{
{transformers.ShardKeyLabel, v1.LabelSelectorOpIn, cfg.IncludeShardKeyLabel},
{transformers.ShardKeyLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeShardKeyLabel},
{transformers.ProjectLabel, v1.LabelSelectorOpIn, cfg.IncludeProjectLabel},
{transformers.ProjectLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeProjectLabel},
{transformers.DomainLabel, v1.LabelSelectorOpIn, cfg.IncludeDomainLabel},
{transformers.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel},
}

labelSelector := controller.IgnoreCompletedWorkflowsLabelSelector()
for _, selector := range selectors {
if len(selector.values) > 0 {
labelSelectorRequirement := v1.LabelSelectorRequirement{
Key: selector.label,
Operator: selector.operation,
Values: selector.values,
}

labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement)
}
}

opts := []informers.SharedInformerOption{
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.LabelSelector = v1.FormatLabelSelector(labelSelector)
}),
}

if cfg.LimitNamespace != defaultNamespace {
opts = append(opts, informers.WithNamespace(cfg.LimitNamespace))
}
return opts
}

func executeRootCmd(cfg *config2.Config) {
baseCtx := context.Background()

func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(baseCtx)

kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg)
if err != nil {
logger.Fatalf(ctx, "Error building kubernetes clientset: %s", err.Error())
}

flyteworkflowClient, err := clientset.NewForConfig(kubecfg)
if err != nil {
logger.Fatalf(ctx, "Error building example clientset: %s", err.Error())
}

opts := sharedInformerOptions(cfg)
flyteworkflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(flyteworkflowClient, cfg.WorkflowReEval.Duration, opts...)

// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace)

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, cfg.ProfilerPort.Port, nil)
if err != nil {
logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
logger.Fatalf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
}
}()

limitNamespace := ""
if cfg.LimitNamespace != defaultNamespace {
limitNamespace = cfg.LimitNamespace
}

mgr, err := manager.New(kubecfg, manager.Options{
Namespace: limitNamespace,
SyncPeriod: &cfg.DownstreamEval.Duration,
ClientBuilder: executors.NewFallbackClientBuilder(propellerScope.NewSubScope("kube")),
})
if err != nil {
logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err)
}

// Start controller runtime manager to start listening to resource changes.
// K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer
// EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect
// workflow changes faster than the default sync interval for workflow CRDs.
go func(ctx context.Context) {
ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager")
pprof.SetGoroutineLabels(ctx)
logger.Infof(ctx, "Starting controller-runtime manager")
err := mgr.Start(ctx)
if err != nil {
logger.Fatalf(ctx, "Failed to start manager. Error: %v", err)
}
}(ctx)

c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope)
if err != nil {
logger.Fatalf(ctx, "Failed to start Controller - [%v]", err.Error())
return
} else if c == nil {
logger.Fatalf(ctx, "Failed to start Controller, nil controller received.")
}

go flyteworkflowInformerFactory.Start(ctx.Done())

if err = c.Run(ctx); err != nil {
logger.Fatalf(ctx, "Error running controller: %s", err.Error())
}
return controller.StartController(ctx, cfg, defaultNamespace)
}
124 changes: 3 additions & 121 deletions flytepropeller/cmd/controller/cmd/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,18 @@ package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"

webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config"
"github.com/flyteorg/flytestdlib/profutils"

apiErrors "k8s.io/apimachinery/pkg/api/errors"

"k8s.io/client-go/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/signals"
"github.com/flyteorg/flytepropeller/pkg/utils"
"github.com/flyteorg/flytepropeller/pkg/webhook"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/spf13/cobra"
)

const (
PodNameEnvVar = "POD_NAME"
PodNamespaceEnvVar = "POD_NAMESPACE"
podDefaultNamespace = "default"
)

Expand Down Expand Up @@ -99,112 +82,11 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.SetupSignalHandler(origContext)

raw, err := json.Marshal(cfg)
if err != nil {
return err
}

fmt.Println(string(raw))

kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, propellerCfg)
if err != nil {
return err
}

// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace)
webhookScope := propellerScope.NewSubScope("webhook")

go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(ctx, propellerCfg.ProfilerPort.Port, nil)
if err != nil {
logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
}
}()

limitNamespace := ""
if propellerCfg.LimitNamespace != defaultNamespace {
limitNamespace = propellerCfg.LimitNamespace
}

secretsWebhook := webhook.NewPodMutator(cfg, webhookScope)

// Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created.
err = createMutationConfig(ctx, kubeClient, secretsWebhook)
if err != nil {
return err
}

mgr, err := manager.New(kubecfg, manager.Options{
Port: cfg.ListenPort,
CertDir: cfg.CertDir,
Namespace: limitNamespace,
SyncPeriod: &propellerCfg.DownstreamEval.Duration,
ClientBuilder: executors.NewFallbackClientBuilder(webhookScope),
})

if err != nil {
logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err)
}

err = secretsWebhook.Register(ctx, mgr)
if err != nil {
logger.Fatalf(ctx, "Failed to register webhook with manager. Error: %v", err)
}

logger.Infof(ctx, "Starting controller-runtime manager")
return mgr.Start(ctx)
}

func createMutationConfig(ctx context.Context, kubeClient *kubernetes.Clientset, webhookObj *webhook.PodMutator) error {
shouldAddOwnerRef := true
podName, found := os.LookupEnv(PodNameEnvVar)
if !found {
shouldAddOwnerRef = false
}

podNamespace, found := os.LookupEnv(PodNamespaceEnvVar)
if !found {
shouldAddOwnerRef = false
podNamespace = podDefaultNamespace
}

mutateConfig, err := webhookObj.CreateMutationWebhookConfiguration(podNamespace)
if err != nil {
return err
}

if shouldAddOwnerRef {
// Lookup the pod to retrieve its UID
p, err := kubeClient.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
logger.Infof(ctx, "Failed to get Pod [%v/%v]. Error: %v", podNamespace, podName, err)
return fmt.Errorf("failed to get pod. Error: %w", err)
}

mutateConfig.OwnerReferences = p.OwnerReferences
}

logger.Infof(ctx, "Creating MutatingWebhookConfiguration [%v/%v]", mutateConfig.GetNamespace(), mutateConfig.GetName())

_, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, mutateConfig, metav1.CreateOptions{})
var statusErr *apiErrors.StatusError
if err != nil && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonAlreadyExists {
logger.Infof(ctx, "Failed to create MutatingWebhookConfiguration. Will attempt to update. Error: %v", err)
obj, getErr := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(ctx, mutateConfig.Name, metav1.GetOptions{})
if getErr != nil {
logger.Infof(ctx, "Failed to get MutatingWebhookConfiguration. Error: %v", getErr)
return err
}

obj.Webhooks = mutateConfig.Webhooks
_, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(ctx, obj, metav1.UpdateOptions{})
if err == nil {
logger.Infof(ctx, "Successfully updated existing mutating webhook config.")
}

return err
}

return nil
return webhook.Run(ctx, propellerCfg, cfg, defaultNamespace)
}
3 changes: 2 additions & 1 deletion flytepropeller/events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ var (
defaultConfig = Config{
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
}

configSection = config.MustRegisterSection(configSectionKey, &defaultConfig)
)

// Retrieve current global config for storage.
// GetConfig Retrieves current global config for storage.
func GetConfig(ctx context.Context) *Config {
if c, ok := configSection.GetConfig().(*Config); ok {
return c
Expand Down
Loading

0 comments on commit 83f5d32

Please sign in to comment.