Skip to content

Commit

Permalink
Moving backward compat authrole code in one place and also removed re…
Browse files Browse the repository at this point in the history
…dundant fields (flyteorg#439)
  • Loading branch information
pmahindrakar-oss authored Jun 10, 2022
1 parent 7babb88 commit d3550ba
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 25 deletions.
32 changes: 21 additions & 11 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,19 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
workflowExecConfig = mergeIntoExecConfig(workflowExecConfig,
matchableResource.Attributes.GetWorkflowExecutionConfig())
}
// merge the application config into workflowExecConfig

// Backward compatibility changes to get security context from auth role.
// Older authRole or auth fields in the launchplan spec or execution request need to be used over application defaults.
// This portion of the code makes sure if newer way of setting security context is empty i.e
// K8sServiceAccount and IamRole is empty then get the values from the deprecated fields.
resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, workflowExecConfig.GetSecurityContext(), resolvedAuthRole)
if workflowExecConfig.GetSecurityContext() == nil &&
(len(resolvedSecurityCtx.GetRunAs().GetK8SServiceAccount()) > 0 ||
len(resolvedSecurityCtx.GetRunAs().GetIamRole()) > 0) {
workflowExecConfig.SecurityContext = resolvedSecurityCtx
}
// merge the application config into workflowExecConfig. If even the deprecated fields are not set
workflowExecConfig = mergeIntoExecConfig(workflowExecConfig, m.config.ApplicationConfiguration().GetTopLevelConfig())
logger.Infof(ctx, "getting the workflow execution config from application configuration")
// Defaults to one from the application config
Expand Down Expand Up @@ -669,16 +681,12 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return nil, nil, err
}

resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, executionConfig.GetSecurityContext(), resolvedAuthRole)

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: request.Inputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
SecurityContext: resolvedSecurityCtx,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
Expand Down Expand Up @@ -746,7 +754,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
SecurityContext: resolvedSecurityCtx,
SecurityContext: executionConfig.SecurityContext,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand All @@ -757,11 +765,15 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return ctx, executionModel, nil
}

