Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Project settings (#480)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
wild-endeavor authored Oct 3, 2022
1 parent 6911752 commit b47e646
Show file tree
Hide file tree
Showing 24 changed files with 1,236 additions and 116 deletions.
6 changes: 3 additions & 3 deletions flyteadmin/flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ server:
httpPort: 8088
grpcPort: 8089
grpcServerReflection: true
kube-config: /Users/haythamabuelfutuh/kubeconfig/k3s/k3s.yaml
kube-config: /Users/ytong/.kube/config
security:
secure: false
useAuth: false
Expand Down Expand Up @@ -66,7 +66,7 @@ database:
port: 5432
username: postgres
host: localhost
dbname: postgres
dbname: flyteadmin
options: "sslmode=disable"
scheduler:
eventScheduler:
Expand Down Expand Up @@ -122,7 +122,7 @@ storage:
auth-type: accesskey
secret-key: miniostorage
disable-ssl: true
endpoint: "http://localhost:9000"
endpoint: "http://localhost:30084"
region: my-region
signedUrl:
stowConfigOverride:
Expand Down
89 changes: 27 additions & 62 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/benbjohnson/clock"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/shared"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/wrappers"
)

const childContainerQueueKey = "child_queue"
Expand Down Expand Up @@ -434,59 +433,6 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
return parentNodeExecutionID, sourceExecutionID, nil
}

// WorkflowExecutionConfigInterface is used as common interface for capturing the common behavior catering to the needs
// of fetching the WorkflowExecutionConfig across LaunchPlanSpec, ExecutionCreateRequest
// MatchableResource_WORKFLOW_EXECUTION_CONFIG and ApplicationConfig
type WorkflowExecutionConfigInterface interface {
// GetMaxParallelism Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness.
GetMaxParallelism() int32
// GetRawOutputDataConfig Encapsulates user settings pertaining to offloaded data (i.e. Blobs, Schema, query data, etc.).
GetRawOutputDataConfig() *admin.RawOutputDataConfig
// GetSecurityContext Indicates security context permissions for executions triggered with this matchable attribute.
GetSecurityContext() *core.SecurityContext
// GetAnnotations Custom annotations to be applied to a triggered execution resource.
GetAnnotations() *admin.Annotations
// GetLabels Custom labels to be applied to a triggered execution resource.
GetLabels() *admin.Labels
// GetInterruptible indicates a workflow should be flagged as interruptible for a single execution. If omitted, the workflow's default is used.
GetInterruptible() *wrappers.BoolValue
}

// Merge into workflowExecConfig from spec and return true if any value has been changed
func mergeIntoExecConfig(workflowExecConfig admin.WorkflowExecutionConfig, spec WorkflowExecutionConfigInterface) admin.WorkflowExecutionConfig {
if workflowExecConfig.GetMaxParallelism() == 0 && spec.GetMaxParallelism() > 0 {
workflowExecConfig.MaxParallelism = spec.GetMaxParallelism()
}

if workflowExecConfig.GetSecurityContext() == nil && spec.GetSecurityContext() != nil {
if spec.GetSecurityContext().GetRunAs() != nil &&
(len(spec.GetSecurityContext().GetRunAs().GetK8SServiceAccount()) > 0 ||
len(spec.GetSecurityContext().GetRunAs().GetIamRole()) > 0) {
workflowExecConfig.SecurityContext = spec.GetSecurityContext()
}
}
// Launchplan spec has label, annotation and rawOutputDataConfig initialized with empty values.
// Hence we do a deep check in the following conditions before assignment
if (workflowExecConfig.GetRawOutputDataConfig() == nil ||
len(workflowExecConfig.GetRawOutputDataConfig().GetOutputLocationPrefix()) == 0) &&
(spec.GetRawOutputDataConfig() != nil && len(spec.GetRawOutputDataConfig().OutputLocationPrefix) > 0) {
workflowExecConfig.RawOutputDataConfig = spec.GetRawOutputDataConfig()
}
if (workflowExecConfig.GetLabels() == nil || len(workflowExecConfig.GetLabels().Values) == 0) &&
(spec.GetLabels() != nil && len(spec.GetLabels().Values) > 0) {
workflowExecConfig.Labels = spec.GetLabels()
}
if (workflowExecConfig.GetAnnotations() == nil || len(workflowExecConfig.GetAnnotations().Values) == 0) &&
(spec.GetAnnotations() != nil && len(spec.GetAnnotations().Values) > 0) {
workflowExecConfig.Annotations = spec.GetAnnotations()
}

if workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil {
workflowExecConfig.Interruptible = spec.GetInterruptible()
}
return workflowExecConfig
}

