diff --git a/go.mod b/go.mod index f8fbfb4d95..cf6ad54f78 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.19.14 + github.com/flyteorg/flyteidl v0.19.19 github.com/flyteorg/flyteplugins v0.5.59 github.com/flyteorg/flytepropeller v0.13.3 github.com/flyteorg/flytestdlib v0.3.27 diff --git a/go.sum b/go.sum index ff6d387d0f..a13b5014e7 100644 --- a/go.sum +++ b/go.sum @@ -304,8 +304,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI= github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.19.19 h1:jv93YLz0Bq++sH9r0AOhdNaHFdXSCWjsXJoLOIduA2o= +github.com/flyteorg/flyteidl v0.19.19/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.5.59 h1:Uw1xlrlx5rSTpdTMwJTo7mbqHI7X7p7CFVm3473iRjo= github.com/flyteorg/flyteplugins v0.5.59/go.mod h1:nesnW7pJhXEysFQg9TnSp36ao33ie0oA/TI4sYPaeyw= github.com/flyteorg/flytepropeller v0.13.3 h1:nnO4d9w6UbgLCF9kn0M6LTkYpS/F5jEoEF22YcRmLYI= diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index c55cc12ba8..2c924a63c3 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -210,45 +210,51 @@ func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.L } func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask, - systemResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet { + configResourceLimits runtimeInterfaces.TaskResourceSet) runtimeInterfaces.TaskResourceSet { // The values below should never be used (deduce it from the request; request should be set by the time we get here). // Setting them here just in case we end up with requests not set. We are not adding to config because it would add // more confusion as its mostly not used. cpuLimit := "500m" memoryLimit := "500Mi" - resourceEntries := task.Template.GetContainer().Resources.Requests - var cpuIndex, memoryIndex = -1, -1 - for idx, entry := range resourceEntries { + resourceRequestEntries := task.Template.GetContainer().Resources.Requests + var cpuIndex, memoryIndex, ephemeralStorageIndex = -1, -1, -1 + for idx, entry := range resourceRequestEntries { switch entry.Name { case core.Resources_CPU: cpuIndex = idx - case core.Resources_MEMORY: memoryIndex = idx + case core.Resources_EPHEMERAL_STORAGE: + ephemeralStorageIndex = idx } } if cpuIndex < 0 || memoryIndex < 0 { logger.Errorf(ctx, "Cpu request and Memory request missing for %s", task.Template.Id) } + taskResourceLimits := runtimeInterfaces.TaskResourceSet{} - if cpuIndex >= 0 { - cpuLimit = resourceEntries[cpuIndex].Value + // For resource values, we prefer to use the limits set in the application config over the set resource values. + if len(configResourceLimits.CPU) > 0 { + cpuLimit = configResourceLimits.CPU + } else if cpuIndex >= 0 { + cpuLimit = resourceRequestEntries[cpuIndex].Value } - if memoryIndex >= 0 { - memoryLimit = resourceEntries[memoryIndex].Value + taskResourceLimits.CPU = cpuLimit + if len(configResourceLimits.Memory) > 0 { + memoryLimit = configResourceLimits.Memory + } else if memoryIndex >= 0 { + memoryLimit = resourceRequestEntries[memoryIndex].Value } - - taskResourceLimits := runtimeInterfaces.TaskResourceSet{CPU: cpuLimit, Memory: memoryLimit} - // Use the limits from config - if systemResourceLimits.CPU != "" { - taskResourceLimits.CPU = systemResourceLimits.CPU + taskResourceLimits.Memory = memoryLimit + if len(taskResourceLimits.GPU) == 0 && len(configResourceLimits.GPU) > 0 { + // When a platform default for GPU exists, but one isn't set in the task resources, use the platform value. + taskResourceLimits.GPU = configResourceLimits.GPU } - if systemResourceLimits.Memory != "" { - taskResourceLimits.Memory = systemResourceLimits.Memory - } - if systemResourceLimits.GPU != "" { - taskResourceLimits.GPU = systemResourceLimits.GPU + if len(configResourceLimits.EphemeralStorage) > 0 { + taskResourceLimits.EphemeralStorage = configResourceLimits.EphemeralStorage + } else if ephemeralStorageIndex >= 0 { + taskResourceLimits.EphemeralStorage = resourceRequestEntries[ephemeralStorageIndex].Value } return taskResourceLimits @@ -257,21 +263,23 @@ func createTaskDefaultLimits(ctx context.Context, task *core.CompiledTask, func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier, platformValues runtimeInterfaces.TaskResourceSet, resourceEntries []*core.Resources_ResourceEntry, taskResourceSpec *admin.TaskResourceSpec) []*core.Resources_ResourceEntry { - var cpuIndex, memoryIndex = -1, -1 + var cpuIndex, memoryIndex, ephemeralStorageindex = -1, -1, -1 for idx, entry := range resourceEntries { switch entry.Name { case core.Resources_CPU: cpuIndex = idx case core.Resources_MEMORY: memoryIndex = idx + case core.Resources_EPHEMERAL_STORAGE: + ephemeralStorageindex = idx } } - if cpuIndex > 0 && memoryIndex > 0 { + if cpuIndex > 0 && memoryIndex > 0 && ephemeralStorageindex > 0 { // nothing to do return resourceEntries } - if cpuIndex < 0 && platformValues.CPU != "" { + if cpuIndex < 0 && len(platformValues.CPU) > 0 { logger.Debugf(ctx, "Setting 'cpu' for [%+v] to %s", identifier, platformValues.CPU) cpuValue := platformValues.CPU if taskResourceSpec != nil && len(taskResourceSpec.Cpu) > 0 { @@ -284,7 +292,7 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier, } resourceEntries = append(resourceEntries, cpuResource) } - if memoryIndex < 0 && platformValues.Memory != "" { + if memoryIndex < 0 && len(platformValues.Memory) > 0 { memoryValue := platformValues.Memory if taskResourceSpec != nil && len(taskResourceSpec.Memory) > 0 { // Use the custom attributes from the database rather than the platform defaults from the application config @@ -297,6 +305,23 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier, logger.Debugf(ctx, "Setting 'memory' for [%+v] to %s", identifier, platformValues.Memory) resourceEntries = append(resourceEntries, memoryResource) } + if ephemeralStorageindex < 0 { + var ephemeralStorageValue string + if taskResourceSpec != nil && len(taskResourceSpec.EphemeralStorage) > 0 { + // Use the custom attributes from the database rather than the platform defaults from the application config + ephemeralStorageValue = taskResourceSpec.EphemeralStorage + } else if len(platformValues.EphemeralStorage) > 0 { + ephemeralStorageValue = platformValues.EphemeralStorage + } + if len(ephemeralStorageValue) > 0 { + ephemeralStorageResource := &core.Resources_ResourceEntry{ + Name: core.Resources_EPHEMERAL_STORAGE, + Value: ephemeralStorageValue, + } + logger.Debugf(ctx, "Setting 'ephemeralStorage' for [%+v] to %s", identifier, platformValues.EphemeralStorage) + resourceEntries = append(resourceEntries, ephemeralStorageResource) + } + } return resourceEntries } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index a194ae88ce..280f0b6e71 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -2675,31 +2675,63 @@ func TestListExecutions_LegacyModel(t *testing.T) { func TestAssignResourcesIfUnset(t *testing.T) { platformValues := runtimeInterfaces.TaskResourceSet{ - CPU: "200m", - GPU: "8", - Memory: "200Gi", - } - taskResourceSpec := &admin.TaskResourceSpec{ - Cpu: "400m", - Memory: "400Gi", - } - assignedResources := assignResourcesIfUnset(context.Background(), &core.Identifier{ - Project: "project", - Domain: "domain", - Name: "name", - Version: "version", - }, platformValues, []*core.Resources_ResourceEntry{}, taskResourceSpec) + CPU: "200m", + GPU: "8", + Memory: "200Gi", + EphemeralStorage: "500Mi", + } + t.Run("Set in task resource spec", func(t *testing.T) { + taskResourceSpec := &admin.TaskResourceSpec{ + Cpu: "400m", + Memory: "400Gi", + EphemeralStorage: "600Mi", + } + assignedResources := assignResourcesIfUnset(context.Background(), &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "version", + }, platformValues, []*core.Resources_ResourceEntry{}, taskResourceSpec) + + assert.EqualValues(t, []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: taskResourceSpec.Cpu, + }, + { + Name: core.Resources_MEMORY, + Value: taskResourceSpec.Memory, + }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: taskResourceSpec.EphemeralStorage, + }, + }, assignedResources) + }) + t.Run("Unset in task resource spec", func(t *testing.T) { + assignedResources := assignResourcesIfUnset(context.Background(), &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "name", + Version: "version", + }, platformValues, []*core.Resources_ResourceEntry{}, &admin.TaskResourceSpec{}) + + assert.EqualValues(t, []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: platformValues.CPU, + }, + { + Name: core.Resources_MEMORY, + Value: platformValues.Memory, + }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: platformValues.EphemeralStorage, + }, + }, assignedResources) + }) - assert.EqualValues(t, []*core.Resources_ResourceEntry{ - { - Name: core.Resources_CPU, - Value: taskResourceSpec.Cpu, - }, - { - Name: core.Resources_MEMORY, - Value: taskResourceSpec.Memory, - }, - }, assignedResources) } func TestCheckTaskRequestsLessThanLimits(t *testing.T) { @@ -2722,6 +2754,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "2", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "300Mi", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -2732,6 +2768,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "100Mi", + }, }, } checkTaskRequestsLessThanLimits(ctx, identifier, resources) @@ -2745,6 +2785,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "100Mi", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -2755,6 +2799,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "100Mi", + }, }, }, resources)) }) @@ -2769,6 +2817,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "100Mi", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -2779,6 +2831,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1.5", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "200Mi", + }, }, } checkTaskRequestsLessThanLimits(ctx, identifier, resources) @@ -2792,6 +2848,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "100Mi", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -2802,6 +2862,10 @@ func TestCheckTaskRequestsLessThanLimits(t *testing.T) { Name: core.Resources_MEMORY, Value: "1.5", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "200Mi", + }, }, }, resources)) }) @@ -2833,14 +2897,16 @@ func TestSetDefaults(t *testing.T) { taskConfig := runtimeMocks.MockTaskResourceConfiguration{} taskConfig.Defaults = runtimeInterfaces.TaskResourceSet{ - CPU: "200m", - GPU: "8", - Memory: "200Gi", + CPU: "200m", + GPU: "8", + Memory: "200Gi", + EphemeralStorage: "500Mi", } taskConfig.Limits = runtimeInterfaces.TaskResourceSet{ - CPU: "300m", - GPU: "8", - Memory: "500Gi", + CPU: "300m", + GPU: "8", + Memory: "500Gi", + EphemeralStorage: "501Mi", } mockConfig := runtimeMocks.NewMockConfigurationProvider( testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig, @@ -2859,6 +2925,10 @@ func TestSetDefaults(t *testing.T) { Name: core.Resources_MEMORY, Value: "200Gi", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "500Mi", + }, }, Limits: []*core.Resources_ResourceEntry{ { @@ -2869,6 +2939,10 @@ func TestSetDefaults(t *testing.T) { Name: core.Resources_MEMORY, Value: "500Gi", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "501Mi", + }, }, }, }, diff --git a/pkg/manager/impl/validation/task_validator.go b/pkg/manager/impl/validation/task_validator.go index d1bd6f8c8f..772f7f5bcf 100644 --- a/pkg/manager/impl/validation/task_validator.go +++ b/pkg/manager/impl/validation/task_validator.go @@ -111,6 +111,10 @@ func taskResourceSetToMap( gpuQuantity := resource.MustParse(resourceSet.GPU) resourceMap[core.Resources_GPU] = &gpuQuantity } + if resourceSet.EphemeralStorage != "" { + ephemeralStorageQuantity := resource.MustParse(resourceSet.EphemeralStorage) + resourceMap[core.Resources_EPHEMERAL_STORAGE] = &ephemeralStorageQuantity + } return resourceMap } @@ -158,6 +162,11 @@ func requestedResourcesToQuantity( return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "gpu for [%+v] must be a whole number, got: %s instead", identifier, limitEntry.Value) } + case core.Resources_EPHEMERAL_STORAGE: + err := addResourceEntryToMap(identifier, limitEntry, &requestedToQuantity) + if err != nil { + return nil, err + } default: continue } @@ -184,6 +193,8 @@ func validateTaskResources( switch resourceName { case core.Resources_CPU: fallthrough + case core.Resources_EPHEMERAL_STORAGE: + fallthrough case core.Resources_MEMORY: limitQuantity, ok := requestedResourceLimits[resourceName] if ok && limitQuantity.Value() < defaultQuantity.Value() { diff --git a/pkg/manager/impl/validation/task_validator_test.go b/pkg/manager/impl/validation/task_validator_test.go index dd2e1279bf..77c2aecd83 100644 --- a/pkg/manager/impl/validation/task_validator_test.go +++ b/pkg/manager/impl/validation/task_validator_test.go @@ -174,15 +174,17 @@ func TestValidateTaskTypeWhitelist(t *testing.T) { func TestTaskResourceSetToMap(t *testing.T) { resourceSet := runtimeInterfaces.TaskResourceSet{ - CPU: "100Mi", - GPU: "2", - Memory: "1.5Gi", + CPU: "100Mi", + GPU: "2", + Memory: "1.5Gi", + EphemeralStorage: "500Mi", } resourceSetMap := taskResourceSetToMap(resourceSet) - assert.Len(t, resourceSetMap, 3) + assert.Len(t, resourceSetMap, 4) assert.Equal(t, resourceSetMap[core.Resources_CPU].Value(), int64(104857600)) assert.Equal(t, resourceSetMap[core.Resources_GPU].Value(), int64(2)) assert.Equal(t, resourceSetMap[core.Resources_MEMORY].Value(), int64(1610612736)) + assert.Equal(t, resourceSetMap[core.Resources_EPHEMERAL_STORAGE].Value(), int64(524288000)) } func TestAddResourceEntryToMap(t *testing.T) { @@ -263,6 +265,10 @@ func TestValidateTaskResources(t *testing.T) { Name: core.Resources_GPU, Value: "2", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "500Mi", + }, } requestedTaskResourceLimits := []*core.Resources_ResourceEntry{ @@ -274,6 +280,10 @@ func TestValidateTaskResources(t *testing.T) { Name: core.Resources_GPU, Value: "2", }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "501Mi", + }, } assert.Nil(t, validateTaskResources(&core.Identifier{}, runtimeInterfaces.TaskResourceSet{}, requestedTaskResourceDefaults, requestedTaskResourceLimits)) diff --git a/pkg/runtime/interfaces/task_resource_configuration.go b/pkg/runtime/interfaces/task_resource_configuration.go index 389eff72de..8686c19ddf 100644 --- a/pkg/runtime/interfaces/task_resource_configuration.go +++ b/pkg/runtime/interfaces/task_resource_configuration.go @@ -1,10 +1,11 @@ package interfaces type TaskResourceSet struct { - CPU string `json:"cpu"` - GPU string `json:"gpu"` - Memory string `json:"memory"` - Storage string `json:"storage"` + CPU string `json:"cpu"` + GPU string `json:"gpu"` + Memory string `json:"memory"` + Storage string `json:"storage"` + EphemeralStorage string `json:"ephemeralStorage"` } // Provides default values for task resource limits and defaults.