Skip to content

Commit

Permalink
Support ephemeral storage resource validation for tasks (flyteorg#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Aug 6, 2021
1 parent 0720cf4 commit 263d52c
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 63 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.19.14
github.com/flyteorg/flyteidl v0.19.19
github.com/flyteorg/flyteplugins v0.5.59
github.com/flyteorg/flytepropeller v0.13.3
github.com/flyteorg/flytestdlib v0.3.27
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI=
github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.19 h1:jv93YLz0Bq++sH9r0AOhdNaHFdXSCWjsXJoLOIduA2o=
github.com/flyteorg/flyteidl v0.19.19/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.5.59 h1:Uw1xlrlx5rSTpdTMwJTo7mbqHI7X7p7CFVm3473iRjo=
github.com/flyteorg/flyteplugins v0.5.59/go.mod h1:nesnW7pJhXEysFQg9TnSp36ao33ie0oA/TI4sYPaeyw=
github.com/flyteorg/flytepropeller v0.13.3 h1:nnO4d9w6UbgLCF9kn0M6LTkYpS/F5jEoEF22YcRmLYI=
Expand Down
71 changes: 48 additions & 23 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,45 +210,51 @@ func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.L
}

func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask,
systemResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet {
configResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet {
// The values below should never be used (deduce it from the request; request should be set by the time we get here).
// Setting them here just in case we end up with requests not set. We are not adding to config because it would add
// more confusion as its mostly not used.
cpuLimit := "500m"
memoryLimit := "500Mi"
resourceEntries := task.Template.GetContainer().Resources.Requests
var cpuIndex, memoryIndex = -1, -1
for idx, entry := range resourceEntries {
resourceRequestEntries := task.Template.GetContainer().Resources.Requests
var cpuIndex, memoryIndex, ephemeralStorageIndex = -1, -1, -1
for idx, entry := range resourceRequestEntries {
switch entry.Name {
case core.Resources_CPU:
cpuIndex = idx

case core.Resources_MEMORY:
memoryIndex = idx
case core.Resources_EPHEMERAL_STORAGE:
ephemeralStorageIndex = idx
}
}

if cpuIndex < 0 || memoryIndex < 0 {
logger.Errorf(ctx, "Cpu request and Memory request missing for %s", task.Template.Id)
}
taskResourceLimits := runtimeInterfaces.TaskResourceSet{}

if cpuIndex >= 0 {
cpuLimit = resourceEntries[cpuIndex].Value
// For resource values, we prefer to use the limits set in the application config over the set resource values.
if len(configResourceLimits.CPU) > 0 {
cpuLimit = configResourceLimits.CPU
} else if cpuIndex >= 0 {
cpuLimit = resourceRequestEntries[cpuIndex].Value
}
if memoryIndex >= 0 {
memoryLimit = resourceEntries[memoryIndex].Value
taskResourceLimits.CPU = cpuLimit
if len(configResourceLimits.Memory) > 0 {
memoryLimit = configResourceLimits.Memory
} else if memoryIndex >= 0 {
memoryLimit = resourceRequestEntries[memoryIndex].Value
}

taskResourceLimits := runtimeInterfaces.TaskResourceSet{CPU: cpuLimit, Memory: memoryLimit}
// Use the limits from config
if systemResourceLimits.CPU != "" {
taskResourceLimits.CPU = systemResourceLimits.CPU
taskResourceLimits.Memory = memoryLimit
if len(taskResourceLimits.GPU) == 0 && len(configResourceLimits.GPU) > 0 {
// When a platform default for GPU exists, but one isn't set in the task resources, use the platform value.
taskResourceLimits.GPU = configResourceLimits.GPU
}
if systemResourceLimits.Memory != "" {
taskResourceLimits.Memory = systemResourceLimits.Memory
}
if systemResourceLimits.GPU != "" {
taskResourceLimits.GPU = systemResourceLimits.GPU
if len(configResourceLimits.EphemeralStorage) > 0 {
taskResourceLimits.EphemeralStorage = configResourceLimits.EphemeralStorage
} else if ephemeralStorageIndex >= 0 {
taskResourceLimits.EphemeralStorage = resourceRequestEntries[ephemeralStorageIndex].Value
}

return taskResourceLimits
Expand All @@ -257,21 +263,23 @@ func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask,
func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
platformValues runtimeInterfaces.TaskResourceSet,
resourceEntries []*core.Resources_ResourceEntry, taskResourceSpec *admin.TaskResourceSpec) []*core.Resources_ResourceEntry {
var cpuIndex, memoryIndex = -1, -1
var cpuIndex, memoryIndex, ephemeralStorageindex = -1, -1, -1
for idx, entry := range resourceEntries {
switch entry.Name {
case core.Resources_CPU:
cpuIndex = idx
case core.Resources_MEMORY:
memoryIndex = idx
case core.Resources_EPHEMERAL_STORAGE:
ephemeralStorageindex = idx
}
}
if cpuIndex > 0 && memoryIndex > 0 {
if cpuIndex > 0 && memoryIndex > 0 && ephemeralStorageindex > 0 {
// nothing to do
return resourceEntries
}

if cpuIndex < 0 && platformValues.CPU != "" {
if cpuIndex < 0 && len(platformValues.CPU) > 0 {
logger.Debugf(ctx, "Setting 'cpu' for [%+v] to %s", identifier, platformValues.CPU)
cpuValue := platformValues.CPU
if taskResourceSpec != nil && len(taskResourceSpec.Cpu) > 0 {
Expand All @@ -284,7 +292,7 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
}
resourceEntries = append(resourceEntries, cpuResource)
}
if memoryIndex < 0 && platformValues.Memory != "" {
if memoryIndex < 0 && len(platformValues.Memory) > 0 {
memoryValue := platformValues.Memory
if taskResourceSpec != nil && len(taskResourceSpec.Memory) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
Expand All @@ -297,6 +305,23 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
logger.Debugf(ctx, "Setting 'memory' for [%+v] to %s", identifier, platformValues.Memory)
resourceEntries = append(resourceEntries, memoryResource)
}
if ephemeralStorageindex < 0 {
var ephemeralStorageValue string
if taskResourceSpec != nil && len(taskResourceSpec.EphemeralStorage) > 0 {
// Use the custom attributes from the database rather than the platform defaults from the application config
ephemeralStorageValue = taskResourceSpec.EphemeralStorage
} else if len(platformValues.EphemeralStorage) > 0 {
ephemeralStorageValue = platformValues.EphemeralStorage
}
if len(ephemeralStorageValue) > 0 {
ephemeralStorageResource := &core.Resources_ResourceEntry{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: ephemeralStorageValue,
}
logger.Debugf(ctx, "Setting 'ephemeralStorage' for [%+v] to %s", identifier, platformValues.EphemeralStorage)
resourceEntries = append(resourceEntries, ephemeralStorageResource)
}
}
return resourceEntries
}

Expand Down
134 changes: 104 additions & 30 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2675,31 +2675,63 @@ func TestListExecutions_LegacyModel(t *testing.T) {

func TestAssignResourcesIfUnset(t *testing.T) {
platformValues := runtimeInterfaces.TaskResourceSet{
CPU: "200m",
GPU: "8",
Memory: "200Gi",
}
taskResourceSpec := &admin.TaskResourceSpec{
Cpu: "400m",
Memory: "400Gi",
}
assignedResources := assignResourcesIfUnset(context.Background(), &core.Identifier{
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
}, platformValues, []*core.Resources_ResourceEntry{}, taskResourceSpec)
CPU: "200m",
GPU: "8",
Memory: "200Gi",
EphemeralStorage: "500Mi",
}
t.Run("Set in task resource spec", func(t *testing.T) {
taskResourceSpec := &admin.TaskResourceSpec{
Cpu: "400m",
Memory: "400Gi",
EphemeralStorage: "600Mi",
}
assignedResources := assignResourcesIfUnset(context.Background(), &core.Identifier{
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
}, platformValues, []*core.Resources_ResourceEntry{}, taskResourceSpec)

assert.EqualValues(t, []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: taskResourceSpec.Cpu,
},
{
Name: core.Resources_MEMORY,
Value: taskResourceSpec.Memory,
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: taskResourceSpec.EphemeralStorage,
},
}, assignedResources)
})
t.Run("Unset in task resource spec", func(t *testing.T) {
assignedResources := assignResourcesIfUnset(context.Background(), &core.Identifier{
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
}, platformValues, []*core.Resources_ResourceEntry{}, &admin.TaskResourceSpec{})

assert.EqualValues(t, []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: platformValues.CPU,
},
{
Name: core.Resources_MEMORY,
Value: platformValues.Memory,
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: platformValues.EphemeralStorage,
},
}, assignedResources)
})

assert.EqualValues(t, []*core.Resources_ResourceEntry{
{
Name: core.Resources_CPU,
Value: taskResourceSpec.Cpu,
},
{
Name: core.Resources_MEMORY,
Value: taskResourceSpec.Memory,
},
}, assignedResources)
}

func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Expand All @@ -2722,6 +2754,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "2",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "300Mi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -2732,6 +2768,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "100Mi",
},
},
}
checkTaskRequestsLessThanLimits(ctx, identifier, resources)
Expand All @@ -2745,6 +2785,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "100Mi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -2755,6 +2799,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "100Mi",
},
},
}, resources))
})
Expand All @@ -2769,6 +2817,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "100Mi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -2779,6 +2831,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1.5",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "200Mi",
},
},
}
checkTaskRequestsLessThanLimits(ctx, identifier, resources)
Expand All @@ -2792,6 +2848,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "100Mi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -2802,6 +2862,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "1.5",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "200Mi",
},
},
}, resources))
})
Expand Down Expand Up @@ -2833,14 +2897,16 @@ func TestSetDefaults(t *testing.T) {

taskConfig := runtimeMocks.MockTaskResourceConfiguration{}
taskConfig.Defaults = runtimeInterfaces.TaskResourceSet{
CPU: "200m",
GPU: "8",
Memory: "200Gi",
CPU: "200m",
GPU: "8",
Memory: "200Gi",
EphemeralStorage: "500Mi",
}
taskConfig.Limits = runtimeInterfaces.TaskResourceSet{
CPU: "300m",
GPU: "8",
Memory: "500Gi",
CPU: "300m",
GPU: "8",
Memory: "500Gi",
EphemeralStorage: "501Mi",
}
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig,
Expand All @@ -2859,6 +2925,10 @@ func TestSetDefaults(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "200Gi",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "500Mi",
},
},
Limits: []*core.Resources_ResourceEntry{
{
Expand All @@ -2869,6 +2939,10 @@ func TestSetDefaults(t *testing.T) {
Name: core.Resources_MEMORY,
Value: "500Gi",
},
{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: "501Mi",
},
},
},
},
Expand Down
Loading

0 comments on commit 263d52c

Please sign in to comment.