diff --git a/flyteadmin/flyteadmin_config.yaml b/flyteadmin/flyteadmin_config.yaml index 666d651d92..a64457ddab 100644 --- a/flyteadmin/flyteadmin_config.yaml +++ b/flyteadmin/flyteadmin_config.yaml @@ -120,12 +120,12 @@ storage: auth-type: accesskey secret-key: miniostorage disable-ssl: true - endpoint: "http://localhost:9000" + endpoint: "http://localhost:30084" region: my-region cache: max_size_mbs: 10 target_gc_percent: 100 - container: "flyte" + container: "my-s3-bucket" queues: executionQueues: - dynamic: "gpu_dynamic" @@ -157,13 +157,12 @@ queues: task_resources: defaults: cpu: 100m - gpu: 20m - memory: 1Mi - storage: 10M + memory: 200Mi + storage: 100M limits: cpu: 500m - gpu: 100m - memory: 1Mi + gpu: 1 + memory: 300Mi storage: 10G task_type_whitelist: sparkonk8s: diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 905a8b8650..9d99795b09 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -568,13 +568,15 @@ func (m *ExecutionManager) launchSingleTaskExecution( annotations = requestSpec.Annotations.Values } + resolvedAuthRole := resolveAuthRole(request, launchPlan) + resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole) executionParameters := workflowengineInterfaces.ExecutionParameters{ Inputs: request.Inputs, AcceptedAt: requestedAt, Labels: labels, Annotations: annotations, ExecutionConfig: executionConfig, - Auth: resolvePermissions(&request, launchPlan), + SecurityContext: resolvedSecurityCtx, TaskResources: &platformTaskResources, EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion, RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey, @@ -650,7 +652,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( return ctx, executionModel, nil } -func resolvePermissions(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { +func resolveAuthRole(request admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole { if request.Spec.AuthRole != nil { return request.Spec.AuthRole } @@ -672,6 +674,26 @@ func resolvePermissions(request *admin.ExecutionCreateRequest, launchPlan *admin return &admin.AuthRole{} } +func resolveSecurityCtx(ctx context.Context, request admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan, + resolvedAuthRole *admin.AuthRole) *core.SecurityContext { + // Use security context from the request if its set + if request.Spec.SecurityContext != nil { + return request.Spec.SecurityContext + } + + // Use launchplans security context if its set + if launchPlan.Spec.SecurityContext != nil { + return launchPlan.Spec.SecurityContext + } + logger.Warn(ctx, "Setting security context from auth Role") + return &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: resolvedAuthRole.AssumableIamRole, + K8SServiceAccount: resolvedAuthRole.KubernetesServiceAccount, + }, + } +} + func (m *ExecutionManager) launchExecutionAndPrepareModel( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( context.Context, *models.Execution, error) { @@ -774,13 +796,15 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( return nil, nil, err } + resolvedAuthRole := resolveAuthRole(request, launchPlan) + resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole) executionParameters := workflowengineInterfaces.ExecutionParameters{ Inputs: executionInputs, AcceptedAt: requestedAt, Labels: labels, Annotations: annotations, ExecutionConfig: executionConfig, - Auth: resolvePermissions(&request, launchPlan), + SecurityContext: resolvedSecurityCtx, TaskResources: &platformTaskResources, EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion, RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey, diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 5de135d38c..ea2b672df6 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -3382,29 +3382,72 @@ func TestResolvePermissions(t *testing.T) { assumableIamRole := "role" k8sServiceAccount := "sa" - t.Run("use request values", func(t *testing.T) { - auth := resolvePermissions(&admin.ExecutionCreateRequest{ + assumableIamRoleLp := "roleLp" + k8sServiceAccountLp := "saLp" + + assumableIamRoleSc := "roleSc" + k8sServiceAccountSc := "saSc" + + t.Run("backward compat use request values from auth", func(t *testing.T) { + execRequest := admin.ExecutionCreateRequest{ Spec: &admin.ExecutionSpec{ AuthRole: &admin.AuthRole{ AssumableIamRole: assumableIamRole, KubernetesServiceAccount: k8sServiceAccount, }, }, - }, &admin.LaunchPlan{ + } + lp := &admin.LaunchPlan{ Spec: &admin.LaunchPlanSpec{ AuthRole: &admin.AuthRole{ AssumableIamRole: "lp role", KubernetesServiceAccount: "k8s sa", }, }, - }) - assert.Equal(t, assumableIamRole, auth.AssumableIamRole) - assert.Equal(t, k8sServiceAccount, auth.KubernetesServiceAccount) + } + authRole := resolveAuthRole(execRequest, lp) + sc := resolveSecurityCtx(context.TODO(), execRequest, lp, authRole) + assert.Equal(t, assumableIamRole, authRole.AssumableIamRole) + assert.Equal(t, k8sServiceAccount, authRole.KubernetesServiceAccount) + assert.Equal(t, &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRole, + K8SServiceAccount: k8sServiceAccount, + }}, sc) + }) + t.Run("use request values security context", func(t *testing.T) { + execRequest := admin.ExecutionCreateRequest{ + Spec: &admin.ExecutionSpec{ + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRoleSc, + K8SServiceAccount: k8sServiceAccountSc, + }, + }, + }, + } + lp := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRoleSc, + K8SServiceAccount: k8sServiceAccountSc, + }, + }, + }, + } + authRole := resolveAuthRole(execRequest, lp) + sc := resolveSecurityCtx(context.TODO(), execRequest, lp, authRole) + assert.Equal(t, "", authRole.AssumableIamRole) + assert.Equal(t, "", authRole.KubernetesServiceAccount) + assert.Equal(t, assumableIamRoleSc, sc.RunAs.IamRole) + assert.Equal(t, k8sServiceAccountSc, sc.RunAs.K8SServiceAccount) }) t.Run("prefer lp auth role over auth", func(t *testing.T) { - auth := resolvePermissions(&admin.ExecutionCreateRequest{ + execRequest := admin.ExecutionCreateRequest{ Spec: &admin.ExecutionSpec{}, - }, &admin.LaunchPlan{ + } + lp := &admin.LaunchPlan{ Spec: &admin.LaunchPlanSpec{ AuthRole: &admin.AuthRole{ AssumableIamRole: assumableIamRole, @@ -3415,14 +3458,59 @@ func TestResolvePermissions(t *testing.T) { KubernetesServiceAccount: "k8s sa", }, }, - }) - assert.Equal(t, assumableIamRole, auth.AssumableIamRole) - assert.Equal(t, k8sServiceAccount, auth.KubernetesServiceAccount) + } + authRole := resolveAuthRole(execRequest, lp) + sc := resolveSecurityCtx(context.TODO(), execRequest, lp, authRole) + assert.Equal(t, assumableIamRole, authRole.AssumableIamRole) + assert.Equal(t, k8sServiceAccount, authRole.KubernetesServiceAccount) + assert.Equal(t, &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRole, + K8SServiceAccount: k8sServiceAccount, + }, + }, sc) + }) + t.Run("prefer security context over auth context", func(t *testing.T) { + execRequest := admin.ExecutionCreateRequest{ + Spec: &admin.ExecutionSpec{ + AuthRole: &admin.AuthRole{ + AssumableIamRole: assumableIamRole, + KubernetesServiceAccount: k8sServiceAccount, + }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRoleSc, + K8SServiceAccount: k8sServiceAccountSc, + }, + }, + }, + } + lp := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + AuthRole: &admin.AuthRole{ + AssumableIamRole: assumableIamRole, + KubernetesServiceAccount: k8sServiceAccount, + }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRoleSc, + K8SServiceAccount: k8sServiceAccountSc, + }, + }, + }, + } + authRole := resolveAuthRole(execRequest, lp) + sc := resolveSecurityCtx(context.TODO(), execRequest, lp, authRole) + assert.Equal(t, assumableIamRole, authRole.AssumableIamRole) + assert.Equal(t, k8sServiceAccount, authRole.KubernetesServiceAccount) + assert.Equal(t, assumableIamRoleSc, sc.RunAs.IamRole) + assert.Equal(t, k8sServiceAccountSc, sc.RunAs.K8SServiceAccount) }) t.Run("prefer lp auth over role", func(t *testing.T) { - auth := resolvePermissions(&admin.ExecutionCreateRequest{ + execRequest := admin.ExecutionCreateRequest{ Spec: &admin.ExecutionSpec{}, - }, &admin.LaunchPlan{ + } + lp := &admin.LaunchPlan{ Spec: &admin.LaunchPlanSpec{ Auth: &admin.Auth{ AssumableIamRole: assumableIamRole, @@ -3430,9 +3518,32 @@ func TestResolvePermissions(t *testing.T) { }, Role: "old role", }, + } + authRole := resolveAuthRole(execRequest, lp) + sc := resolveSecurityCtx(context.TODO(), execRequest, lp, authRole) + assert.Equal(t, assumableIamRole, authRole.AssumableIamRole) + assert.Equal(t, k8sServiceAccount, authRole.KubernetesServiceAccount) + assert.Equal(t, &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: assumableIamRole, + K8SServiceAccount: k8sServiceAccount, + }, + }, sc) + }) + t.Run("prefer lp auth over role", func(t *testing.T) { + authRole := resolveAuthRole(admin.ExecutionCreateRequest{ + Spec: &admin.ExecutionSpec{}, + }, &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Auth: &admin.Auth{ + AssumableIamRole: assumableIamRoleLp, + KubernetesServiceAccount: k8sServiceAccountLp, + }, + Role: "old role", + }, }) - assert.Equal(t, assumableIamRole, auth.AssumableIamRole) - assert.Equal(t, k8sServiceAccount, auth.KubernetesServiceAccount) + assert.Equal(t, assumableIamRoleLp, authRole.AssumableIamRole) + assert.Equal(t, k8sServiceAccountLp, authRole.KubernetesServiceAccount) }) } diff --git a/flyteadmin/pkg/manager/impl/util/single_task_execution.go b/flyteadmin/pkg/manager/impl/util/single_task_execution.go index 88d0af12cd..3ebed4fc2d 100644 --- a/flyteadmin/pkg/manager/impl/util/single_task_execution.go +++ b/flyteadmin/pkg/manager/impl/util/single_task_execution.go @@ -190,12 +190,13 @@ func CreateOrGetLaunchPlan(ctx context.Context, Name: taskIdentifier.Name, Version: taskIdentifier.Version, }, - EntityMetadata: &admin.LaunchPlanMetadata{}, - DefaultInputs: &core.ParameterMap{}, - FixedInputs: &core.LiteralMap{}, - Labels: &admin.Labels{}, - Annotations: &admin.Annotations{}, - AuthRole: spec.AuthRole, + EntityMetadata: &admin.LaunchPlanMetadata{}, + DefaultInputs: &core.ParameterMap{}, + FixedInputs: &core.LiteralMap{}, + Labels: &admin.Labels{}, + Annotations: &admin.Annotations{}, + AuthRole: spec.AuthRole, + SecurityContext: spec.SecurityContext, }, } if err := validation.ValidateLaunchPlan(ctx, generatedCreateLaunchPlanReq, db, config.ApplicationConfiguration(), workflowInterface); err != nil { diff --git a/flyteadmin/pkg/workflowengine/impl/prepare_execution.go b/flyteadmin/pkg/workflowengine/impl/prepare_execution.go index 87e19df649..5da21108b7 100644 --- a/flyteadmin/pkg/workflowengine/impl/prepare_execution.go +++ b/flyteadmin/pkg/workflowengine/impl/prepare_execution.go @@ -23,20 +23,18 @@ func addMapValues(overrides map[string]string, defaultValues map[string]string) return defaultValues } -func addPermissions(auth *admin.AuthRole, roleNameKey string, flyteWf *v1alpha1.FlyteWorkflow) { - // Set role permissions based on launch plan Auth values. - // The branched-ness of this check is due to the presence numerous deprecated fields - if auth == nil { +func addPermissions(securityCtx *core.SecurityContext, roleNameKey string, flyteWf *v1alpha1.FlyteWorkflow) { + if securityCtx == nil || securityCtx.RunAs == nil { return } - if len(auth.AssumableIamRole) > 0 { + if len(securityCtx.RunAs.IamRole) > 0 { if flyteWf.Annotations == nil { flyteWf.Annotations = map[string]string{} } - flyteWf.Annotations[roleNameKey] = auth.AssumableIamRole + flyteWf.Annotations[roleNameKey] = securityCtx.RunAs.IamRole } - if len(auth.KubernetesServiceAccount) > 0 { - flyteWf.ServiceAccountName = auth.KubernetesServiceAccount + if len(securityCtx.RunAs.K8SServiceAccount) > 0 { + flyteWf.ServiceAccountName = securityCtx.RunAs.K8SServiceAccount } } @@ -117,7 +115,10 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1 acceptAtWrapper := v1.NewTime(data.ExecutionParameters.AcceptedAt) flyteWorkflow.AcceptedAt = &acceptAtWrapper - addPermissions(data.ExecutionParameters.Auth, data.ExecutionParameters.RoleNameKey, flyteWorkflow) + // add permissions from auth and security context. Adding permissions from auth would be removed once all clients + // have migrated over to security context + addPermissions(data.ExecutionParameters.SecurityContext, + data.ExecutionParameters.RoleNameKey, flyteWorkflow) labels := addMapValues(data.ExecutionParameters.Labels, flyteWorkflow.Labels) flyteWorkflow.Labels = labels diff --git a/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go b/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go index 9b1f17b7c9..c6040273fd 100644 --- a/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go +++ b/flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go @@ -18,6 +18,9 @@ import ( const testRole = "role" const testK8sServiceAccount = "sa" +const testRoleSc = "roleSc" +const testK8sServiceAccountSc = "saSc" + var roleNameKey = "iam.amazonaws.com/role" func TestAddMapValues(t *testing.T) { @@ -48,15 +51,35 @@ func TestAddMapValues(t *testing.T) { } func TestAddPermissions(t *testing.T) { - flyteWf := v1alpha1.FlyteWorkflow{} - addPermissions(&admin.AuthRole{ - AssumableIamRole: testRole, - KubernetesServiceAccount: testK8sServiceAccount, - }, roleNameKey, &flyteWf) - assert.EqualValues(t, flyteWf.Annotations, map[string]string{ - roleNameKey: testRole, + securityCtx := &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: testRoleSc, + K8SServiceAccount: testK8sServiceAccountSc, + }, + } + securityCtxFromAuth := &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: testRole, + K8SServiceAccount: testK8sServiceAccount, + }, + } + t.Run("security ctx from auth", func(t *testing.T) { + flyteWf := v1alpha1.FlyteWorkflow{} + addPermissions(securityCtxFromAuth, roleNameKey, &flyteWf) + assert.EqualValues(t, flyteWf.Annotations, map[string]string{ + roleNameKey: testRole, + }) + assert.Equal(t, testK8sServiceAccount, flyteWf.ServiceAccountName) + }) + + t.Run("override using security ctx", func(t *testing.T) { + flyteWf := v1alpha1.FlyteWorkflow{} + addPermissions(securityCtx, roleNameKey, &flyteWf) + assert.EqualValues(t, flyteWf.Annotations, map[string]string{ + roleNameKey: testRoleSc, + }) + assert.Equal(t, testK8sServiceAccountSc, flyteWf.ServiceAccountName) }) - assert.Equal(t, testK8sServiceAccount, flyteWf.ServiceAccountName) } func TestAddExecutionOverrides(t *testing.T) { @@ -156,13 +179,15 @@ func TestPrepareFlyteWorkflow(t *testing.T) { MissingPluginBehavior: admin.PluginOverride_USE_DEFAULT, }, }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: testRoleSc, + K8SServiceAccount: testK8sServiceAccountSc, + }, + }, ExecutionConfig: &admin.WorkflowExecutionConfig{ MaxParallelism: 50, }, - Auth: &admin.AuthRole{ - AssumableIamRole: testRole, - KubernetesServiceAccount: testK8sServiceAccount, - }, RecoveryExecution: recoveryNodeExecutionID, EventVersion: 1, RoleNameKey: roleNameKey, @@ -180,7 +205,7 @@ func TestPrepareFlyteWorkflow(t *testing.T) { "customlabel": "labelval", }, flyteWorkflow.Labels) expectedAnnotations := map[string]string{ - roleNameKey: testRole, + roleNameKey: testRoleSc, "customannotation": "annotationval", } assert.EqualValues(t, expectedAnnotations, flyteWorkflow.Annotations) @@ -191,7 +216,7 @@ func TestPrepareFlyteWorkflow(t *testing.T) { MissingPluginBehavior: admin.PluginOverride_USE_DEFAULT, }, }, flyteWorkflow.ExecutionConfig.TaskPluginImpls) - assert.Equal(t, flyteWorkflow.ServiceAccountName, testK8sServiceAccount) + assert.Equal(t, flyteWorkflow.ServiceAccountName, testK8sServiceAccountSc) assert.Equal(t, flyteWorkflow.ExecutionConfig.MaxParallelism, uint32(50)) assert.True(t, proto.Equal(recoveryNodeExecutionID, flyteWorkflow.ExecutionConfig.RecoveryExecution.WorkflowExecutionIdentifier)) assert.Equal(t, flyteWorkflow.WorkflowMeta.EventVersion, v1alpha1.EventVersion(1)) diff --git a/flyteadmin/pkg/workflowengine/interfaces/executor.go b/flyteadmin/pkg/workflowengine/interfaces/executor.go index 5d8471f871..b22cd8c4b2 100644 --- a/flyteadmin/pkg/workflowengine/interfaces/executor.go +++ b/flyteadmin/pkg/workflowengine/interfaces/executor.go @@ -24,7 +24,7 @@ type ExecutionParameters struct { Annotations map[string]string TaskPluginOverrides []*admin.PluginOverride ExecutionConfig *admin.WorkflowExecutionConfig - Auth *admin.AuthRole + SecurityContext *core.SecurityContext RecoveryExecution *core.WorkflowExecutionIdentifier TaskResources *TaskResources EventVersion int