Skip to content

Commit

Permalink
Pick up latest plugins resource resolution & implement changes (flyteorg#333)
Browse files Browse the repository at this point in the history

Signed-off-by: Tim Bauer <[email protected]>
  • Loading branch information
Katrina Rogan authored and bimtauer committed Oct 20, 2021
1 parent a1f23e6 commit 5bbb092
Show file tree
Hide file tree
Showing 21 changed files with 109 additions and 131 deletions.
2 changes: 2 additions & 0 deletions boilerplate/flyte/end2end/end2end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ then
fi

make kustomize
# launch flyte end2end
kubectl apply -f "${OUT}/deployment/test/flyte_generated.yaml"
make end2end_execute
popd
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.1
github.com/flyteorg/flyteplugins v0.6.5
github.com/flyteorg/flyteplugins v0.7.0
github.com/flyteorg/flytestdlib v0.3.36
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,8 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/flyteorg/flyteidl v0.21.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.1 h1:wINIuKv+0xtTD0kR2RF99C5uGYuwflhY78iw+DkiY8o=
github.com/flyteorg/flyteidl v0.21.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.6.3 h1:gLLQ0nISFSFnUXORxkw3QjRyHcX8AYGlHObxRT6CBk4=
github.com/flyteorg/flyteplugins v0.6.3/go.mod h1:bEMSalcpNKAu0iFy/fTP2HJOmsCi82LcMnsB8iBeJ/U=
github.com/flyteorg/flyteplugins v0.6.5 h1:f971kVpLEuWMbEORrLW3qq7/NhLj3jIVcewZ1/Oz9B8=
github.com/flyteorg/flyteplugins v0.6.5/go.mod h1:bEMSalcpNKAu0iFy/fTP2HJOmsCi82LcMnsB8iBeJ/U=
github.com/flyteorg/flyteplugins v0.7.0 h1:Cy7qqUhoXcLRYVW1b0wtk9+WtMGNFFXZ+O5weCzplvA=
github.com/flyteorg/flyteplugins v0.7.0/go.mod h1:bEMSalcpNKAu0iFy/fTP2HJOmsCi82LcMnsB8iBeJ/U=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.36 h1:XLvc7kfc9XkQBpPvNXevh5+Ijbgmd7gEOHTWhdEY5eA=
github.com/flyteorg/flytestdlib v0.3.36/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/flyteworkflow/v1alpha1/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type TaskResourceSpec struct {
Memory resource.Quantity
EphemeralStorage resource.Quantity
Storage resource.Quantity
GPU resource.Quantity
}

// Defines the complete closure of compute resources a task can request and apply as limits.
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_10_simple.json
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_4.json
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_5.json
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_6.json
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_7_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_8_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -529,13 +529,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compiler/test/testdata/branch/k8s/success_9_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -552,13 +552,15 @@
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
"Storage": "0",
"GPU": "0"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/dynamic_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1354),
int64(1374),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/nodes/task/remote_workflow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ func Test_cacheFlyteWorkflow(t *testing.T) {
Memory: resource.MustParse("1"),
Storage: resource.MustParse("1"),
EphemeralStorage: resource.MustParse("1"),
GPU: resource.MustParse("1"),
},
Limits: v1alpha1.TaskResourceSpec{
CPU: resource.MustParse("1"),
Memory: resource.MustParse("1"),
Storage: resource.MustParse("1"),
EphemeralStorage: resource.MustParse("1"),
GPU: resource.MustParse("1"),
},
},
},
Expand Down
68 changes: 28 additions & 40 deletions pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"strconv"

"github.com/flyteorg/flytepropeller/pkg/utils"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/encoding"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -48,27 +50,12 @@ func (te taskExecutionID) GetGeneratedName() string {
return te.execName
}

// taskOverrides pairs an embedded pluginCore.TaskOverrides with an explicit
// set of resource requirements, which its GetResources method returns.
type taskOverrides struct {
pluginCore.TaskOverrides
// resourceRequirements is the value handed back by GetResources.
resourceRequirements *v1.ResourceRequirements
}

// GetResources returns the resource requirements stored on this taskOverrides
// value (the resourceRequirements field), which may be nil.
func (t taskOverrides) GetResources() *v1.ResourceRequirements {
return t.resourceRequirements
}

// newTaskOverrides wraps overrides and resourceRequirements in a taskOverrides
// and returns a pointer to it as a pluginCore.TaskOverrides.
func newTaskOverrides(overrides pluginCore.TaskOverrides, resourceRequirements *v1.ResourceRequirements) pluginCore.TaskOverrides {
return &taskOverrides{
TaskOverrides: overrides,
resourceRequirements: resourceRequirements,
}
}

