From f53200727aec3654c2b20ff8a75ec74de1e97541 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Fri, 25 Mar 2022 20:07:41 +0530 Subject: [PATCH] Added execution config changes (#378) * Added execution config changes Signed-off-by: Prafulla Mahindrakar * lint fixes Signed-off-by: Prafulla Mahindrakar * using executionConfig data during launch Signed-off-by: Prafulla Mahindrakar * resolve conflicts Signed-off-by: Prafulla Mahindrakar * Removed defaults for labels and annotations Signed-off-by: Prafulla Mahindrakar * added more coverage Signed-off-by: Prafulla Mahindrakar * added more coverage Signed-off-by: Prafulla Mahindrakar * Updating idl and lint fixes Signed-off-by: Prafulla Mahindrakar * Adde missing go.sum Signed-off-by: Prafulla Mahindrakar * feedback changes to return immediately if any field is set while overriding Signed-off-by: Prafulla Mahindrakar * using released flyteidl Signed-off-by: Prafulla Mahindrakar * using released flyteidl Signed-off-by: Prafulla Mahindrakar --- flyteadmin/go.mod | 2 +- flyteadmin/go.sum | 4 +- .../pkg/manager/impl/execution_manager.go | 116 +++++-- .../manager/impl/execution_manager_test.go | 300 +++++++++++++++++- flyteadmin/pkg/manager/impl/util/shared.go | 20 ++ .../pkg/manager/impl/util/shared_test.go | 68 +++- .../runtime/application_config_provider.go | 1 + .../interfaces/application_configuration.go | 42 +++ 8 files changed, 513 insertions(+), 40 deletions(-) diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 0828db395..d3e59c405 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -11,7 +11,7 @@ require ( github.com/benbjohnson/clock v1.1.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.24.2 + github.com/flyteorg/flyteidl v0.24.6 github.com/flyteorg/flyteplugins v0.10.16 github.com/flyteorg/flytepropeller v0.16.36 github.com/flyteorg/flytestdlib v0.4.13 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 967f3c492..6d08b9a9a 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -352,8 +352,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.24.2 h1:RQzWmtVQR+NKAppjw7xTsIn6gosP0Q/j58tfF6Cr6h4= -github.com/flyteorg/flyteidl v0.24.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.24.6 h1:n2796X9Sw7mNDtXWwsJr84DLQpz8Cptvb7LptfJLxag= +github.com/flyteorg/flyteidl v0.24.6/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw= github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos= github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU= diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 1d895f551..ead7cf8ad 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -428,41 +428,91 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request return parentNodeExecutionID, sourceExecutionID, nil } +// WorkflowExecutionConfigInterface is used as common interface for capturing the common behavior catering to the needs +// of fetching the WorkflowExecutionConfig across LaunchPlanSpec, ExecutionCreateRequest +// MatchableResource_WORKFLOW_EXECUTION_CONFIG and ApplicationConfig +type WorkflowExecutionConfigInterface interface { + // GetMaxParallelism Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness. + GetMaxParallelism() int32 + // GetRawOutputDataConfig Encapsulates user settings pertaining to offloaded data (i.e. Blobs, Schema, query data, etc.). + GetRawOutputDataConfig() *admin.RawOutputDataConfig + // GetSecurityContext Indicates security context permissions for executions triggered with this matchable attribute. + GetSecurityContext() *core.SecurityContext + // GetAnnotations Custom annotations to be applied to a triggered execution resource. + GetAnnotations() *admin.Annotations + // GetLabels Custom labels to be applied to a triggered execution resource. + GetLabels() *admin.Labels +} + +// Merge into workflowExecConfig from spec and return true if any value has been changed +func mergeIntoExecConfig(workflowExecConfig *admin.WorkflowExecutionConfig, spec WorkflowExecutionConfigInterface) bool { + isChanged := false + if workflowExecConfig.GetMaxParallelism() == 0 && spec.GetMaxParallelism() > 0 { + workflowExecConfig.MaxParallelism = spec.GetMaxParallelism() + isChanged = true + } + if workflowExecConfig.GetSecurityContext() == nil && spec.GetSecurityContext() != nil { + workflowExecConfig.SecurityContext = spec.GetSecurityContext() + isChanged = true + } + // Launchplan spec has label, annotation and rawOutputDataConfig initialized with empty values. + // Hence we do a deep check in the following conditions before assignment + if (workflowExecConfig.GetRawOutputDataConfig() == nil || + len(workflowExecConfig.GetRawOutputDataConfig().GetOutputLocationPrefix()) == 0) && + (spec.GetRawOutputDataConfig() != nil && len(spec.GetRawOutputDataConfig().OutputLocationPrefix) > 0) { + workflowExecConfig.RawOutputDataConfig = spec.GetRawOutputDataConfig() + isChanged = true + } + if (workflowExecConfig.GetLabels() == nil || len(workflowExecConfig.GetLabels().Values) == 0) && + (spec.GetLabels() != nil && len(spec.GetLabels().Values) > 0) { + workflowExecConfig.Labels = spec.GetLabels() + isChanged = true + } + if (workflowExecConfig.GetAnnotations() == nil || len(workflowExecConfig.GetAnnotations().Values) == 0) && + (spec.GetAnnotations() != nil && len(spec.GetAnnotations().Values) > 0) { + workflowExecConfig.Annotations = spec.GetAnnotations() + isChanged = true + } + return isChanged +} + // Produces execution-time attributes for workflow execution. // Defaults to overridable execution values set in the execution create request, then looks at the launch plan values // (if any) before defaulting to values set in the matchable resource db and further if matchable resources don't // exist then defaults to one set in application configuration func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest, launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) { - if request.Spec.MaxParallelism > 0 { - return &admin.WorkflowExecutionConfig{ - MaxParallelism: request.Spec.MaxParallelism, - }, nil + + workflowExecConfig := &admin.WorkflowExecutionConfig{} + // merge the request spec into workflowExecConfig + if isChanged := mergeIntoExecConfig(workflowExecConfig, request.Spec); isChanged { + return workflowExecConfig, nil } - if launchPlan != nil && launchPlan.Spec.MaxParallelism > 0 { - return &admin.WorkflowExecutionConfig{ - MaxParallelism: launchPlan.Spec.MaxParallelism, - }, nil + + if launchPlan != nil && launchPlan.Spec != nil { + // merge the launch plan spec into workflowExecConfig + if isChanged := mergeIntoExecConfig(workflowExecConfig, launchPlan.Spec); isChanged { + return workflowExecConfig, nil + } } - resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ - Project: request.Project, - Domain: request.Domain, - ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, - }) + matchableResource, err := util.GetMatchableResource(ctx, m.resourceManager, + admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain) if err != nil { - if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - logger.Errorf(ctx, "Failed to get workflow execution config overrides with error: %v", err) - return nil, err - } + return nil, err } - if resource != nil && resource.Attributes.GetWorkflowExecutionConfig() != nil { - return resource.Attributes.GetWorkflowExecutionConfig(), nil + + if matchableResource != nil && matchableResource.Attributes.GetWorkflowExecutionConfig() != nil { + // merge the matchable resource workflow execution config into workflowExecConfig + if isChanged := mergeIntoExecConfig(workflowExecConfig, + matchableResource.Attributes.GetWorkflowExecutionConfig()); isChanged { + return workflowExecConfig, nil + } } + // merge the application config into workflowExecConfig + mergeIntoExecConfig(workflowExecConfig, m.config.ApplicationConfiguration().GetTopLevelConfig()) // Defaults to one from the application config - return &admin.WorkflowExecutionConfig{ - MaxParallelism: m.config.ApplicationConfiguration().GetTopLevelConfig().GetMaxParallelism(), - }, nil + return workflowExecConfig, nil } func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *admin.ExecutionCreateRequest) ( @@ -579,21 +629,23 @@ func (m *ExecutionManager) launchSingleTaskExecution( } var labels map[string]string - if requestSpec.Labels != nil { - labels = requestSpec.Labels.Values + if executionConfig.Labels != nil { + labels = executionConfig.Labels.Values } + labels, err = m.addProjectLabels(ctx, request.Project, labels) if err != nil { return nil, nil, err } + var annotations map[string]string - if requestSpec.Annotations != nil { - annotations = requestSpec.Annotations.Values + if executionConfig.Annotations != nil { + annotations = executionConfig.Annotations.Values } - rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig - if requestSpec.RawOutputDataConfig != nil { - rawOutputDataConfig = requestSpec.RawOutputDataConfig + var rawOutputDataConfig *admin.RawOutputDataConfig + if executionConfig.RawOutputDataConfig != nil { + rawOutputDataConfig = executionConfig.RawOutputDataConfig } clusterAssignment, err := m.getClusterAssignment(ctx, &request) @@ -817,7 +869,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( namespace := common.GetNamespaceName( m.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), workflowExecutionID.Project, workflowExecutionID.Domain) - labels, err := resolveStringMap(requestSpec.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries()) + labels, err := resolveStringMap(executionConfig.GetLabels(), launchPlan.Spec.Labels, "labels", m.config.RegistrationValidationConfiguration().GetMaxLabelEntries()) if err != nil { return nil, nil, err } @@ -825,11 +877,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, err } - annotations, err := resolveStringMap(requestSpec.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) + annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { return nil, nil, err } - rawOutputDataConfig := launchPlan.Spec.RawOutputDataConfig + var rawOutputDataConfig *admin.RawOutputDataConfig if requestSpec.RawOutputDataConfig != nil { rawOutputDataConfig = requestSpec.RawOutputDataConfig } diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 895dc45c5..f5f18fd67 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -3622,6 +3622,299 @@ func TestCreateSingleTaskExecution(t *testing.T) { assert.NoError(t, err) } +func TestGetExecutionConfigOverrides(t *testing.T) { + + requestLabels := map[string]string{"requestLabelKey": "requestLabelValue"} + requestAnnotations := map[string]string{"requestAnnotationKey": "requestAnnotationValue"} + requestOutputLocationPrefix := "requestOutputLocationPrefix" + requestK8sServiceAccount := "requestK8sServiceAccount" + requestMaxParallelism := int32(10) + + launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"} + launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"} + launchPlanOutputLocationPrefix := "launchPlanOutputLocationPrefix" + launchPlanK8sServiceAccount := "launchPlanK8sServiceAccount" + launchPlanAssumableIamRole := "launchPlanAssumableIamRole" + launchPlanMaxParallelism := int32(50) + + applicationConfig := runtime.NewConfigurationProvider() + + defaultK8sServiceAccount := applicationConfig.ApplicationConfiguration().GetTopLevelConfig().K8SServiceAccount + defaultMaxParallelism := applicationConfig.ApplicationConfiguration().GetTopLevelConfig().MaxParallelism + + rmLabels := map[string]string{"rmLabelKey": "rmLabelValue"} + rmAnnotations := map[string]string{"rmAnnotationKey": "rmAnnotationValue"} + rmOutputLocationPrefix := "rmOutputLocationPrefix" + rmK8sServiceAccount := "rmK8sServiceAccount" + rmMaxParallelism := int32(80) + + resourceManager := managerMocks.MockResourceManager{} + executionManager := ExecutionManager{ + resourceManager: &resourceManager, + config: applicationConfig, + } + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ + MaxParallelism: rmMaxParallelism, + Labels: &admin.Labels{Values: rmLabels}, + Annotations: &admin.Annotations{Values: rmAnnotations}, + RawOutputDataConfig: &admin.RawOutputDataConfig{ + OutputLocationPrefix: rmOutputLocationPrefix, + }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: rmK8sServiceAccount, + }, + }, + }, + }, + }, + }, nil + } + + t.Run("request with full config", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Labels: &admin.Labels{Values: requestLabels}, + Annotations: &admin.Annotations{Values: requestAnnotations}, + RawOutputDataConfig: &admin.RawOutputDataConfig{ + OutputLocationPrefix: requestOutputLocationPrefix, + }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: requestK8sServiceAccount, + }, + }, + MaxParallelism: requestMaxParallelism, + }, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, requestK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) + assert.Equal(t, requestLabels, execConfig.GetLabels().Values) + assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values) + }) + t.Run("request with partial config", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Labels: &admin.Labels{Values: requestLabels}, + RawOutputDataConfig: &admin.RawOutputDataConfig{ + OutputLocationPrefix: requestOutputLocationPrefix, + }, + MaxParallelism: requestMaxParallelism, + }, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Annotations: &admin.Annotations{Values: launchPlanAnnotations}, + Labels: &admin.Labels{Values: launchPlanLabels}, + RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: launchPlanOutputLocationPrefix}, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: launchPlanK8sServiceAccount, + IamRole: launchPlanAssumableIamRole, + }, + }, + MaxParallelism: launchPlanMaxParallelism, + }, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.SecurityContext) + assert.Nil(t, execConfig.Annotations) + assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) + assert.Equal(t, requestLabels, execConfig.GetLabels().Values) + }) + t.Run("request with no config", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Labels: &admin.Labels{Values: launchPlanLabels}, + Annotations: &admin.Annotations{Values: launchPlanAnnotations}, + RawOutputDataConfig: &admin.RawOutputDataConfig{ + OutputLocationPrefix: launchPlanOutputLocationPrefix, + }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: launchPlanK8sServiceAccount, + }, + }, + MaxParallelism: launchPlanMaxParallelism, + }, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) + assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) + assert.Equal(t, launchPlanAnnotations, execConfig.GetAnnotations().Values) + }) + t.Run("launchplan with partial config", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Labels: &admin.Labels{Values: launchPlanLabels}, + Annotations: &admin.Annotations{Values: launchPlanAnnotations}, + RawOutputDataConfig: &admin.RawOutputDataConfig{ + OutputLocationPrefix: launchPlanOutputLocationPrefix, + }, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: launchPlanK8sServiceAccount, + }, + }, + MaxParallelism: launchPlanMaxParallelism, + }, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) + assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) + assert.Equal(t, launchPlanAnnotations, execConfig.GetAnnotations().Values) + }) + t.Run("launchplan with no config", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Equal(t, rmOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) + assert.Equal(t, rmLabels, execConfig.GetLabels().Values) + assert.Equal(t, rmAnnotations, execConfig.GetAnnotations().Values) + }) + t.Run("matchable resource partial config", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ + MaxParallelism: rmMaxParallelism, + Annotations: &admin.Annotations{Values: rmAnnotations}, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: rmK8sServiceAccount, + }, + }, + }, + }, + }, + }, nil + } + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Equal(t, rmAnnotations, execConfig.GetAnnotations().Values) + }) + t.Run("matchable resource with no config", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{}, + }, + }, + }, nil + } + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("matchable resource failure", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return nil, fmt.Errorf("failed to fetch the resources") + } + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.Equal(t, fmt.Errorf("failed to fetch the resources"), err) + assert.Nil(t, execConfig.GetSecurityContext()) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) +} + func TestGetExecutionConfig(t *testing.T) { resourceManager := managerMocks.MockResourceManager{} resourceManager.GetResourceFunc = func(ctx context.Context, @@ -3642,8 +3935,10 @@ func TestGetExecutionConfig(t *testing.T) { }, nil } + applicationConfig := runtime.NewConfigurationProvider() executionManager := ExecutionManager{ resourceManager: &resourceManager, + config: applicationConfig, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{ Project: workflowIdentifier.Project, @@ -3658,7 +3953,6 @@ func TestGetExecutionConfig_Spec(t *testing.T) { resourceManager := managerMocks.MockResourceManager{} resourceManager.GetResourceFunc = func(ctx context.Context, request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { - t.Errorf("When a user specifies max parallelism in a spec, the db should not be queried") return nil, nil } applicationConfig := runtime.NewConfigurationProvider() @@ -3678,7 +3972,7 @@ func TestGetExecutionConfig_Spec(t *testing.T) { }, }) assert.NoError(t, err) - assert.Equal(t, execConfig.MaxParallelism, int32(100)) + assert.Equal(t, int32(100), execConfig.MaxParallelism) execConfig, err = executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{ Project: workflowIdentifier.Project, @@ -3690,7 +3984,7 @@ func TestGetExecutionConfig_Spec(t *testing.T) { }, }) assert.NoError(t, err) - assert.Equal(t, execConfig.MaxParallelism, int32(50)) + assert.Equal(t, int32(50), execConfig.MaxParallelism) resourceManager = managerMocks.MockResourceManager{} resourceManager.GetResourceFunc = func(ctx context.Context, diff --git a/flyteadmin/pkg/manager/impl/util/shared.go b/flyteadmin/pkg/manager/impl/util/shared.go index f79da3548..bed18fdd1 100644 --- a/flyteadmin/pkg/manager/impl/util/shared.go +++ b/flyteadmin/pkg/manager/impl/util/shared.go @@ -9,6 +9,7 @@ import ( "github.com/flyteorg/flyteadmin/pkg/errors" "github.com/flyteorg/flyteadmin/pkg/manager/impl/shared" "github.com/flyteorg/flyteadmin/pkg/manager/impl/validation" + "github.com/flyteorg/flyteadmin/pkg/manager/interfaces" repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteadmin/pkg/repositories/transformers" @@ -233,3 +234,22 @@ func GetTaskExecutionModel( } return &taskExecutionModel, nil } + +// GetMatchableResource gets matchable resource for resourceType and project - domain combination. +// Returns nil with nothing is found or return an error +func GetMatchableResource(ctx context.Context, resourceManager interfaces.ResourceInterface, resourceType admin.MatchableResource, + project, domain string) (*interfaces.ResourceResponse, error) { + matchableResource, err := resourceManager.GetResource(ctx, interfaces.ResourceRequest{ + Project: project, + Domain: domain, + ResourceType: resourceType, + }) + if err != nil { + if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { + logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain with error: %v", resourceType, + project, domain, err) + return nil, err + } + } + return matchableResource, nil +} diff --git a/flyteadmin/pkg/manager/impl/util/shared_test.go b/flyteadmin/pkg/manager/impl/util/shared_test.go index 25ba0044a..fce7b682b 100644 --- a/flyteadmin/pkg/manager/impl/util/shared_test.go +++ b/flyteadmin/pkg/manager/impl/util/shared_test.go @@ -6,17 +6,19 @@ import ( "strings" "testing" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteadmin/pkg/common" commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks" flyteAdminErrors "github.com/flyteorg/flyteadmin/pkg/errors" "github.com/flyteorg/flyteadmin/pkg/manager/impl/testutils" + managerInterfaces "github.com/flyteorg/flyteadmin/pkg/manager/interfaces" + managerMocks "github.com/flyteorg/flyteadmin/pkg/manager/mocks" "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces" repositoryMocks "github.com/flyteorg/flyteadmin/pkg/repositories/mocks" "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/storage" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" @@ -474,3 +476,65 @@ func TestListActiveLaunchPlanVersionsFilters(t *testing.T) { assert.Equal(t, activeExpr.Args, int32(admin.LaunchPlanState_ACTIVE)) assert.Equal(t, activeExpr.Query, testutils.StateQueryPattern) } + +func TestGetMatchableResource(t *testing.T) { + resourceType := admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG + project := "dummyProject" + domain := "dummyDomain" + t.Run("successful fetch", func(t *testing.T) { + resourceManager := &managerMocks.MockResourceManager{} + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: project, + Domain: domain, + ResourceType: resourceType, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ + MaxParallelism: 12, + }, + }, + }, + }, nil + } + + mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain) + assert.Equal(t, int32(12), mr.Attributes.GetWorkflowExecutionConfig().MaxParallelism) + assert.Nil(t, err) + }) + + t.Run("not found", func(t *testing.T) { + resourceManager := &managerMocks.MockResourceManager{} + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: project, + Domain: domain, + ResourceType: resourceType, + }) + return nil, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "resource not found") + } + + _, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain) + assert.Nil(t, err) + }) + + t.Run("internal error", func(t *testing.T) { + resourceManager := &managerMocks.MockResourceManager{} + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: project, + Domain: domain, + ResourceType: resourceType, + }) + return nil, flyteAdminErrors.NewFlyteAdminError(codes.Internal, "internal error") + } + + _, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain) + assert.NotNil(t, err) + }) +} diff --git a/flyteadmin/pkg/runtime/application_config_provider.go b/flyteadmin/pkg/runtime/application_config_provider.go index 3731b7c72..6c00ae917 100644 --- a/flyteadmin/pkg/runtime/application_config_provider.go +++ b/flyteadmin/pkg/runtime/application_config_provider.go @@ -38,6 +38,7 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic EventVersion: 2, AsyncEventsBufferSize: 100, MaxParallelism: 25, + K8SServiceAccount: "default", }) var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{ diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 9f2a5d0ab..3b3cbf468 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -1,6 +1,8 @@ package interfaces import ( + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/config" "golang.org/x/time/rate" @@ -65,6 +67,19 @@ type ApplicationConfig struct { // This is useful to achieve fairness. Note: MapTasks are regarded as one unit, // and parallelism/concurrency of MapTasks is independent from this. MaxParallelism int32 `json:"maxParallelism"` + // Labels to apply to the execution resource. + Labels map[string]string `json:"labels,omitempty"` + // Annotations to apply to the execution resource. + Annotations map[string]string `json:"annotations,omitempty"` + + // Optional: security context override to apply this execution. + // iam_role references the fully qualified name of Identity & Access Management role to impersonate. + AssumableIamRole string `json:"assumableIamRole"` + // k8s_service_account references a kubernetes service account to impersonate. + K8SServiceAccount string `json:"k8sServiceAccount"` + + // Prefix for where offloaded data from user workflows will be written + OutputLocationPrefix string `json:"outputLocationPrefix"` } func (a *ApplicationConfig) GetRoleNameKey() string { @@ -95,6 +110,33 @@ func (a *ApplicationConfig) GetMaxParallelism() int32 { return a.MaxParallelism } +func (a *ApplicationConfig) GetRawOutputDataConfig() *admin.RawOutputDataConfig { + return &admin.RawOutputDataConfig{ + OutputLocationPrefix: a.OutputLocationPrefix, + } +} + +func (a *ApplicationConfig) GetSecurityContext() *core.SecurityContext { + return &core.SecurityContext{ + RunAs: &core.Identity{ + IamRole: a.AssumableIamRole, + K8SServiceAccount: a.K8SServiceAccount, + }, + } +} + +func (a *ApplicationConfig) GetAnnotations() *admin.Annotations { + return &admin.Annotations{ + Values: a.Annotations, + } +} + +func (a *ApplicationConfig) GetLabels() *admin.Labels { + return &admin.Labels{ + Values: a.Labels, + } +} + // This section holds common config for AWS type AWSConfig struct { Region string `json:"region"`