diff --git a/flytepropeller/cmd/controller/cmd/init_certs.go b/flytepropeller/cmd/controller/cmd/init_certs.go index 9e2167729c..8848176d0e 100644 --- a/flytepropeller/cmd/controller/cmd/init_certs.go +++ b/flytepropeller/cmd/controller/cmd/init_certs.go @@ -5,6 +5,8 @@ import ( "context" cryptorand "crypto/rand" + "github.com/flyteorg/flytepropeller/pkg/webhook" + webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config" "github.com/flyteorg/flytestdlib/logger" @@ -74,7 +76,7 @@ func init() { } func runCertsCmd(ctx context.Context, propellerCfg *config.Config, cfg *webhookConfig.Config) error { - podNamespace, found := os.LookupEnv(PodNamespaceEnvVar) + podNamespace, found := os.LookupEnv(webhook.PodNamespaceEnvVar) if !found { podNamespace = podDefaultNamespace } diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index ed6c159c07..cb83526d4b 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -1,4 +1,4 @@ -// Commands for FlytePropeller controller. +// Package cmd contains commands for FlytePropeller controller. package cmd import ( @@ -6,36 +6,23 @@ import ( "flag" "os" "runtime" - "runtime/pprof" - "github.com/flyteorg/flytestdlib/contextutils" - - transformers "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" - "github.com/flyteorg/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flytestdlib/profutils" "k8s.io/klog" - "sigs.k8s.io/controller-runtime/pkg/manager" - config2 "github.com/flyteorg/flytepropeller/pkg/controller/config" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/flyteorg/flytestdlib/config/viper" "github.com/flyteorg/flytestdlib/version" "github.com/flyteorg/flytestdlib/config" "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/profutils" - "github.com/flyteorg/flytestdlib/promutils" "github.com/spf13/pflag" "github.com/spf13/cobra" - clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned" - informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions" "github.com/flyteorg/flytepropeller/pkg/controller" "github.com/flyteorg/flytepropeller/pkg/signals" - "github.com/flyteorg/flytepropeller/pkg/utils" ) const ( @@ -55,8 +42,9 @@ var rootCmd = &cobra.Command{ Long: `Flyte Propeller runs a workflow to completion by recursing through the nodes, handling their tasks to completion and propagating their status upstream.`, PersistentPreRunE: initConfig, - Run: func(cmd *cobra.Command, args []string) { - executeRootCmd(config2.GetConfig()) + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + return executeRootCmd(ctx, config2.GetConfig()) }, } @@ -111,113 +99,16 @@ func logAndExit(err error) { os.Exit(-1) } -func sharedInformerOptions(cfg *config2.Config) []informers.SharedInformerOption { - selectors := []struct { - label string - operation v1.LabelSelectorOperator - values []string - }{ - {transformers.ShardKeyLabel, v1.LabelSelectorOpIn, cfg.IncludeShardKeyLabel}, - {transformers.ShardKeyLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeShardKeyLabel}, - {transformers.ProjectLabel, v1.LabelSelectorOpIn, cfg.IncludeProjectLabel}, - {transformers.ProjectLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeProjectLabel}, - {transformers.DomainLabel, v1.LabelSelectorOpIn, cfg.IncludeDomainLabel}, - {transformers.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel}, - } - - labelSelector := controller.IgnoreCompletedWorkflowsLabelSelector() - for _, selector := range selectors { - if len(selector.values) > 0 { - labelSelectorRequirement := v1.LabelSelectorRequirement{ - Key: selector.label, - Operator: selector.operation, - Values: selector.values, - } - - labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement) - } - } - - opts := []informers.SharedInformerOption{ - informers.WithTweakListOptions(func(options *v1.ListOptions) { - options.LabelSelector = v1.FormatLabelSelector(labelSelector) - }), - } - - if cfg.LimitNamespace != defaultNamespace { - opts = append(opts, informers.WithNamespace(cfg.LimitNamespace)) - } - return opts -} - -func executeRootCmd(cfg *config2.Config) { - baseCtx := context.Background() - +func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { // set up signals so we handle the first shutdown signal gracefully ctx := signals.SetupSignalHandler(baseCtx) - kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg) - if err != nil { - logger.Fatalf(ctx, "Error building kubernetes clientset: %s", err.Error()) - } - - flyteworkflowClient, err := clientset.NewForConfig(kubecfg) - if err != nil { - logger.Fatalf(ctx, "Error building example clientset: %s", err.Error()) - } - - opts := sharedInformerOptions(cfg) - flyteworkflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(flyteworkflowClient, cfg.WorkflowReEval.Duration, opts...) - - // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. - propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace) - go func() { err := profutils.StartProfilingServerWithDefaultHandlers(ctx, cfg.ProfilerPort.Port, nil) if err != nil { - logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err) + logger.Fatalf(ctx, "Failed to Start profiling and metrics server. Error: %v", err) } }() - limitNamespace := "" - if cfg.LimitNamespace != defaultNamespace { - limitNamespace = cfg.LimitNamespace - } - - mgr, err := manager.New(kubecfg, manager.Options{ - Namespace: limitNamespace, - SyncPeriod: &cfg.DownstreamEval.Duration, - ClientBuilder: executors.NewFallbackClientBuilder(propellerScope.NewSubScope("kube")), - }) - if err != nil { - logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err) - } - - // Start controller runtime manager to start listening to resource changes. - // K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer - // EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect - // workflow changes faster than the default sync interval for workflow CRDs. - go func(ctx context.Context) { - ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager") - pprof.SetGoroutineLabels(ctx) - logger.Infof(ctx, "Starting controller-runtime manager") - err := mgr.Start(ctx) - if err != nil { - logger.Fatalf(ctx, "Failed to start manager. Error: %v", err) - } - }(ctx) - - c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope) - if err != nil { - logger.Fatalf(ctx, "Failed to start Controller - [%v]", err.Error()) - return - } else if c == nil { - logger.Fatalf(ctx, "Failed to start Controller, nil controller received.") - } - - go flyteworkflowInformerFactory.Start(ctx.Done()) - - if err = c.Run(ctx); err != nil { - logger.Fatalf(ctx, "Error running controller: %s", err.Error()) - } + return controller.StartController(ctx, cfg, defaultNamespace) } diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index eb586690a4..40d94cab99 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -2,35 +2,18 @@ package cmd import ( "context" - "encoding/json" - "errors" - "fmt" - "os" webhookConfig "github.com/flyteorg/flytepropeller/pkg/webhook/config" + "github.com/flyteorg/flytestdlib/profutils" - apiErrors "k8s.io/apimachinery/pkg/api/errors" - - "k8s.io/client-go/kubernetes" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/flyteorg/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/signals" - "github.com/flyteorg/flytepropeller/pkg/utils" "github.com/flyteorg/flytepropeller/pkg/webhook" "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/profutils" - "github.com/flyteorg/flytestdlib/promutils" - "sigs.k8s.io/controller-runtime/pkg/manager" - - "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/spf13/cobra" ) const ( - PodNameEnvVar = "POD_NAME" - PodNamespaceEnvVar = "POD_NAMESPACE" podDefaultNamespace = "default" ) @@ -99,112 +82,11 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w // set up signals so we handle the first shutdown signal gracefully ctx := signals.SetupSignalHandler(origContext) - raw, err := json.Marshal(cfg) - if err != nil { - return err - } - - fmt.Println(string(raw)) - - kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, propellerCfg) - if err != nil { - return err - } - - // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. - propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace) - webhookScope := propellerScope.NewSubScope("webhook") - go func() { err := profutils.StartProfilingServerWithDefaultHandlers(ctx, propellerCfg.ProfilerPort.Port, nil) if err != nil { logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err) } }() - - limitNamespace := "" - if propellerCfg.LimitNamespace != defaultNamespace { - limitNamespace = propellerCfg.LimitNamespace - } - - secretsWebhook := webhook.NewPodMutator(cfg, webhookScope) - - // Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created. - err = createMutationConfig(ctx, kubeClient, secretsWebhook) - if err != nil { - return err - } - - mgr, err := manager.New(kubecfg, manager.Options{ - Port: cfg.ListenPort, - CertDir: cfg.CertDir, - Namespace: limitNamespace, - SyncPeriod: &propellerCfg.DownstreamEval.Duration, - ClientBuilder: executors.NewFallbackClientBuilder(webhookScope), - }) - - if err != nil { - logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err) - } - - err = secretsWebhook.Register(ctx, mgr) - if err != nil { - logger.Fatalf(ctx, "Failed to register webhook with manager. Error: %v", err) - } - - logger.Infof(ctx, "Starting controller-runtime manager") - return mgr.Start(ctx) -} - -func createMutationConfig(ctx context.Context, kubeClient *kubernetes.Clientset, webhookObj *webhook.PodMutator) error { - shouldAddOwnerRef := true - podName, found := os.LookupEnv(PodNameEnvVar) - if !found { - shouldAddOwnerRef = false - } - - podNamespace, found := os.LookupEnv(PodNamespaceEnvVar) - if !found { - shouldAddOwnerRef = false - podNamespace = podDefaultNamespace - } - - mutateConfig, err := webhookObj.CreateMutationWebhookConfiguration(podNamespace) - if err != nil { - return err - } - - if shouldAddOwnerRef { - // Lookup the pod to retrieve its UID - p, err := kubeClient.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{}) - if err != nil { - logger.Infof(ctx, "Failed to get Pod [%v/%v]. Error: %v", podNamespace, podName, err) - return fmt.Errorf("failed to get pod. Error: %w", err) - } - - mutateConfig.OwnerReferences = p.OwnerReferences - } - - logger.Infof(ctx, "Creating MutatingWebhookConfiguration [%v/%v]", mutateConfig.GetNamespace(), mutateConfig.GetName()) - - _, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, mutateConfig, metav1.CreateOptions{}) - var statusErr *apiErrors.StatusError - if err != nil && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonAlreadyExists { - logger.Infof(ctx, "Failed to create MutatingWebhookConfiguration. Will attempt to update. Error: %v", err) - obj, getErr := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(ctx, mutateConfig.Name, metav1.GetOptions{}) - if getErr != nil { - logger.Infof(ctx, "Failed to get MutatingWebhookConfiguration. Error: %v", getErr) - return err - } - - obj.Webhooks = mutateConfig.Webhooks - _, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(ctx, obj, metav1.UpdateOptions{}) - if err == nil { - logger.Infof(ctx, "Successfully updated existing mutating webhook config.") - } - - return err - } - - return nil + return webhook.Run(ctx, propellerCfg, cfg, defaultNamespace) } diff --git a/flytepropeller/events/config.go b/flytepropeller/events/config.go index 5b0213c4a7..48625b57e1 100644 --- a/flytepropeller/events/config.go +++ b/flytepropeller/events/config.go @@ -31,12 +31,13 @@ var ( defaultConfig = Config{ Rate: int64(500), Capacity: 1000, + Type: EventSinkAdmin, } configSection = config.MustRegisterSection(configSectionKey, &defaultConfig) ) -// Retrieve current global config for storage. +// GetConfig Retrieves current global config for storage. func GetConfig(ctx context.Context) *Config { if c, ok := configSection.GetConfig().(*Config); ok { return c diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index a6a682e27e..ec7648de02 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -8,6 +8,10 @@ import ( "runtime/pprof" "time" + "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/manager" + "google.golang.org/grpc" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" @@ -76,7 +80,7 @@ type Controller struct { levelMonitor *ResourceLevelMonitor } -// Runs either as a leader -if configured- or as a standalone process. +// Run either as a leader -if configured- or as a standalone process. func (c *Controller) Run(ctx context.Context) error { if c.leaderElector == nil { logger.Infof(ctx, "Running without leader election.") @@ -189,9 +193,9 @@ func (c *Controller) getWorkflowUpdatesHandler() cache.ResourceEventHandler { } } -// This object is responsible for emitting metrics that show the current number of Flyte workflows, cut by project and domain. -// It needs to be kicked off. The periodicity is not currently configurable because it seems unnecessary. It will also -// a timer measuring how long it takes to run each measurement cycle. +// ResourceLevelMonitor is responsible for emitting metrics that show the current number of Flyte workflows, +// by project and domain. It needs to be kicked off. The periodicity is not currently configurable because it seems +// unnecessary. It will also a timer measuring how long it takes to run each measurement cycle. type ResourceLevelMonitor struct { Scope promutils.Scope @@ -297,7 +301,7 @@ func getAdminClient(ctx context.Context) (client service.AdminServiceClient, opt return clients.AdminClient(), clients.AuthOpt(), nil } -// NewController returns a new FlyteWorkflow controller +// New returns a new FlyteWorkflow controller func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) { @@ -422,3 +426,108 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter flyteworkflowInformer.Informer().AddEventHandler(controller.getWorkflowUpdatesHandler()) return controller, nil } + +// SharedInformerOptions creates informer options to work with FlytePropeller Sharding +func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption { + selectors := []struct { + label string + operation v1.LabelSelectorOperator + values []string + }{ + {k8s.ShardKeyLabel, v1.LabelSelectorOpIn, cfg.IncludeShardKeyLabel}, + {k8s.ShardKeyLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeShardKeyLabel}, + {k8s.ProjectLabel, v1.LabelSelectorOpIn, cfg.IncludeProjectLabel}, + {k8s.ProjectLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeProjectLabel}, + {k8s.DomainLabel, v1.LabelSelectorOpIn, cfg.IncludeDomainLabel}, + {k8s.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel}, + } + + labelSelector := IgnoreCompletedWorkflowsLabelSelector() + for _, selector := range selectors { + if len(selector.values) > 0 { + labelSelectorRequirement := v1.LabelSelectorRequirement{ + Key: selector.label, + Operator: selector.operation, + Values: selector.values, + } + + labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement) + } + } + + opts := []informers.SharedInformerOption{ + informers.WithTweakListOptions(func(options *v1.ListOptions) { + options.LabelSelector = v1.FormatLabelSelector(labelSelector) + }), + } + + if cfg.LimitNamespace != defaultNamespace { + opts = append(opts, informers.WithNamespace(cfg.LimitNamespace)) + } + return opts +} + +// StartController creates a new FlytePropeller Controller and starts it +func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string) error { + // Setup cancel on the context + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg) + if err != nil { + return errors.Wrapf(err, "error building Kubernetes Clientset") + } + + flyteworkflowClient, err := clientset.NewForConfig(kubecfg) + if err != nil { + return errors.Wrapf(err, "error building FlyteWorkflow clientset") + } + + opts := SharedInformerOptions(cfg, defaultNamespace) + flyteworkflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(flyteworkflowClient, cfg.WorkflowReEval.Duration, opts...) + + // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. + propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace) + + limitNamespace := "" + if cfg.LimitNamespace != defaultNamespace { + limitNamespace = cfg.LimitNamespace + } + + mgr, err := manager.New(kubecfg, manager.Options{ + Namespace: limitNamespace, + SyncPeriod: &cfg.DownstreamEval.Duration, + ClientBuilder: executors.NewFallbackClientBuilder(propellerScope.NewSubScope("kube")), + }) + if err != nil { + return errors.Wrapf(err, "failed to initialize controller-runtime manager") + } + + // Start controller runtime manager to start listening to resource changes. + // K8sPluginManager uses controller runtime to create informers for the CRDs being monitored by plugins. The informer + // EventHandler enqueues the owner workflow for reevaluation. These informer events allow propeller to detect + // workflow changes faster than the default sync interval for workflow CRDs. + go func(ctx context.Context) { + ctx = contextutils.WithGoroutineLabel(ctx, "controller-runtime-manager") + pprof.SetGoroutineLabels(ctx) + logger.Infof(ctx, "Starting controller-runtime manager") + err := mgr.Start(ctx) + if err != nil { + logger.Fatalf(ctx, "Failed to start manager. Error: %v", err) + } + }(ctx) + + c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope) + if err != nil { + return errors.Wrap(err, "failed to start FlytePropeller") + } else if c == nil { + return errors.Errorf("Failed to create a new instance of FlytePropeller") + } + + go flyteworkflowInformerFactory.Start(ctx.Done()) + + if err = c.Run(ctx); err != nil { + return errors.Wrapf(err, "Error running FlytePropeller.") + } + return nil +} diff --git a/flytepropeller/pkg/webhook/entrypoint.go b/flytepropeller/pkg/webhook/entrypoint.go new file mode 100644 index 0000000000..1f0828c8bb --- /dev/null +++ b/flytepropeller/pkg/webhook/entrypoint.go @@ -0,0 +1,129 @@ +package webhook + +import ( + "context" + "encoding/json" + errors2 "errors" + "fmt" + "os" + + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/executors" + "github.com/flyteorg/flytepropeller/pkg/utils" + config2 "github.com/flyteorg/flytepropeller/pkg/webhook/config" + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" + "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + PodNameEnvVar = "POD_NAME" + PodNamespaceEnvVar = "POD_NAMESPACE" +) + +func Run(ctx context.Context, propellerCfg *config.Config, cfg *config2.Config, defaultNamespace string) error { + raw, err := json.Marshal(cfg) + if err != nil { + return err + } + + fmt.Println(string(raw)) + + kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, propellerCfg) + if err != nil { + return err + } + + // Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics. + propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(propellerCfg.LimitNamespace) + webhookScope := propellerScope.NewSubScope("webhook") + + limitNamespace := "" + if propellerCfg.LimitNamespace != defaultNamespace { + limitNamespace = propellerCfg.LimitNamespace + } + + secretsWebhook := NewPodMutator(cfg, webhookScope) + + // Creates a MutationConfig to instruct ApiServer to call this service whenever a Pod is being created. + err = createMutationConfig(ctx, kubeClient, secretsWebhook, defaultNamespace) + if err != nil { + return err + } + + mgr, err := manager.New(kubecfg, manager.Options{ + Port: cfg.ListenPort, + CertDir: cfg.CertDir, + Namespace: limitNamespace, + SyncPeriod: &propellerCfg.DownstreamEval.Duration, + ClientBuilder: executors.NewFallbackClientBuilder(webhookScope), + }) + + if err != nil { + logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err) + } + + err = secretsWebhook.Register(ctx, mgr) + if err != nil { + logger.Fatalf(ctx, "Failed to register webhook with manager. Error: %v", err) + } + + logger.Infof(ctx, "Starting controller-runtime manager") + return mgr.Start(ctx) +} + +func createMutationConfig(ctx context.Context, kubeClient *kubernetes.Clientset, webhookObj *PodMutator, defaultNamespace string) error { + shouldAddOwnerRef := true + podName, found := os.LookupEnv(PodNameEnvVar) + if !found { + shouldAddOwnerRef = false + } + + podNamespace, found := os.LookupEnv(PodNamespaceEnvVar) + if !found { + shouldAddOwnerRef = false + podNamespace = defaultNamespace + } + + mutateConfig, err := webhookObj.CreateMutationWebhookConfiguration(podNamespace) + if err != nil { + return err + } + + if shouldAddOwnerRef { + // Lookup the pod to retrieve its UID + p, err := kubeClient.CoreV1().Pods(podNamespace).Get(ctx, podName, v1.GetOptions{}) + if err != nil { + logger.Infof(ctx, "Failed to get Pod [%v/%v]. Error: %v", podNamespace, podName, err) + return fmt.Errorf("failed to get pod. Error: %w", err) + } + + mutateConfig.OwnerReferences = p.OwnerReferences + } + + logger.Infof(ctx, "Creating MutatingWebhookConfiguration [%v/%v]", mutateConfig.GetNamespace(), mutateConfig.GetName()) + + _, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, mutateConfig, v1.CreateOptions{}) + var statusErr *errors.StatusError + if err != nil && errors2.As(err, &statusErr) && statusErr.Status().Reason == v1.StatusReasonAlreadyExists { + logger.Infof(ctx, "Failed to create MutatingWebhookConfiguration. Will attempt to update. Error: %v", err) + obj, getErr := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(ctx, mutateConfig.Name, v1.GetOptions{}) + if getErr != nil { + logger.Infof(ctx, "Failed to get MutatingWebhookConfiguration. Error: %v", getErr) + return err + } + + obj.Webhooks = mutateConfig.Webhooks + _, err = kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(ctx, obj, v1.UpdateOptions{}) + if err == nil { + logger.Infof(ctx, "Successfully updated existing mutating webhook config.") + } + + return err + } + + return nil +}