Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Pick up latest plugins resource resolution & implement changes #333

Merged
merged 10 commits into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions boilerplate/flyte/end2end/end2end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ then
fi

make kustomize
# launch flyte end2end
kubectl apply -f "${OUT}/deployment/test/flyte_generated.yaml"
make end2end_execute
popd
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.1
github.com/flyteorg/flyteplugins v0.6.5
github.com/flyteorg/flyteplugins v0.7.0
github.com/flyteorg/flytestdlib v0.3.36
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.21.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.1 h1:wINIuKv+0xtTD0kR2RF99C5uGYuwflhY78iw+DkiY8o=
github.com/flyteorg/flyteidl v0.21.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.6.3 h1:gLLQ0nISFSFnUXORxkw3QjRyHcX8AYGlHObxRT6CBk4=
github.com/flyteorg/flyteplugins v0.6.3/go.mod h1:bEMSalcpNKAu0iFy/fTP2HJOmsCi82LcMnsB8iBeJ/U=
github.com/flyteorg/flyteplugins v0.6.5 h1:f971kVpLEuWMbEORrLW3qq7/NhLj3jIVcewZ1/Oz9B8=
github.com/flyteorg/flyteplugins v0.6.5/go.mod h1:bEMSalcpNKAu0iFy/fTP2HJOmsCi82LcMnsB8iBeJ/U=
github.com/flyteorg/flyteplugins v0.7.0 h1:Cy7qqUhoXcLRYVW1b0wtk9+WtMGNFFXZ+O5weCzplvA=
github.com/flyteorg/flyteplugins v0.7.0/go.mod h1:bEMSalcpNKAu0iFy/fTP2HJOmsCi82LcMnsB8iBeJ/U=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.36 h1:XLvc7kfc9XkQBpPvNXevh5+Ijbgmd7gEOHTWhdEY5eA=
github.com/flyteorg/flytestdlib v0.3.36/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/flyteworkflow/v1alpha1/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type TaskResourceSpec struct {
Memory resource.Quantity
EphemeralStorage resource.Quantity
Storage resource.Quantity
GPU resource.Quantity
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
}

// Defines the complete closure of compute resources a task can request and apply as limits.
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_4.json
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_5.json
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_6.json
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_7_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_8_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -529,13 +529,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_9_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -552,13 +552,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/dynamic_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1354),
int64(1374),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/nodes/task/remote_workflow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ func Test_cacheFlyteWorkflow(t *testing.T) {
Memory: resource.MustParse("1"),
Storage: resource.MustParse("1"),
EphemeralStorage: resource.MustParse("1"),
GPU: resource.MustParse("1"),
},
Limits: v1alpha1.TaskResourceSpec{
CPU: resource.MustParse("1"),
Memory: resource.MustParse("1"),
Storage: resource.MustParse("1"),
EphemeralStorage: resource.MustParse("1"),
GPU: resource.MustParse("1"),
},
},
},
Expand Down
68 changes: 28 additions & 40 deletions pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"strconv"

"github.com/flyteorg/flytepropeller/pkg/utils"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/encoding"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -48,27 +50,12 @@ func (te taskExecutionID) GetGeneratedName() string {
return te.execName
}

type taskOverrides struct {
pluginCore.TaskOverrides
resourceRequirements *v1.ResourceRequirements
}

func (t taskOverrides) GetResources() *v1.ResourceRequirements {
return t.resourceRequirements
}

func newTaskOverrides(overrides pluginCore.TaskOverrides, resourceRequirements *v1.ResourceRequirements) pluginCore.TaskOverrides {
return &taskOverrides{
TaskOverrides: overrides,
resourceRequirements: resourceRequirements,
}
}

type taskExecutionMetadata struct {
handler.NodeExecutionMetadata
taskExecID taskExecutionID
o pluginCore.TaskOverrides
maxAttempts uint32
taskExecID taskExecutionID
o pluginCore.TaskOverrides
maxAttempts uint32
platformResources *v1.ResourceRequirements
}

func (t taskExecutionMetadata) GetTaskExecutionID() pluginCore.TaskExecutionID {
Expand All @@ -83,6 +70,10 @@ func (t taskExecutionMetadata) GetMaxAttempts() uint32 {
return t.maxAttempts
}

func (t taskExecutionMetadata) GetPlatformResources() *v1.ResourceRequirements {
return t.platformResources
}

type taskExecutionContext struct {
handler.NodeExecutionContext
tm taskExecutionMetadata
Expand Down Expand Up @@ -182,28 +173,24 @@ func assignResource(resourceName v1.ResourceName, execConfigRequest, execConfigL
}
}

// Reconciles platform-specific resource defaults requests and max limits with the static resource values
// defined by this task and node execution context.
func determineResourceRequirements(nCtx handler.NodeExecutionContext, taskResources v1alpha1.TaskResources) *v1.ResourceRequirements {
var requests = make(v1.ResourceList)
var limits = make(v1.ResourceList)
if nCtx.Node().GetResources() != nil {
if nCtx.Node().GetResources().Requests != nil {
requests = nCtx.Node().GetResources().Requests
}
if nCtx.Node().GetResources().Limits != nil {
limits = nCtx.Node().GetResources().Limits
}
}

assignResource(v1.ResourceCPU, taskResources.Requests.CPU, taskResources.Limits.CPU, requests, limits)
assignResource(v1.ResourceMemory, taskResources.Requests.Memory, taskResources.Limits.Memory, requests, limits)
assignResource(v1.ResourceEphemeralStorage, taskResources.Requests.EphemeralStorage, taskResources.Limits.EphemeralStorage, requests, limits)
assignResource(v1.ResourceStorage, taskResources.Requests.Storage, taskResources.Limits.Storage, requests, limits)
func convertTaskResourcesToRequirements(taskResources v1alpha1.TaskResources) *v1.ResourceRequirements {
return &v1.ResourceRequirements{
Requests: requests,
Limits: limits,
Requests: v1.ResourceList{
v1.ResourceCPU: taskResources.Requests.CPU,
v1.ResourceMemory: taskResources.Requests.Memory,
v1.ResourceEphemeralStorage: taskResources.Requests.EphemeralStorage,
v1.ResourceStorage: taskResources.Requests.Storage,
utils.ResourceNvidiaGPU: taskResources.Requests.GPU,
},
Limits: v1.ResourceList{
v1.ResourceCPU: taskResources.Limits.CPU,
v1.ResourceMemory: taskResources.Limits.Memory,
v1.ResourceEphemeralStorage: taskResources.Limits.EphemeralStorage,
v1.ResourceStorage: taskResources.Limits.Storage,
utils.ResourceNvidiaGPU: taskResources.Limits.GPU,
},
}

}

func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, plugin pluginCore.Plugin) (*taskExecutionContext, error) {
Expand Down Expand Up @@ -260,8 +247,9 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
tm: taskExecutionMetadata{
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
taskExecID: taskExecutionID{execName: uniqueID, id: id},
o: newTaskOverrides(nCtx.Node(), determineResourceRequirements(nCtx, nCtx.ExecutionContext().GetExecutionConfig().TaskResources)),
o: nCtx.Node(),
maxAttempts: maxAttempts,
platformResources: convertTaskResourcesToRequirements(nCtx.ExecutionContext().GetExecutionConfig().TaskResources),
},
rm: resourcemanager.GetTaskResourceManager(
t.resourceManager, resourceNamespacePrefix, id),
Expand Down
Loading