From 01218d2636a30c05b3daf362c335f2b21b793b49 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 30 Mar 2023 12:21:50 -0500 Subject: [PATCH 1/3] persisting k8s plugin state between evaluations (#540) * persisting k8s plugin state between evaluations Signed-off-by: Daniel Rammer * fixed unit tests and linter Signed-off-by: Daniel Rammer * added docs Signed-off-by: Daniel Rammer * updating flyteplugins dep Signed-off-by: Daniel Rammer * added unit tests Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 +- .../nodes/task/k8s/plugin_context.go | 35 +++- .../nodes/task/k8s/plugin_manager.go | 50 ++++-- .../nodes/task/k8s/plugin_manager_test.go | 152 ++++++++++++++++++ 5 files changed, 228 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index a2e9a134b..7a5caddbf 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 github.com/flyteorg/flyteidl v1.3.14 - github.com/flyteorg/flyteplugins v1.0.43 + github.com/flyteorg/flyteplugins v1.0.44 github.com/flyteorg/flytestdlib v1.0.15 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 92e4c9d37..6b4e8cba5 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.43 h1:uI/Y88xqJKfvfuxfu0Sw9CNZ7iu3+HUwwRhxh558cbs= -github.com/flyteorg/flyteplugins v1.0.43/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= +github.com/flyteorg/flyteplugins v1.0.44 h1:uKizng+i0vfXslyPBlrsfecInhvy71fTB4kRg7eiifE= +github.com/flyteorg/flyteplugins 
v1.0.44/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/pkg/controller/nodes/task/k8s/plugin_context.go b/pkg/controller/nodes/task/k8s/plugin_context.go index cb90edfb3..aed5bc468 100644 --- a/pkg/controller/nodes/task/k8s/plugin_context.go +++ b/pkg/controller/nodes/task/k8s/plugin_context.go @@ -2,6 +2,7 @@ package k8s import ( "context" + "fmt" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" @@ -15,7 +16,8 @@ var _ k8s.PluginContext = &pluginContext{} type pluginContext struct { pluginsCore.TaskExecutionContext // Lazily creates a buffered outputWriter, overriding the input outputWriter. - ow *ioutils.BufferedOutputWriter + ow *ioutils.BufferedOutputWriter + k8sPluginState *k8s.PluginState } // Provides an output sync of type io.OutputWriter @@ -26,9 +28,38 @@ func (p *pluginContext) OutputWriter() io.OutputWriter { return buf } -func newPluginContext(tCtx pluginsCore.TaskExecutionContext) *pluginContext { +// pluginStateReader overrides the default PluginStateReader to return a pre-assigned PluginState. This allows us to +// encapsulate plugin state persistence in the existing k8s PluginManager and only expose the ability to read the +// previous Phase, PhaseVersion, and Reason for all k8s plugins. 
+type pluginStateReader struct { + k8sPluginState *k8s.PluginState +} + +func (p pluginStateReader) GetStateVersion() uint8 { + return 0 +} + +func (p pluginStateReader) Get(t interface{}) (stateVersion uint8, err error) { + if pointer, ok := t.(*k8s.PluginState); ok { + *pointer = *p.k8sPluginState + } else { + return 0, fmt.Errorf("unexpected type when reading plugin state") + } + + return 0, nil +} + +// PluginStateReader overrides the default behavior to return our k8s plugin specific reader. +func (p *pluginContext) PluginStateReader() pluginsCore.PluginStateReader { + return pluginStateReader{ + k8sPluginState: p.k8sPluginState, + } +} + +func newPluginContext(tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) *pluginContext { return &pluginContext{ TaskExecutionContext: tCtx, ow: nil, + k8sPluginState: k8sPluginState, } } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index e0d786858..67b0356a3 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -59,7 +59,8 @@ const ( ) type PluginState struct { - Phase PluginPhase + Phase PluginPhase + K8sPluginState k8s.PluginState } type PluginMetrics struct { @@ -247,7 +248,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "task submitted to K8s")), nil } -func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { +func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) { o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) if err != nil { @@ -274,7 +275,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, 
tCtx pluginsCore e.metrics.ResourceDeleted.Inc(ctx) } - pCtx := newPluginContext(tCtx) + pCtx := newPluginContext(tCtx, k8sPluginState) p, err := e.plugin.GetTaskPhase(ctx, pCtx, o) if err != nil { logger.Warnf(ctx, "failed to check status of resource in plugin [%s], with error: %s", e.GetID(), err.Error()) @@ -311,6 +312,7 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore } func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { + // read phase state ps := PluginState{} if v, err := tCtx.PluginStateReader().Get(&ps); err != nil { if v != pluginStateVersion { @@ -318,16 +320,44 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio } return pluginsCore.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state") } + + // evaluate plugin + var err error + var transition pluginsCore.Transition + pluginPhase := ps.Phase if ps.Phase == PluginPhaseNotStarted { - t, err := e.LaunchResource(ctx, tCtx) - if err == nil && t.Info().Phase() == pluginsCore.PhaseQueued { - if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &PluginState{Phase: PluginPhaseStarted}); err != nil { - return pluginsCore.UnknownTransition, err - } + transition, err = e.LaunchResource(ctx, tCtx) + if err == nil && transition.Info().Phase() == pluginsCore.PhaseQueued { + pluginPhase = PluginPhaseStarted } - return t, err + } else { + transition, err = e.CheckResourcePhase(ctx, tCtx, &ps.K8sPluginState) + } + + if err != nil { + return transition, err } - return e.CheckResourcePhase(ctx, tCtx) + + // persist any changes in phase state + k8sPluginState := ps.K8sPluginState + if ps.Phase != pluginPhase || k8sPluginState.Phase != transition.Info().Phase() || + k8sPluginState.PhaseVersion != transition.Info().Version() || k8sPluginState.Reason != transition.Info().Reason() { + + newPluginState := PluginState{ + Phase: 
pluginPhase, + K8sPluginState: k8s.PluginState{ + Phase: transition.Info().Phase(), + PhaseVersion: transition.Info().Version(), + Reason: transition.Info().Reason(), + }, + } + + if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &newPluginState); err != nil { + return pluginsCore.UnknownTransition, err + } + } + + return transition, err } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 160dc335f..94b6b5524 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "testing" "k8s.io/client-go/kubernetes/scheme" @@ -715,6 +716,157 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { } } +func TestPluginManager_Handle_PluginState(t *testing.T) { + ctx := context.TODO() + tm := getMockTaskExecutionMetadata() + res := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: tm.GetTaskExecutionID().GetGeneratedName(), + Namespace: tm.GetNamespace(), + }, + } + + pluginStateQueued := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseQueued, + PhaseVersion: 0, + Reason: "foo", + }, + } + pluginStateQueuedVersion1 := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseQueued, + PhaseVersion: 1, + Reason: "foo", + }, + } + pluginStateQueuedReasonBar := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseQueued, + PhaseVersion: 0, + Reason: "bar", + }, + } + pluginStateRunning := PluginState{ + Phase: PluginPhaseStarted, + K8sPluginState: k8s.PluginState{ + Phase: pluginsCore.PhaseRunning, + PhaseVersion: 0, + Reason: "", + }, + } + + phaseInfoQueued := 
pluginsCore.PhaseInfoQueuedWithTaskInfo(pluginStateQueued.K8sPluginState.PhaseVersion, pluginStateQueued.K8sPluginState.Reason, nil) + phaseInfoQueuedVersion1 := pluginsCore.PhaseInfoQueuedWithTaskInfo( + pluginStateQueuedVersion1.K8sPluginState.PhaseVersion, + pluginStateQueuedVersion1.K8sPluginState.Reason, + nil, + ) + phaseInfoQueuedReasonBar := pluginsCore.PhaseInfoQueuedWithTaskInfo( + pluginStateQueuedReasonBar.K8sPluginState.PhaseVersion, + pluginStateQueuedReasonBar.K8sPluginState.Reason, + nil, + ) + phaseInfoRunning := pluginsCore.PhaseInfoRunning(0, nil) + + tests := []struct { + name string + startPluginState PluginState + reportedPhaseInfo pluginsCore.PhaseInfo + expectedPluginState PluginState + }{ + { + "NoChange", + pluginStateQueued, + phaseInfoQueued, + pluginStateQueued, + }, + { + "K8sPhaseChange", + pluginStateQueued, + phaseInfoRunning, + pluginStateRunning, + }, + { + "PhaseVersionChange", + pluginStateQueued, + phaseInfoQueuedVersion1, + pluginStateQueuedVersion1, + }, + { + "ReasonChange", + pluginStateQueued, + phaseInfoQueuedReasonBar, + pluginStateQueuedReasonBar, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // mock TaskExecutionContext + tCtx := &pluginsCoreMock.TaskExecutionContext{} + tCtx.OnTaskExecutionMetadata().Return(getMockTaskExecutionMetadata()) + + tReader := &pluginsCoreMock.TaskReader{} + tReader.OnReadMatch(mock.Anything).Return(&core.TaskTemplate{}, nil) + tCtx.OnTaskReader().Return(tReader) + + // mock state reader / writer to use local pluginState variable + pluginState := &tt.startPluginState + customStateReader := &pluginsCoreMock.PluginStateReader{} + customStateReader.OnGetMatch(mock.MatchedBy(func(i interface{}) bool { + ps, ok := i.(*PluginState) + if ok { + *ps = *pluginState + return true + } + return false + })).Return(uint8(0), nil) + tCtx.OnPluginStateReader().Return(customStateReader) + + customStateWriter := &pluginsCoreMock.PluginStateWriter{} + 
customStateWriter.OnPutMatch(mock.Anything, mock.MatchedBy(func(i interface{}) bool { + ps, ok := i.(*PluginState) + if ok { + *pluginState = *ps + } + return ok + })).Return(nil) + tCtx.OnPluginStateWriter().Return(customStateWriter) + tCtx.OnOutputWriter().Return(&dummyOutputWriter{}) + + fc := extendedFakeClient{Client: fake.NewFakeClient(res)} + + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) + mockResourceHandler.On("BuildIdentityResource", mock.Anything, tCtx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.On("GetTaskPhase", mock.Anything, mock.Anything, mock.Anything).Return(tt.reportedPhaseInfo, nil) + + // create new PluginManager + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }, NewResourceMonitorIndex()) + assert.NoError(t, err) + + // handle plugin + _, err = pluginManager.Handle(ctx, tCtx) + assert.NoError(t, err) + + // verify expected PluginState + newPluginState := PluginState{} + _, err = tCtx.PluginStateReader().Get(&newPluginState) + assert.NoError(t, err) + + assert.True(t, reflect.DeepEqual(newPluginState, tt.expectedPluginState)) + }) + } +} + func TestPluginManager_CustomKubeClient(t *testing.T) { ctx := context.TODO() tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted) From f88f165ccf8f9896223dfc61936948fda3de29e4 Mon Sep 17 00:00:00 2001 From: Jeev B Date: Thu, 30 Mar 2023 17:57:36 -0700 Subject: [PATCH 2/3] Add support for GCP secret manager (#547) Signed-off-by: Jeev B --- pkg/webhook/config/config.go | 23 +++ pkg/webhook/config/config_flags.go | 1 + pkg/webhook/config/config_flags_test.go | 14 ++ .../config/secretmanagertype_enumer.go | 9 +- pkg/webhook/gcp_secret_manager.go | 156 ++++++++++++++++++ pkg/webhook/gcp_secret_manager_test.go | 79 +++++++++ pkg/webhook/secrets.go | 1 + 7 files changed, 279 insertions(+), 
4 deletions(-) create mode 100644 pkg/webhook/gcp_secret_manager.go create mode 100644 pkg/webhook/gcp_secret_manager_test.go diff --git a/pkg/webhook/config/config.go b/pkg/webhook/config/config.go index 61c598c1d..c3cf9f2d8 100644 --- a/pkg/webhook/config/config.go +++ b/pkg/webhook/config/config.go @@ -33,6 +33,19 @@ var ( }, }, }, + GCPSecretManagerConfig: GCPSecretManagerConfig{ + SidecarImage: "gcr.io/google.com/cloudsdktool/cloud-sdk:alpine", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("500Mi"), + corev1.ResourceCPU: resource.MustParse("200m"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("500Mi"), + corev1.ResourceCPU: resource.MustParse("200m"), + }, + }, + }, VaultSecretManagerConfig: VaultSecretManagerConfig{ Role: "flyte", KVVersion: KVVersion2, @@ -57,6 +70,10 @@ const ( // Manager and mount them to a local file system (in memory) and share that mount with other containers in the pod. SecretManagerTypeAWS + // SecretManagerTypeGCP defines a secret manager webhook that injects a side car to pull secrets from GCP Secret + // Manager and mount them to a local file system (in memory) and share that mount with other containers in the pod. + SecretManagerTypeGCP + // SecretManagerTypeVault defines a secret manager webhook that pulls secrets from Hashicorp Vault. 
SecretManagerTypeVault ) @@ -81,6 +98,7 @@ type Config struct { SecretName string `json:"secretName" pflag:",Secret name to write generated certs to."` SecretManagerType SecretManagerType `json:"secretManagerType" pflag:"-,Secret manager type to use if secrets are not found in global secrets."` AWSSecretManagerConfig AWSSecretManagerConfig `json:"awsSecretManager" pflag:",AWS Secret Manager config."` + GCPSecretManagerConfig GCPSecretManagerConfig `json:"gcpSecretManager" pflag:",GCP Secret Manager config."` VaultSecretManagerConfig VaultSecretManagerConfig `json:"vaultSecretManager" pflag:",Vault Secret Manager config."` } @@ -89,6 +107,11 @@ type AWSSecretManagerConfig struct { Resources corev1.ResourceRequirements `json:"resources" pflag:"-,Specifies resource requirements for the init container."` } +type GCPSecretManagerConfig struct { + SidecarImage string `json:"sidecarImage" pflag:",Specifies the sidecar docker image to use"` + Resources corev1.ResourceRequirements `json:"resources" pflag:"-,Specifies resource requirements for the init container."` +} + type VaultSecretManagerConfig struct { Role string `json:"role" pflag:",Specifies the vault role to use"` KVVersion KVVersion `json:"kvVersion" pflag:"-,The KV Engine Version. Defaults to 2. Use 1 for unversioned secrets. 
Refer to - https://www.vaultproject.io/docs/secrets/kv#kv-secrets-engine."` diff --git a/pkg/webhook/config/config_flags.go b/pkg/webhook/config/config_flags.go index 7ef9575d7..089bc0064 100755 --- a/pkg/webhook/config/config_flags.go +++ b/pkg/webhook/config/config_flags.go @@ -58,6 +58,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "servicePort"), DefaultConfig.ServicePort, "The port on the service that hosting webhook.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "secretName"), DefaultConfig.SecretName, "Secret name to write generated certs to.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "awsSecretManager.sidecarImage"), DefaultConfig.AWSSecretManagerConfig.SidecarImage, "Specifies the sidecar docker image to use") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gcpSecretManager.sidecarImage"), DefaultConfig.GCPSecretManagerConfig.SidecarImage, "Specifies the sidecar docker image to use") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "vaultSecretManager.role"), DefaultConfig.VaultSecretManagerConfig.Role, "Specifies the vault role to use") return cmdFlags } diff --git a/pkg/webhook/config/config_flags_test.go b/pkg/webhook/config/config_flags_test.go index e68b5af13..613a0f6a3 100755 --- a/pkg/webhook/config/config_flags_test.go +++ b/pkg/webhook/config/config_flags_test.go @@ -211,6 +211,20 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_gcpSecretManager.sidecarImage", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("gcpSecretManager.sidecarImage", testValue) + if vString, err := cmdFlags.GetString("gcpSecretManager.sidecarImage"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.GCPSecretManagerConfig.SidecarImage) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_vaultSecretManager.role", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git 
a/pkg/webhook/config/secretmanagertype_enumer.go b/pkg/webhook/config/secretmanagertype_enumer.go index ce33f910f..986b8b135 100644 --- a/pkg/webhook/config/secretmanagertype_enumer.go +++ b/pkg/webhook/config/secretmanagertype_enumer.go @@ -7,9 +7,9 @@ import ( "fmt" ) -const _SecretManagerTypeName = "GlobalK8sAWSVault" +const _SecretManagerTypeName = "GlobalK8sAWSGCPVault" -var _SecretManagerTypeIndex = [...]uint8{0, 6, 9, 12, 17} +var _SecretManagerTypeIndex = [...]uint8{0, 6, 9, 12, 15, 20} func (i SecretManagerType) String() string { if i < 0 || i >= SecretManagerType(len(_SecretManagerTypeIndex)-1) { @@ -18,13 +18,14 @@ func (i SecretManagerType) String() string { return _SecretManagerTypeName[_SecretManagerTypeIndex[i]:_SecretManagerTypeIndex[i+1]] } -var _SecretManagerTypeValues = []SecretManagerType{0, 1, 2, 3} +var _SecretManagerTypeValues = []SecretManagerType{0, 1, 2, 3, 4} var _SecretManagerTypeNameToValueMap = map[string]SecretManagerType{ _SecretManagerTypeName[0:6]: 0, _SecretManagerTypeName[6:9]: 1, _SecretManagerTypeName[9:12]: 2, - _SecretManagerTypeName[12:17]: 3, + _SecretManagerTypeName[12:15]: 3, + _SecretManagerTypeName[15:20]: 4, } // SecretManagerTypeString retrieves an enum value from the enum constants string name. diff --git a/pkg/webhook/gcp_secret_manager.go b/pkg/webhook/gcp_secret_manager.go new file mode 100644 index 000000000..f17c1509a --- /dev/null +++ b/pkg/webhook/gcp_secret_manager.go @@ -0,0 +1,156 @@ +package webhook + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytepropeller/pkg/webhook/config" + "github.com/flyteorg/flytestdlib/logger" + corev1 "k8s.io/api/core/v1" +) + +const ( + // GCPSecretsVolumeName defines the static name of the volume used for mounting/sharing secrets between init-container + // sidecar and the rest of the containers in the pod. 
+ GCPSecretsVolumeName = "gcp-secret-vol" // #nosec +) + +var ( + // GCPSecretMountPath defines the default mount path for secrets + GCPSecretMountPath = filepath.Join(string(os.PathSeparator), "etc", "flyte", "secrets") +) + +// GCPSecretManagerInjector allows injecting of secrets from GCP Secret Manager as files. It uses a Google Cloud +// SDK SideCar as an init-container to download the secret and save it to a local volume shared with all other +// containers in the pod. It supports multiple secrets to be mounted but that will result in adding an init +// container for each secret. The Google serviceaccount (GSA) associated with the Pod, either via Workload +// Identity (recommended) or the underlying node's serviceaccount, must have permissions to pull the secret +// from GCP Secret Manager. Currently, the secret must also exist in the same project. Otherwise, the Pod will +// fail with an init-error. +// Files will be mounted on /etc/flyte/secrets/<group>/<group_version> (lowercased) +type GCPSecretManagerInjector struct { + cfg config.GCPSecretManagerConfig +} + +func formatGCPSecretAccessCommand(secret *core.Secret) []string { + // `gcloud` writes this file with permission 0600. + // This will cause permission issues in the main container when using non-root + // users, so we fix the file permissions with `chmod`. 
+ secretDir := strings.ToLower(filepath.Join(GCPSecretMountPath, secret.Group)) + secretPath := strings.ToLower(filepath.Join(secretDir, secret.GroupVersion)) + args := []string{ + "gcloud", + "secrets", + "versions", + "access", + secret.GroupVersion, + fmt.Sprintf("--secret=%s", secret.Group), + fmt.Sprintf( + "--out-file=%s", + secretPath, + ), + "&&", + "chmod", + "+rX", + secretDir, + secretPath, + } + return []string{"sh", "-c", strings.Join(args, " ")} +} + +func formatGCPInitContainerName(index int) string { + return fmt.Sprintf("gcp-pull-secret-%v", index) +} + +func (i GCPSecretManagerInjector) Type() config.SecretManagerType { + return config.SecretManagerTypeGCP +} + +func (i GCPSecretManagerInjector) Inject(ctx context.Context, secret *core.Secret, p *corev1.Pod) (newP *corev1.Pod, injected bool, err error) { + if len(secret.Group) == 0 || len(secret.GroupVersion) == 0 { + return nil, false, fmt.Errorf("GCP Secrets Webhook require both group and group version to be set. "+ + "Secret: [%v]", secret) + } + + switch secret.MountRequirement { + case core.Secret_ANY: + fallthrough + case core.Secret_FILE: + // A Volume with a static name so that if we try to inject multiple secrets, we won't mount multiple volumes. 
+ // We use Memory as the storage medium for volume source to avoid + vol := corev1.Volume{ + Name: GCPSecretsVolumeName, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + }, + }, + } + + p.Spec.Volumes = appendVolumeIfNotExists(p.Spec.Volumes, vol) + p.Spec.InitContainers = append(p.Spec.InitContainers, createGCPSidecarContainer(i.cfg, p, secret)) + + secretVolumeMount := corev1.VolumeMount{ + Name: GCPSecretsVolumeName, + ReadOnly: true, + MountPath: GCPSecretMountPath, + } + + p.Spec.Containers = AppendVolumeMounts(p.Spec.Containers, secretVolumeMount) + p.Spec.InitContainers = AppendVolumeMounts(p.Spec.InitContainers, secretVolumeMount) + + // Inject GCP secret-inject webhook annotations to mount the secret in a predictable location. + envVars := []corev1.EnvVar{ + // Set environment variable to let the container know where to find the mounted files. + { + Name: SecretPathDefaultDirEnvVar, + Value: GCPSecretMountPath, + }, + // Sets an empty prefix to let the containers know the file names will match the secret keys as-is. + { + Name: SecretPathFilePrefixEnvVar, + Value: "", + }, + } + + for _, envVar := range envVars { + p.Spec.InitContainers = AppendEnvVars(p.Spec.InitContainers, envVar) + p.Spec.Containers = AppendEnvVars(p.Spec.Containers, envVar) + } + case core.Secret_ENV_VAR: + fallthrough + default: + err := fmt.Errorf("unrecognized mount requirement [%v] for secret [%v]", secret.MountRequirement.String(), secret.Key) + logger.Error(ctx, err) + return p, false, err + } + + return p, true, nil +} + +func createGCPSidecarContainer(cfg config.GCPSecretManagerConfig, p *corev1.Pod, secret *core.Secret) corev1.Container { + return corev1.Container{ + Image: cfg.SidecarImage, + // Create a unique name to allow multiple secrets to be mounted. 
+ Name: formatGCPInitContainerName(len(p.Spec.InitContainers)), + Command: formatGCPSecretAccessCommand(secret), + VolumeMounts: []corev1.VolumeMount{ + { + Name: GCPSecretsVolumeName, + MountPath: GCPSecretMountPath, + }, + }, + Resources: cfg.Resources, + } +} + +// NewGCPSecretManagerInjector creates a SecretInjector that's able to mount secrets from GCP Secret Manager. +func NewGCPSecretManagerInjector(cfg config.GCPSecretManagerConfig) GCPSecretManagerInjector { + return GCPSecretManagerInjector{ + cfg: cfg, + } +} diff --git a/pkg/webhook/gcp_secret_manager_test.go b/pkg/webhook/gcp_secret_manager_test.go new file mode 100644 index 000000000..26805eafc --- /dev/null +++ b/pkg/webhook/gcp_secret_manager_test.go @@ -0,0 +1,79 @@ +package webhook + +import ( + "context" + "testing" + + "github.com/flyteorg/flytepropeller/pkg/webhook/config" + + "github.com/go-test/deep" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" +) + +func TestGCPSecretManagerInjector_Inject(t *testing.T) { + injector := NewGCPSecretManagerInjector(config.DefaultConfig.GCPSecretManagerConfig) + inputSecret := &core.Secret{ + Group: "TestSecret", + GroupVersion: "2", + } + + expected := &corev1.Pod{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ + { + Name: "gcp-secret-vol", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + }, + }, + }, + }, + + InitContainers: []corev1.Container{ + { + Name: "gcp-pull-secret-0", + Image: "gcr.io/google.com/cloudsdktool/cloud-sdk:alpine", + Command: []string{ + "sh", + "-c", + "gcloud secrets versions access 2 --secret=TestSecret --out-file=/etc/flyte/secrets/testsecret/2 && chmod +rX /etc/flyte/secrets/testsecret /etc/flyte/secrets/testsecret/2", + }, + Env: []corev1.EnvVar{ + { + Name: "FLYTE_SECRETS_DEFAULT_DIR", + Value: "/etc/flyte/secrets", + }, + { + Name: "FLYTE_SECRETS_FILE_PREFIX", + Value: "", + 
}, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "gcp-secret-vol", + MountPath: "/etc/flyte/secrets", + }, + }, + Resources: config.DefaultConfig.GCPSecretManagerConfig.Resources, + }, + }, + Containers: []corev1.Container{}, + }, + } + + p := &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{}, + }, + } + actualP, injected, err := injector.Inject(context.Background(), inputSecret, p) + assert.NoError(t, err) + assert.True(t, injected) + if diff := deep.Equal(actualP, expected); diff != nil { + assert.Fail(t, "actual != expected", "Diff: %v", diff) + } +} diff --git a/pkg/webhook/secrets.go b/pkg/webhook/secrets.go index ffffc53cd..eae878cef 100644 --- a/pkg/webhook/secrets.go +++ b/pkg/webhook/secrets.go @@ -74,6 +74,7 @@ func NewSecretsMutator(cfg *config.Config, _ promutils.Scope) *SecretsMutator { NewGlobalSecrets(secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig())), NewK8sSecretsInjector(), NewAWSSecretManagerInjector(cfg.AWSSecretManagerConfig), + NewGCPSecretManagerInjector(cfg.GCPSecretManagerConfig), NewVaultSecretManagerInjector(cfg.VaultSecretManagerConfig), }, } From 6af4de20b50d85c288d1aca3190b8a4bf03564e0 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Fri, 31 Mar 2023 14:01:32 -0500 Subject: [PATCH 3/3] bumped flyteplugins (#549) Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 7a5caddbf..ca9d98ee2 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 github.com/flyteorg/flyteidl v1.3.14 - github.com/flyteorg/flyteplugins v1.0.44 + github.com/flyteorg/flyteplugins v1.0.45 github.com/flyteorg/flytestdlib v1.0.15 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index 6b4e8cba5..f84741c20 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ 
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= -github.com/flyteorg/flyteplugins v1.0.44 h1:uKizng+i0vfXslyPBlrsfecInhvy71fTB4kRg7eiifE= -github.com/flyteorg/flyteplugins v1.0.44/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= +github.com/flyteorg/flyteplugins v1.0.45 h1:I/N4ehOxX6ln8DivyZ9gayp/UYiBcqoizBbG1hfwIXM= +github.com/flyteorg/flyteplugins v1.0.45/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=