Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove storage as a task resource option #4658

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,23 +253,6 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
})
}

// Only assign storage when it is either requested or limited in the task definition, or a platform
// default exists.
if !taskResourceRequirements.Defaults.Storage.IsZero() ||
!taskResourceRequirements.Limits.Storage.IsZero() ||
!platformTaskResources.Defaults.Storage.IsZero() {
storageResource := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Storage, taskResourceRequirements.Limits.Storage,
platformTaskResources.Defaults.Storage, platformTaskResources.Limits.Storage)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_STORAGE,
Value: storageResource.Request.String(),
})
finalizedResourceLimits = append(finalizedResourceLimits, &core.Resources_ResourceEntry{
Name: core.Resources_STORAGE,
Value: storageResource.Limit.String(),
})
}

// Only assign gpu when it is either requested or limited in the task definition, or a platform default exists.
if !taskResourceRequirements.Defaults.GPU.IsZero() ||
!taskResourceRequirements.Limits.GPU.IsZero() ||
Expand Down
9 changes: 0 additions & 9 deletions flyteplugins/go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,14 @@ func TestLoadConfig(t *testing.T) {
}, k8sConfig.DefaultEnvVars)
assert.NotNil(t, k8sConfig.ResourceTolerations)
assert.Contains(t, k8sConfig.ResourceTolerations, v1.ResourceName("nvidia.com/gpu"))
assert.Contains(t, k8sConfig.ResourceTolerations, v1.ResourceStorage)
tolGPU := v1.Toleration{
Key: "flyte/gpu",
Value: "dedicated",
Operator: v1.TolerationOpEqual,
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
Value: "special",
Operator: v1.TolerationOpEqual,
Effect: v1.TaintEffectPreferNoSchedule,
}