func resolveAuthRole(request admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole {
func resolveAuthRole(request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) *admin.AuthRole {
if request.Spec.AuthRole != nil {
return request.Spec.AuthRole
}

if launchPlan == nil || launchPlan.Spec == nil {
return &admin.AuthRole{}
}

// Set role permissions based on launch plan Auth values.
// The branched-ness of this check is due to the presence numerous deprecated fields
if launchPlan.Spec.GetAuthRole() != nil {
Expand All @@ -776,6 +788,7 @@ func resolveAuthRole(request admin.ExecutionCreateRequest, launchPlan *admin.Lau
AssumableIamRole: launchPlan.GetSpec().GetRole(),
}
}

return &admin.AuthRole{}
}

Expand Down Expand Up @@ -907,15 +920,12 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, err
}

resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, executionConfig.GetSecurityContext(), resolvedAuthRole)
executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
SecurityContext: resolvedSecurityCtx,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
Expand Down Expand Up @@ -984,7 +994,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
SecurityContext: resolvedSecurityCtx,
SecurityContext: executionConfig.SecurityContext,
})
if err != nil {
logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
Expand Down
49 changes: 43 additions & 6 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3987,6 +3987,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
defaultK8sServiceAccount := applicationConfig.ApplicationConfiguration().GetTopLevelConfig().K8SServiceAccount
defaultMaxParallelism := applicationConfig.ApplicationConfiguration().GetTopLevelConfig().MaxParallelism

deprecatedLaunchPlanK8sServiceAccount := "deprecatedLaunchPlanK8sServiceAccount"
rmLabels := map[string]string{"rmLabelKey": "rmLabelValue"}
rmAnnotations := map[string]string{"rmAnnotationKey": "rmAnnotationValue"}
rmOutputLocationPrefix := "rmOutputLocationPrefix"
Expand Down Expand Up @@ -4266,6 +4267,42 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("fetch security context from deprecated config", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{},
},
},
}, nil
}
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}
launchPlan := &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
AuthRole: &admin.AuthRole{
KubernetesServiceAccount: deprecatedLaunchPlanK8sServiceAccount,
},
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, deprecatedLaunchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("matchable resource workflow resource", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
Expand Down Expand Up @@ -4504,7 +4541,7 @@ func TestResolvePermissions(t *testing.T) {
k8sServiceAccountSc := "saSc"

t.Run("backward compat use request values from auth", func(t *testing.T) {
execRequest := admin.ExecutionCreateRequest{
execRequest := &admin.ExecutionCreateRequest{
Spec: &admin.ExecutionSpec{
AuthRole: &admin.AuthRole{
AssumableIamRole: assumableIamRole,
Expand Down Expand Up @@ -4537,7 +4574,7 @@ func TestResolvePermissions(t *testing.T) {
}}, sc)
})
t.Run("use request values security context", func(t *testing.T) {
execRequest := admin.ExecutionCreateRequest{
execRequest := &admin.ExecutionCreateRequest{
Spec: &admin.ExecutionSpec{
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
Expand Down Expand Up @@ -4571,7 +4608,7 @@ func TestResolvePermissions(t *testing.T) {
assert.Equal(t, k8sServiceAccountSc, sc.RunAs.K8SServiceAccount)
})
t.Run("prefer lp auth role over auth", func(t *testing.T) {
execRequest := admin.ExecutionCreateRequest{
execRequest := &admin.ExecutionCreateRequest{
Spec: &admin.ExecutionSpec{},
}
lp := &admin.LaunchPlan{
Expand Down Expand Up @@ -4601,7 +4638,7 @@ func TestResolvePermissions(t *testing.T) {
}, sc)
})
t.Run("prefer security context over auth context", func(t *testing.T) {
execRequest := admin.ExecutionCreateRequest{
execRequest := &admin.ExecutionCreateRequest{
Spec: &admin.ExecutionSpec{
AuthRole: &admin.AuthRole{
AssumableIamRole: assumableIamRole,
Expand Down Expand Up @@ -4643,7 +4680,7 @@ func TestResolvePermissions(t *testing.T) {
assert.Equal(t, k8sServiceAccountSc, sc.RunAs.K8SServiceAccount)
})
t.Run("prefer lp auth over role", func(t *testing.T) {
execRequest := admin.ExecutionCreateRequest{
execRequest := &admin.ExecutionCreateRequest{
Spec: &admin.ExecutionSpec{},
}
lp := &admin.LaunchPlan{
Expand Down Expand Up @@ -4673,7 +4710,7 @@ func TestResolvePermissions(t *testing.T) {
}, sc)
})
t.Run("prefer lp auth over role", func(t *testing.T) {
authRole := resolveAuthRole(admin.ExecutionCreateRequest{
authRole := resolveAuthRole(&admin.ExecutionCreateRequest{
Spec: &admin.ExecutionSpec{},
}, &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1

// add permissions from auth and security context. Adding permissions from auth would be removed once all clients
// have migrated over to security context
addPermissions(data.ExecutionParameters.SecurityContext,
addPermissions(data.ExecutionParameters.ExecutionConfig.SecurityContext,
data.ExecutionParameters.RoleNameKey, flyteWorkflow)

labels := addMapValues(data.ExecutionParameters.Labels, flyteWorkflow.Labels)
Expand Down
12 changes: 6 additions & 6 deletions flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,15 @@ func TestPrepareFlyteWorkflow(t *testing.T) {
MissingPluginBehavior: admin.PluginOverride_USE_DEFAULT,
},
},
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
IamRole: testRoleSc,
K8SServiceAccount: testK8sServiceAccountSc,
},
},
ExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: 50,
Interruptible: &wrappers.BoolValue{Value: true},
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
IamRole: testRoleSc,
K8SServiceAccount: testK8sServiceAccountSc,
},
},
},
RecoveryExecution: recoveryNodeExecutionID,
EventVersion: 1,
Expand Down
1 change: 0 additions & 1 deletion flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type ExecutionParameters struct {
Annotations map[string]string
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
SecurityContext *core.SecurityContext
RecoveryExecution *core.WorkflowExecutionIdentifier
TaskResources *TaskResources
EventVersion int
Expand Down

0 comments on commit d3550ba

Please sign in to comment.