Skip to content

Commit

Permalink
Cluster pool based execution assignment (flyteorg#481)
Browse files Browse the repository at this point in the history
* cluster pool execution assignment

Signed-off-by: Iaroslav Ciupin <[email protected]>

* update flyteidl

Signed-off-by: Iaroslav Ciupin <[email protected]>

* update flyteidl

Signed-off-by: Iaroslav Ciupin <[email protected]>

Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin authored Sep 30, 2022
1 parent defa10b commit 6911752
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 54 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.1.15
github.com/flyteorg/flyteidl v1.1.19
github.com/flyteorg/flyteplugins v1.0.10
github.com/flyteorg/flytepropeller v1.1.28
github.com/flyteorg/flytestdlib v1.0.5
Expand Down
6 changes: 2 additions & 4 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.13 h1:xRUOu9+6c/zTZRTv+He1s4kX7uxmd/K5y7tAP598f8A=
github.com/flyteorg/flyteidl v1.1.13/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U=
github.com/flyteorg/flyteidl v1.1.15 h1:h+T8yeya5OEt7POav0wZkjPdtUatilraVTuwrioqzuA=
github.com/flyteorg/flyteidl v1.1.15/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U=
github.com/flyteorg/flyteidl v1.1.19 h1:1CtSbuFhFHwUbKdv66PqbcER01iacAJU+snh0eTsXc4=
github.com/flyteorg/flyteidl v1.1.19/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U=
github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4=
github.com/flyteorg/flyteplugins v1.0.10/go.mod h1:GfbmRByI/rSatm/Epoj3bNyrXwIQ9NOXTVwLS6Z0p84=
github.com/flyteorg/flytepropeller v1.1.28 h1:68qQ0QRHoCzagF0oifkW/c4A1L4B4LdgyHCPLKMiY2g=
Expand Down
35 changes: 3 additions & 32 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,7 @@ func TestCreateExecution(t *testing.T) {

principal := "principal"
rawOutput := "raw_output"
clusterAssignment := admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "foo",
Value: []string{"bar"},
Operator: admin.Selector_NOT_EQUALS,
},
},
},
}
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
Expand Down Expand Up @@ -4679,17 +4669,7 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
}

func TestGetClusterAssignment(t *testing.T) {
clusterAssignment := admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "foo",
Value: []string{"bar"},
Operator: admin.Selector_EQUALS,
},
},
},
}
clusterAssignment := admin.ClusterAssignment{ClusterPoolName: "gpu"}
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
Expand Down Expand Up @@ -4720,16 +4700,7 @@ func TestGetClusterAssignment(t *testing.T) {
assert.True(t, proto.Equal(ca, &clusterAssignment))
})
t.Run("value from request", func(t *testing.T) {
reqClusterAssignment := admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "baz",
Operator: admin.Selector_IN,
},
},
},
}
reqClusterAssignment := admin.ClusterAssignment{ClusterPoolName: "swimming-pool"}
ca, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ var attributesApplicationConfigProvider = testutils.GetApplicationConfigWithDefa
func TestValidateMatchingAttributes(t *testing.T) {
testCases := []struct {
attributes *admin.MatchingAttributes
identifier string
expectedMatchableResource admin.MatchableResource
expectedErr error
}{
{
nil,
"foo",
defaultMatchableResource,
errors.NewFlyteAdminErrorf(codes.InvalidArgument, "missing matching_attributes"),
},
Expand All @@ -38,7 +36,6 @@ func TestValidateMatchingAttributes(t *testing.T) {
},
},
},
"foo",
admin.MatchableResource_TASK_RESOURCE,
nil,
},
Expand All @@ -52,7 +49,6 @@ func TestValidateMatchingAttributes(t *testing.T) {
},
},
},
"foo",
admin.MatchableResource_CLUSTER_RESOURCE,
nil,
},
Expand All @@ -64,7 +60,6 @@ func TestValidateMatchingAttributes(t *testing.T) {
},
},
},
"foo",
admin.MatchableResource_EXECUTION_QUEUE,
nil,
},
Expand All @@ -81,7 +76,6 @@ func TestValidateMatchingAttributes(t *testing.T) {
},
},
},
"foo",
admin.MatchableResource_PLUGIN_OVERRIDE,
nil,
},
Expand All @@ -93,32 +87,23 @@ func TestValidateMatchingAttributes(t *testing.T) {
},
},
},
"foo",
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
nil,
},
{
&admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ClusterAssignment{
ClusterAssignment: &admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "bar",
Operator: admin.Selector_EXISTS,
},
},
},
ClusterPoolName: "gpu",
},
},
},
"foo",
admin.MatchableResource_CLUSTER_ASSIGNMENT,
nil,
},
}
for _, tc := range testCases {
matchableResource, err := validateMatchingAttributes(tc.attributes, tc.identifier)
matchableResource, err := validateMatchingAttributes(tc.attributes, "foo")
assert.Equal(t, tc.expectedMatchableResource, matchableResource)
assert.EqualValues(t, tc.expectedErr, err)
}
Expand Down

0 comments on commit 6911752

Please sign in to comment.