Skip to content

Commit

Permalink
Apply resource limits on container without resource (flyteorg#222)
Browse files Browse the repository at this point in the history
Signed-off-by: tnsetting <[email protected]>
  • Loading branch information
tnsetting authored Jul 28, 2021
1 parent be0aa2c commit 0720cf4
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 12 deletions.
30 changes: 26 additions & 4 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.L
return inputsURI, nil
}

func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask) runtimeInterfaces.TaskResourceSet {
func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask,
systemResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet {
// The values below should never be used (deduce it from the request; request should be set by the time we get here).
// Setting them here just in case we end up with requests not set. We are not adding to config because it would add
// more confusion as its mostly not used.
Expand Down Expand Up @@ -238,7 +239,19 @@ func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask) runti
memoryLimit = resourceEntries[memoryIndex].Value
}

return runtimeInterfaces.TaskResourceSet{CPU: cpuLimit, Memory: memoryLimit}
taskResourceLimits := runtimeInterfaces.TaskResourceSet{CPU: cpuLimit, Memory: memoryLimit}
// Use the limits from config
if systemResourceLimits.CPU != "" {
taskResourceLimits.CPU = systemResourceLimits.CPU
}
if systemResourceLimits.Memory != "" {
taskResourceLimits.Memory = systemResourceLimits.Memory
}
if systemResourceLimits.GPU != "" {
taskResourceLimits.GPU = systemResourceLimits.GPU
}

return taskResourceLimits
}

func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
Expand Down Expand Up @@ -332,11 +345,20 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
logger.Warningf(ctx, "Can't set default resources for nil task.")
return
}
if task.Template == nil || task.Template.GetContainer() == nil || task.Template.GetContainer().Resources == nil {
if task.Template == nil || task.Template.GetContainer() == nil {
// Nothing to do
logger.Debugf(ctx, "Not setting default resources for task [%+v], no container resources found to check", task)
return
}

if task.Template.GetContainer().Resources == nil {
// In case of no resources on the container, create empty requests and limits
// so the container will still have resources configure properly
task.Template.GetContainer().Resources = &core.Resources{
Requests: []*core.Resources_ResourceEntry{},
Limits: []*core.Resources_ResourceEntry{},
}
}
resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: task.Template.Id.Project,
Domain: task.Template.Id.Domain,
Expand All @@ -362,7 +384,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
taskResourceSpec = resource.Attributes.GetTaskResourceAttributes().Limits
}
task.Template.GetContainer().Resources.Limits = assignResourcesIfUnset(
ctx, task.Template.Id, createTaskDefaultLimits(ctx, task), task.Template.GetContainer().Resources.Limits,
ctx, task.Template.Id, createTaskDefaultLimits(ctx, task, m.config.TaskResourceConfiguration().GetLimits()), task.Template.GetContainer().Resources.Limits,
taskResourceSpec)
checkTaskRequestsLessThanLimits(ctx, task.Template.Id, task.Template.GetContainer().Resources)
}
Expand Down
72 changes: 64 additions & 8 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ var requestedAt = time.Now()
var testCluster = "C1"
var outputURI = "output uri"

var resourceDefaults = runtimeInterfaces.TaskResourceSet{
CPU: "200m",
Memory: "200Gi",
}
var resourceLimits = runtimeInterfaces.TaskResourceSet{
CPU: "300m",
Memory: "500Gi",
}

func getLegacySpec() *admin.ExecutionSpec {
executionRequest := testutils.GetExecutionRequest()
legacySpec := executionRequest.Spec
Expand Down Expand Up @@ -113,7 +122,8 @@ func getMockExecutionsConfigProvider() runtimeInterfaces.Configuration {
testutils.GetApplicationConfigWithDefaultDomains(),
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil, nil, nil, nil)
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, nil)
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())
return mockExecutionsConfigProvider
Expand Down Expand Up @@ -229,6 +239,28 @@ func TestCreateExecution(t *testing.T) {
})
setDefaultLpCallbackForExecTest(repository)
mockExecutor := workflowengineMocks.NewMockExecutor()
resources := &core.Resources{
Requests: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "200m",
},
{
Name: core.Resources_MEMORY,
Value: "200Gi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "300m",
},
{
Name: core.Resources_MEMORY,
Value: "500Gi",
},
},
}
mockExecutor.(*workflowengineMocks.MockExecutor).SetExecuteWorkflowCallback(
func(inputs workflowengineInterfaces.ExecuteWorkflowInput) (*workflowengineInterfaces.ExecutionInfo, error) {
assert.EqualValues(t, map[string]string{
Expand All @@ -241,6 +273,13 @@ func TestCreateExecution(t *testing.T) {
"annotation4": "4",
}, inputs.Annotations)
assert.EqualValues(t, 10*time.Minute, inputs.QueueingBudget)
tasks := inputs.WfClosure.GetTasks()
for _, task := range tasks {
assert.EqualValues(t, resources.Requests,
task.Template.GetContainer().Resources.Requests)
assert.EqualValues(t, resources.Limits,
task.Template.GetContainer().Resources.Limits)
}
return &workflowengineInterfaces.ExecutionInfo{
Cluster: testCluster,
}, nil
Expand Down Expand Up @@ -419,7 +458,8 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
Tags: []string{"tag"},
},
}),
nil, nil, nil, nil)
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, nil)
configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())
mockExecutor := workflowengineMocks.NewMockExecutor()
Expand Down Expand Up @@ -2823,11 +2863,11 @@ func TestSetDefaults(t *testing.T) {
Limits: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "200m",
Value: "300m",
},
{
Name: core.Resources_MEMORY,
Value: "200Gi",
Value: "500Gi",
},
},
},
Expand Down Expand Up @@ -2890,7 +2930,7 @@ func TestSetDefaults_MissingDefaults(t *testing.T) {
Limits: []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: "200m",
Value: "300m",
},
{
Name: core.Resources_MEMORY,
Expand Down Expand Up @@ -2923,10 +2963,26 @@ func TestCreateTaskDefaultLimits(t *testing.T) {
},
},
}
t.Run("missing_limits_in_config", func(t *testing.T) {
limits := runtimeInterfaces.TaskResourceSet{}

defaultLimits := createTaskDefaultLimits(context.Background(), task)
assert.Equal(t, "200Mi", defaultLimits.Memory)
assert.Equal(t, "200m", defaultLimits.CPU)
defaultLimits := createTaskDefaultLimits(context.Background(), task, limits)
assert.Equal(t, "200Mi", defaultLimits.Memory)
assert.Equal(t, "200m", defaultLimits.CPU)
})
t.Run("use_limits_from_config", func(t *testing.T) {
defaultLimits := createTaskDefaultLimits(context.Background(), task, resourceLimits)
assert.Equal(t, "500Gi", defaultLimits.Memory)
assert.Equal(t, "300m", defaultLimits.CPU)
})
t.Run("use_limits_from_config", func(t *testing.T) {
limits := runtimeInterfaces.TaskResourceSet{
CPU: "300m",
}
defaultLimits := createTaskDefaultLimits(context.Background(), task, limits)
assert.Equal(t, "200Mi", defaultLimits.Memory)
assert.Equal(t, "300m", defaultLimits.CPU)
})
}

func TestCreateSingleTaskExecution(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/manager/impl/testutils/mock_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ func GetValidTaskRequest() admin.TaskCreateRequest {
},
Spec: &admin.TaskSpec{
Template: &core.TaskTemplate{
Id: &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
},
Type: "type",
Metadata: &core.TaskMetadata{
Runtime: &core.RuntimeMetadata{
Expand Down

0 comments on commit 0720cf4

Please sign in to comment.