// Produces execution-time attributes for workflow execution.
// Defaults to overridable execution values set in the execution create request, then looks at the launch plan values
// (if any) before defaulting to values set in the matchable resource db and further if matchable resources don't
Expand All @@ -495,30 +441,49 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) {

workflowExecConfig := admin.WorkflowExecutionConfig{}
// merge the request spec into workflowExecConfig
workflowExecConfig = mergeIntoExecConfig(workflowExecConfig, request.Spec)
// Merge the request spec into workflowExecConfig
workflowExecConfig = util.MergeIntoExecConfig(workflowExecConfig, request.Spec)

var workflowName string
if launchPlan != nil && launchPlan.Spec != nil {
// merge the launch plan spec into workflowExecConfig
workflowExecConfig = mergeIntoExecConfig(workflowExecConfig, launchPlan.Spec)
// Merge the launch plan spec into workflowExecConfig
workflowExecConfig = util.MergeIntoExecConfig(workflowExecConfig, launchPlan.Spec)
if launchPlan.Spec.WorkflowId != nil {
workflowName = launchPlan.Spec.WorkflowId.Name
}
}

// This will get the most specific Workflow Execution Config.
matchableResource, err := util.GetMatchableResource(ctx, m.resourceManager,
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain, workflowName)
if err != nil {
return nil, err
}

if matchableResource != nil && matchableResource.Attributes.GetWorkflowExecutionConfig() != nil {
// merge the matchable resource workflow execution config into workflowExecConfig
workflowExecConfig = mergeIntoExecConfig(workflowExecConfig,
workflowExecConfig = util.MergeIntoExecConfig(workflowExecConfig,
matchableResource.Attributes.GetWorkflowExecutionConfig())
}

// To match what the front-end will display to the user, we need to do the project level query too.
// This searches only for a direct match, and will not merge in system config level defaults like the
// GetProjectAttributes call does, since that's done below.
// The reason we need to do the project level query is for the case where some configs (say max parallelism)
// is set on the project level, but other items (say service account) is set on the project-domain level.
// In this case you want to use the project-domain service account, the project-level max parallelism, and
// system level defaults for the rest.
// See FLYTE-2322 for more background information.
projectMatchableResource, err := util.GetMatchableResource(ctx, m.resourceManager,
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, "", "")
if err != nil {
return nil, err
}
if projectMatchableResource != nil && projectMatchableResource.Attributes.GetWorkflowExecutionConfig() != nil {
// merge the matchable resource workflow execution config into workflowExecConfig
workflowExecConfig = util.MergeIntoExecConfig(workflowExecConfig,
projectMatchableResource.Attributes.GetWorkflowExecutionConfig())
}

// Backward compatibility changes to get security context from auth role.
// Older authRole or auth fields in the launchplan spec or execution request need to be used over application defaults.
// This portion of the code makes sure if newer way of setting security context is empty i.e
Expand All @@ -530,8 +495,8 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
len(resolvedSecurityCtx.GetRunAs().GetIamRole()) > 0) {
workflowExecConfig.SecurityContext = resolvedSecurityCtx
}
// merge the application config into workflowExecConfig. If even the deprecated fields are not set
workflowExecConfig = mergeIntoExecConfig(workflowExecConfig, m.config.ApplicationConfiguration().GetTopLevelConfig())
// Merge the application config into workflowExecConfig. If even the deprecated fields are not set
workflowExecConfig = util.MergeIntoExecConfig(workflowExecConfig, m.config.ApplicationConfiguration().GetTopLevelConfig())
// Explicitly set the security context if its nil since downstream we expect this settings to be available
if workflowExecConfig.GetSecurityContext() == nil {
workflowExecConfig.SecurityContext = &core.SecurityContext{
Expand Down
85 changes: 65 additions & 20 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4026,18 +4026,21 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
// two requests will be made, one with empty domain and one with filled in domain
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
return &managerInterfaces.ResourceResponse{
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)
projectDomainResponse := &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: rmMaxParallelism,
Interruptible: &wrappers.BoolValue{Value: rmInterruptible},
Labels: &admin.Labels{Values: rmLabels},
Annotations: &admin.Annotations{Values: rmAnnotations},
RawOutputDataConfig: &admin.RawOutputDataConfig{
OutputLocationPrefix: rmOutputLocationPrefix,
Expand All @@ -4050,7 +4053,24 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
},
},
},
}, nil
}

projectResponse := &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
Labels: &admin.Labels{Values: rmLabels},
RawOutputDataConfig: &admin.RawOutputDataConfig{
OutputLocationPrefix: "shouldnotbeused",
},
},
},
},
}
if request.Domain == "" {
return projectResponse, nil
}
return projectDomainResponse, nil
}

t.Run("request with full config", func(t *testing.T) {
Expand Down Expand Up @@ -4234,11 +4254,15 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
t.Run("matchable resource partial config", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)

return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
Expand Down Expand Up @@ -4275,11 +4299,14 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
t.Run("matchable resource with no config", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
Expand Down Expand Up @@ -4308,11 +4335,15 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
t.Run("fetch security context from deprecated config", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)

return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
Expand Down Expand Up @@ -4345,12 +4376,17 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
t.Run("matchable resource workflow resource", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Workflow: workflowIdentifier.Name,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
Workflow: workflowIdentifier.Name,
}, {Project: workflowIdentifier.Project,
Domain: "",
Workflow: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)

return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
Expand Down Expand Up @@ -4391,11 +4427,14 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
t.Run("matchable resource failure", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)
return nil, fmt.Errorf("failed to fetch the resources")
}
request := &admin.ExecutionCreateRequest{
Expand All @@ -4417,11 +4456,14 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
t.Run("application configuration", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
Expand Down Expand Up @@ -4580,11 +4622,14 @@ func TestGetExecutionConfig(t *testing.T) {
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
assert.Contains(t, []managerInterfaces.ResourceRequest{{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
}, {Project: workflowIdentifier.Project,
Domain: "",
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG},
}, request)
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
Expand Down
Loading

0 comments on commit b47e646

Please sign in to comment.