assert.Equal(t, []v1.Toleration{tolGPU}, k8sConfig.ResourceTolerations[v1.ResourceName("nvidia.com/gpu")])
assert.Equal(t, []v1.Toleration{tolStorage}, k8sConfig.ResourceTolerations[v1.ResourceStorage])
expectedCPU := resource.MustParse("1000m")
assert.True(t, expectedCPU.Equal(k8sConfig.DefaultCPURequest))
expectedMemory := resource.MustParse("1024Mi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements

// TODO: Make configurable. 1/15/2019 Flyte Cluster doesn't support setting storage requests/limits.
// https://github.com/kubernetes/enhancements/issues/362
delete(resources.Requests, v1.ResourceStorage)
delete(resources.Limits, v1.ResourceStorage)

gpuResourceName := config.GetK8sPluginConfig().GpuResourceName
shouldAdjustGPU := false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,11 @@ func TestApplyResourceOverrides_RemoveStorage(t *testing.T) {
requestedResourceQuantity := resource.MustParse("1")
overrides := ApplyResourceOverrides(v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceCPU: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Expand Down Expand Up @@ -261,7 +259,6 @@ func TestMergeResources_EmptyIn(t *testing.T) {
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Expand All @@ -280,7 +277,6 @@ func TestMergeResources_EmptyOut(t *testing.T) {
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Limits: v1.ResourceList{
v1.ResourceStorage: requestedResourceQuantity,
v1.ResourceMemory: requestedResourceQuantity,
v1.ResourceEphemeralStorage: requestedResourceQuantity,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestGetTolerationsForResources(t *testing.T) {
Effect: v12.TaintEffectNoSchedule,
}

tolStorage := v12.Toleration{
Key: "storage",
tolEphemeralStorage := v12.Toleration{
Key: "ephemeral-storage",
Value: "dedicated",
Operator: v12.TolerationOpExists,
Effect: v12.TaintEffectNoSchedule,
Expand All @@ -55,8 +55,8 @@ func TestGetTolerationsForResources(t *testing.T) {
args{
v12.ResourceRequirements{
Limits: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
Expand All @@ -69,8 +69,8 @@ func TestGetTolerationsForResources(t *testing.T) {
args{
v12.ResourceRequirements{
Requests: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
Expand All @@ -83,12 +83,12 @@ func TestGetTolerationsForResources(t *testing.T) {
args{
v12.ResourceRequirements{
Limits: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
Requests: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
Expand All @@ -101,84 +101,84 @@ func TestGetTolerationsForResources(t *testing.T) {
args{
v12.ResourceRequirements{
Limits: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
map[v12.ResourceName][]v12.Toleration{
v12.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
v12.ResourceEphemeralStorage: {tolEphemeralStorage},
ResourceNvidiaGPU: {tolGPU},
},
nil,
[]v12.Toleration{tolStorage},
[]v12.Toleration{tolEphemeralStorage},
},
{
"tolerations-req",
args{
v12.ResourceRequirements{
Requests: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
map[v12.ResourceName][]v12.Toleration{
v12.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
v12.ResourceEphemeralStorage: {tolEphemeralStorage},
ResourceNvidiaGPU: {tolGPU},
},
nil,
[]v12.Toleration{tolStorage},
[]v12.Toleration{tolEphemeralStorage},
},
{
"tolerations-both",
args{
v12.ResourceRequirements{
Limits: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
Requests: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
map[v12.ResourceName][]v12.Toleration{
v12.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
v12.ResourceEphemeralStorage: {tolEphemeralStorage},
ResourceNvidiaGPU: {tolGPU},
},
nil,
[]v12.Toleration{tolStorage},
[]v12.Toleration{tolEphemeralStorage},
},
{
"no-tolerations-both",
args{
v12.ResourceRequirements{
Limits: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
},
Requests: v12.ResourceList{
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceStorage: resource.MustParse("100M"),
v12.ResourceCPU: resource.MustParse("1024m"),
v12.ResourceEphemeralStorage: resource.MustParse("100M"),
},
},
},
map[v12.ResourceName][]v12.Toleration{
v12.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
v12.ResourceEphemeralStorage: {tolEphemeralStorage},
ResourceNvidiaGPU: {tolGPU},
},
nil,
[]v12.Toleration{tolStorage, tolGPU},
[]v12.Toleration{tolEphemeralStorage, tolGPU},
},
{
"default-tolerations",
args{},
nil,
[]v12.Toleration{tolStorage},
[]v12.Toleration{tolStorage},
[]v12.Toleration{tolEphemeralStorage},
[]v12.Toleration{tolEphemeralStorage},
},
}
for _, tt := range tests {
Expand Down
56 changes: 28 additions & 28 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,12 +693,12 @@ func TestApplyGPUNodeSelectors(t *testing.T) {
func updatePod(t *testing.T) {
taskExecutionMetadata := dummyTaskExecutionMetadata(&v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
}, nil)

Expand Down Expand Up @@ -809,13 +809,13 @@ func toK8sPodInterruptible(t *testing.T) {

x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
}, nil)

Expand Down Expand Up @@ -853,17 +853,17 @@ func TestToK8sPod(t *testing.T) {
Effect: v1.TaintEffectNoSchedule,
}

tolStorage := v1.Toleration{
Key: "storage",
tolEphemeralStorage := v1.Toleration{
Key: "ephemeral-storage",
Value: "dedicated",
Operator: v1.TolerationOpExists,
Effect: v1.TaintEffectNoSchedule,
}

assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
ResourceTolerations: map[v1.ResourceName][]v1.Toleration{
v1.ResourceStorage: {tolStorage},
ResourceNvidiaGPU: {tolGPU},
v1.ResourceEphemeralStorage: {tolEphemeralStorage},
ResourceNvidiaGPU: {tolGPU},
},
DefaultCPURequest: resource.MustParse("1024m"),
DefaultMemoryRequest: resource.MustParse("1024Mi"),
Expand All @@ -876,48 +876,48 @@ func TestToK8sPod(t *testing.T) {
t.Run("WithGPU", func(t *testing.T) {
x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
}, nil)

p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.Equal(t, len(p.Tolerations), 1)
assert.Equal(t, len(p.Tolerations), 2)
})

t.Run("NoGPU", func(t *testing.T) {
x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
}, nil)

p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.Equal(t, len(p.Tolerations), 0)
assert.Equal(t, len(p.Tolerations), 1)
assert.Equal(t, "some-acceptable-name", p.Containers[0].Name)
})

t.Run("Default toleration, selector, scheduler", func(t *testing.T) {
x := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceEphemeralStorage: resource.MustParse("100M"),
},
}, nil)

Expand Down
4 changes: 0 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceLi
if !v.IsZero() {
k8sResources[v1.ResourceMemory] = v
}
case core.Resources_STORAGE:
if !v.IsZero() {
k8sResources[v1.ResourceStorage] = v
}
case core.Resources_GPU:
if !v.IsZero() {
k8sResources[ResourceNvidiaGPU] = v
Expand Down
2 changes: 0 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestToK8sResourceList(t *testing.T) {
{Name: core.Resources_CPU, Value: "250m"},
{Name: core.Resources_GPU, Value: "1"},
{Name: core.Resources_MEMORY, Value: "1024Mi"},
{Name: core.Resources_STORAGE, Value: "1024Mi"},
{Name: core.Resources_EPHEMERAL_STORAGE, Value: "1024Mi"},
})

Expand All @@ -43,7 +42,6 @@ func TestToK8sResourceList(t *testing.T) {
assert.Equal(t, resource.MustParse("250m"), r[v1.ResourceCPU])
assert.Equal(t, resource.MustParse("1"), r[ResourceNvidiaGPU])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceMemory])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceStorage])
assert.Equal(t, resource.MustParse("1024Mi"), r[v1.ResourceEphemeralStorage])
}
{
Expand Down
Loading
Loading