// taskExecutionMetadata embeds handler.NodeExecutionMetadata and adds
// task-level details: the task execution ID, the task overrides, the maximum
// number of attempts and the platform resource requirements.
// NOTE(review): taskExecID, o and maxAttempts are listed twice below; this
// appears to be the before/after lines of a diff hunk shown together rather
// than real duplicate fields — confirm against the actual file.
type taskExecutionMetadata struct {
handler.NodeExecutionMetadata
taskExecID taskExecutionID
o pluginCore.TaskOverrides
maxAttempts uint32
taskExecID taskExecutionID
o pluginCore.TaskOverrides
maxAttempts uint32
// platformResources is the value returned by GetPlatformResources.
platformResources *v1.ResourceRequirements
}

func (t taskExecutionMetadata) GetTaskExecutionID() pluginCore.TaskExecutionID {
Expand All @@ -83,6 +70,10 @@ func (t taskExecutionMetadata) GetMaxAttempts() uint32 {
return t.maxAttempts
}

// GetPlatformResources returns the platform resource requirements stored on
// this metadata value (the platformResources field), which may be nil.
func (t taskExecutionMetadata) GetPlatformResources() *v1.ResourceRequirements {
return t.platformResources
}

type taskExecutionContext struct {
handler.NodeExecutionContext
tm taskExecutionMetadata
Expand Down Expand Up @@ -182,28 +173,24 @@ func assignResource(resourceName v1.ResourceName, execConfigRequest, execConfigL
}
}

// Reconciles platform-specific default resource requests and max limits with the static resource values
// defined by this task and node execution context.
func determineResourceRequirements(nCtx handler.NodeExecutionContext, taskResources v1alpha1.TaskResources) *v1.ResourceRequirements {
var requests = make(v1.ResourceList)
var limits = make(v1.ResourceList)
// Use the node's own requests/limits lists in place of the empty ones when the node declares them.
if nCtx.Node().GetResources() != nil {
if nCtx.Node().GetResources().Requests != nil {
requests = nCtx.Node().GetResources().Requests
}
if nCtx.Node().GetResources().Limits != nil {
limits = nCtx.Node().GetResources().Limits
}
}

// Hand each resource's execution-config request and limit to assignResource together with the lists above.
assignResource(v1.ResourceCPU, taskResources.Requests.CPU, taskResources.Limits.CPU, requests, limits)
assignResource(v1.ResourceMemory, taskResources.Requests.Memory, taskResources.Limits.Memory, requests, limits)
assignResource(v1.ResourceEphemeralStorage, taskResources.Requests.EphemeralStorage, taskResources.Limits.EphemeralStorage, requests, limits)
assignResource(v1.ResourceStorage, taskResources.Requests.Storage, taskResources.Limits.Storage, requests, limits)
// convertTaskResourcesToRequirements builds a v1.ResourceRequirements whose Requests and Limits lists
// carry the CPU, memory, ephemeral storage, storage and GPU (utils.ResourceNvidiaGPU) quantities taken
// from the corresponding fields of taskResources.Requests and taskResources.Limits.
func convertTaskResourcesToRequirements(taskResources v1alpha1.TaskResources) *v1.ResourceRequirements {
return &v1.ResourceRequirements{
Requests: requests,
Limits: limits,
Requests: v1.ResourceList{
v1.ResourceCPU: taskResources.Requests.CPU,
v1.ResourceMemory: taskResources.Requests.Memory,
v1.ResourceEphemeralStorage: taskResources.Requests.EphemeralStorage,
v1.ResourceStorage: taskResources.Requests.Storage,
utils.ResourceNvidiaGPU: taskResources.Requests.GPU,
},
Limits: v1.ResourceList{
v1.ResourceCPU: taskResources.Limits.CPU,
v1.ResourceMemory: taskResources.Limits.Memory,
v1.ResourceEphemeralStorage: taskResources.Limits.EphemeralStorage,
v1.ResourceStorage: taskResources.Limits.Storage,
utils.ResourceNvidiaGPU: taskResources.Limits.GPU,
},
}

}

func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, plugin pluginCore.Plugin) (*taskExecutionContext, error) {
Expand Down Expand Up @@ -260,8 +247,9 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
tm: taskExecutionMetadata{
NodeExecutionMetadata: nCtx.NodeExecutionMetadata(),
taskExecID: taskExecutionID{execName: uniqueID, id: id},
o: newTaskOverrides(nCtx.Node(), determineResourceRequirements(nCtx, nCtx.ExecutionContext().GetExecutionConfig().TaskResources)),
o: nCtx.Node(),
maxAttempts: maxAttempts,
platformResources: convertTaskResourcesToRequirements(nCtx.ExecutionContext().GetExecutionConfig().TaskResources),
},
rm: resourcemanager.GetTaskResourceManager(
t.resourceManager, resourceNamespacePrefix, id),
Expand Down
Loading

0 comments on commit 5bbb092

Please sign in to comment.