Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

updating dask plugin to use container resources with overrides #351

Merged
merged 4 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,27 +298,39 @@ func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.

container.Env = DecorateEnvVars(ctx, container.Env, parameters.TaskExecMetadata.GetEnvironmentVariables(), parameters.TaskExecMetadata.GetTaskExecutionID())

// retrieve platformResources and overrideResources to use when aggregating container resources
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
if platformResources == nil {
platformResources = &v1.ResourceRequirements{}
}

var overrideResources *v1.ResourceRequirements
if parameters.TaskExecMetadata.GetOverrides() != nil && parameters.TaskExecMetadata.GetOverrides().GetResources() != nil {
res := parameters.TaskExecMetadata.GetOverrides().GetResources()
platformResources := parameters.TaskExecMetadata.GetPlatformResources()
if platformResources == nil {
platformResources = &v1.ResourceRequirements{}
}
overrideResources = parameters.TaskExecMetadata.GetOverrides().GetResources()
}
if overrideResources == nil {
overrideResources = &v1.ResourceRequirements{}
}

logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+
" Resources [%v] with mode [%v]", res, platformResources, container.Resources, mode)

switch mode {
case ResourceCustomizationModeAssignResources:
container.Resources = ApplyResourceOverrides(*res, *platformResources, assignIfUnset)
case ResourceCustomizationModeMergeExistingResources:
MergeResources(*res, &container.Resources)
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, assignIfUnset)
case ResourceCustomizationModeEnsureExistingResourcesInRange:
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset)
}
logger.Infof(ctx, "ApplyResourceOverrides with Resources [%v], Platform Resources [%v] and Container"+
" Resources [%v] with mode [%v]", overrideResources, platformResources, container.Resources, mode)

logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources)
switch mode {
case ResourceCustomizationModeAssignResources:
// this will use overrideResources to set container resources and fallback to the platformResource values.
// it is important to note that this ignores the existing container.Resources values.
container.Resources = ApplyResourceOverrides(*overrideResources, *platformResources, assignIfUnset)
case ResourceCustomizationModeMergeExistingResources:
// this merges the overrideResources on top of the existing container.Resources to apply the overrides, then it
// uses the platformResource values to set defaults for any missing resource.
MergeResources(*overrideResources, &container.Resources)
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, assignIfUnset)
case ResourceCustomizationModeEnsureExistingResourcesInRange:
// this use the platformResources defaults to ensure that the container.Resources values are within the
// platformResources limits. it will not override any existing container.Resources values.
container.Resources = ApplyResourceOverrides(container.Resources, *platformResources, !assignIfUnset)
}

logger.Infof(ctx, "Adjusted container resources [%v]", container.Resources)
return nil
}
15 changes: 8 additions & 7 deletions go/tasks/plugins/k8s/dask/dask.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext,
}
}

defaultResources := executionMetadata.GetPlatformResources()
if executionMetadata.GetOverrides() != nil && executionMetadata.GetOverrides().GetResources() != nil {
defaultResources = executionMetadata.GetOverrides().GetResources()
containerResources, err := flytek8s.ToK8sResourceRequirements(defaultContainerSpec.GetResources())
if err != nil {
return nil, err
}

jobRunnerContainer := v1.Container{
Name: "job-runner",
Image: defaultImage,
Args: defaultContainerSpec.GetArgs(),
Env: defaultEnvVars,
Resources: *defaultResources,
Resources: *containerResources,
}

templateParameters := template.Parameters{
Expand All @@ -79,15 +79,16 @@ func getDefaults(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext,
OutputPath: taskCtx.OutputWriter(),
Task: taskCtx.TaskReader(),
}
err := flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters, flytek8s.ResourceCustomizationModeAssignResources, &jobRunnerContainer)
if err != nil {
if err = flytek8s.AddFlyteCustomizationsToContainer(ctx, templateParameters,
flytek8s.ResourceCustomizationModeMergeExistingResources, &jobRunnerContainer); err != nil {

return nil, err
}

return &defaults{
Image: defaultImage,
JobRunnerContainer: jobRunnerContainer,
Resources: defaultResources,
Resources: &jobRunnerContainer.Resources,
Env: defaultEnvVars,
Annotations: executionMetadata.GetAnnotations(),
IsInterruptible: executionMetadata.IsInterruptible(),
Expand Down
14 changes: 9 additions & 5 deletions go/tasks/plugins/k8s/dask/dask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ var (
v1.ResourceMemory: resource.MustParse("17G"),
},
}
defaultResources = v1.ResourceRequirements{
Requests: testPlatformResources.Requests,
Limits: testPlatformResources.Requests,
}
)

func dummyDaskJob(status daskAPI.JobStatus) *daskAPI.DaskJob {
Expand Down Expand Up @@ -199,7 +203,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
assert.Equal(t, "job-runner", jobSpec.Containers[0].Name)
assert.Equal(t, defaultTestImage, jobSpec.Containers[0].Image)
assert.Equal(t, testArgs, jobSpec.Containers[0].Args)
assert.Equal(t, testPlatformResources, jobSpec.Containers[0].Resources)
assert.Equal(t, defaultResources, jobSpec.Containers[0].Resources)
assert.Equal(t, defaultTolerations, jobSpec.Tolerations)
assert.Equal(t, defaultNodeSelector, jobSpec.NodeSelector)
assert.Equal(t, defaultAffinity, jobSpec.Affinity)
Expand All @@ -226,7 +230,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
}
assert.Equal(t, v1.RestartPolicyNever, schedulerSpec.RestartPolicy)
assert.Equal(t, defaultTestImage, schedulerSpec.Containers[0].Image)
assert.Equal(t, testPlatformResources, schedulerSpec.Containers[0].Resources)
assert.Equal(t, defaultResources, schedulerSpec.Containers[0].Resources)
assert.Equal(t, []string{"dask-scheduler"}, schedulerSpec.Containers[0].Args)
assert.Equal(t, expectedPorts, schedulerSpec.Containers[0].Ports)
assert.Equal(t, testEnvVars, schedulerSpec.Containers[0].Env)
Expand Down Expand Up @@ -263,7 +267,7 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
assert.Equal(t, "dask-worker", workerSpec.Containers[0].Name)
assert.Equal(t, v1.PullIfNotPresent, workerSpec.Containers[0].ImagePullPolicy)
assert.Equal(t, defaultTestImage, workerSpec.Containers[0].Image)
assert.Equal(t, testPlatformResources, workerSpec.Containers[0].Resources)
assert.Equal(t, defaultResources, workerSpec.Containers[0].Resources)
assert.Equal(t, testEnvVars, workerSpec.Containers[0].Env)
assert.Equal(t, defaultTolerations, workerSpec.Tolerations)
assert.Equal(t, defaultNodeSelector, workerSpec.NodeSelector)
Expand All @@ -273,9 +277,9 @@ func TestBuildResourceDaskHappyPath(t *testing.T) {
"--name",
"$(DASK_WORKER_NAME)",
"--nthreads",
"5",
"4",
"--memory-limit",
"17G",
"1Gi",
}, workerSpec.Containers[0].Args)
}

Expand Down