Skip to content

Commit

Permalink
Support plugin override matchable resource (flyteorg#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Sep 24, 2020
1 parent 8aac329 commit c5425dd
Show file tree
Hide file tree
Showing 14 changed files with 677 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.18.6
github.com/lyft/flytepropeller v0.3.16
github.com/lyft/flytepropeller v0.3.17
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -462,17 +462,11 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz
github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4=
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.3 h1:+O0rDCXoiui5X56DtoqquW0rqjN75jDWqAEyvcqmarI=
github.com/lyft/flyteidl v0.18.3/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.6 h1:HGbxHI8avEDvoPqcO2+/BoJVcP9sjOj4qwJ/wNRWuoA=
github.com/lyft/flyteidl v0.18.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.4/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flyteplugins v0.5.1/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytepropeller v0.3.7 h1:l2AguhyhiUDCvqjHYF8XJw46gPW9j4XNZwJEAJdiEtI=
github.com/lyft/flytepropeller v0.3.7/go.mod h1:8sNP7ZnEngNRYBMewmH4PtiRR0pus8RkjNoPqelyKX8=
github.com/lyft/flytepropeller v0.3.16 h1:a6KbvtDRMMVEUlVTqQ9h9IOehUerk3dT+pvsN5Ql/4o=
github.com/lyft/flytepropeller v0.3.16/go.mod h1:GArCzcLAZ48OacGUsHUA3f028ixoU8CVZOMikyjEdNY=
github.com/lyft/flytepropeller v0.3.17 h1:a2PVqWjnn8oNEeayAqNizMAtEixl/F3S4vd8z4kbiqI=
github.com/lyft/flytepropeller v0.3.17/go.mod h1:T8Utxqv7B5USAX9c/Qh0lBbKXHFSgOwwaISOd9h36P4=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
Expand Down
21 changes: 21 additions & 0 deletions pkg/common/testutils/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package testutils

import "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

// Convenience method to wrap verbose boilerplate for initializing a PluginOverrides MatchingAttributes.
func GetPluginOverridesAttributes(vals map[string][]string) *admin.MatchingAttributes {
overrides := make([]*admin.PluginOverride, 0, len(vals))
for taskType, pluginIDs := range vals {
overrides = append(overrides, &admin.PluginOverride{
TaskType: taskType,
PluginId: pluginIDs,
})
}
return &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_PluginOverrides{
PluginOverrides: &admin.PluginOverrides{
Overrides: overrides,
},
},
}
}
26 changes: 26 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ func (m *ExecutionManager) addLabelsAndAnnotations(requestSpec *admin.ExecutionS
return nil
}

func (m *ExecutionManager) addPluginOverrides(ctx context.Context, executionID *core.WorkflowExecutionIdentifier,
workflowName, launchPlanName string, partiallyPopulatedInputs *workflowengineInterfaces.ExecuteWorkflowInput) error {
override, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: executionID.Project,
Domain: executionID.Domain,
Workflow: workflowName,
LaunchPlan: launchPlanName,
ResourceType: admin.MatchableResource_PLUGIN_OVERRIDE,
})
if err != nil {
ec, ok := err.(errors.FlyteAdminError)
if !ok || ec.Code() != codes.NotFound {
return err
}
}
if override != nil && override.Attributes != nil && override.Attributes.GetPluginOverrides() != nil {
partiallyPopulatedInputs.TaskPluginOverrides = override.Attributes.GetPluginOverrides().Overrides
}
return nil
}

func (m *ExecutionManager) offloadInputs(ctx context.Context, literalMap *core.LiteralMap, identifier *core.WorkflowExecutionIdentifier, key string) (storage.DataReference, error) {
if literalMap == nil {
literalMap = &core.LiteralMap{}
Expand Down Expand Up @@ -611,6 +632,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
if err != nil {
return nil, nil, err
}
err = m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name,
&executeWorkflowInputs)
if err != nil {
return nil, nil, err
}

execInfo, err := m.workflowExecutor.ExecuteWorkflow(ctx, executeWorkflowInputs)
if err != nil {
Expand Down
75 changes: 75 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/protobuf/proto"
notificationMocks "github.com/lyft/flyteadmin/pkg/async/notifications/mocks"
commonTestUtils "github.com/lyft/flyteadmin/pkg/common/testutils"
dataMocks "github.com/lyft/flyteadmin/pkg/data/mocks"
flyteAdminErrors "github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/manager/impl/executions"
Expand Down Expand Up @@ -2104,6 +2105,80 @@ func TestAddLabelsAndAnnotationsRuntimeLimitsObserved(t *testing.T) {
assert.EqualError(t, err, "Labels has too many entries [2 > 1]")
}

func TestAddPluginOverrides(t *testing.T) {
executionID := &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: "unused",
}
workflowName := "workflow_name"
launchPlanName := "launch_plan_name"

db := repositoryMocks.NewMockRepository()
db.ResourceRepo().(*repositoryMocks.MockResourceRepo).GetFunction = func(ctx context.Context, ID interfaces.ResourceID) (
models.Resource, error) {
assert.Equal(t, project, ID.Project)
assert.Equal(t, domain, ID.Domain)
assert.Equal(t, workflowName, ID.Workflow)
assert.Equal(t, launchPlanName, ID.LaunchPlan)
existingAttributes := commonTestUtils.GetPluginOverridesAttributes(map[string][]string{
"python": {"plugin a"},
"hive": {"plugin b"},
})
bytes, err := proto.Marshal(existingAttributes)
if err != nil {
t.Fatal(err)
}
return models.Resource{
Project: project,
Domain: domain,
Attributes: bytes,
}, nil
}
partiallyPopulatedInputs := workflowengineInterfaces.ExecuteWorkflowInput{}

execManager := NewExecutionManager(
db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)

err := execManager.(*ExecutionManager).addPluginOverrides(
context.Background(), executionID, workflowName, launchPlanName, &partiallyPopulatedInputs)
assert.NoError(t, err)
assert.Len(t, partiallyPopulatedInputs.TaskPluginOverrides, 2)
for _, override := range partiallyPopulatedInputs.TaskPluginOverrides {
if override.TaskType == "python" {
assert.EqualValues(t, []string{"plugin a"}, override.PluginId)
} else if override.TaskType == "hive" {
assert.EqualValues(t, []string{"plugin b"}, override.PluginId)
} else {
t.Errorf("Unexpected task type [%s] plugin override committed to db", override.TaskType)
}
}
}

func TestPluginOverrides_ResourceGetFailure(t *testing.T) {
executionID := &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: "unused",
}
workflowName := "workflow_name"
launchPlanName := "launch_plan_name"

db := repositoryMocks.NewMockRepository()
db.ResourceRepo().(*repositoryMocks.MockResourceRepo).GetFunction = func(ctx context.Context, ID interfaces.ResourceID) (
models.Resource, error) {
return models.Resource{}, flyteAdminErrors.NewFlyteAdminErrorf(codes.Aborted, "uh oh")
}
execManager := NewExecutionManager(
db, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)

err := execManager.(*ExecutionManager).addPluginOverrides(
context.Background(), executionID, workflowName, launchPlanName, &workflowengineInterfaces.ExecuteWorkflowInput{})
assert.Error(t, err, "uh oh")
}

func TestGetExecution_Legacy(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC)
Expand Down
78 changes: 78 additions & 0 deletions pkg/manager/impl/resources/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package resources
import (
"context"

"github.com/lyft/flyteadmin/pkg/repositories/models"

"github.com/gogo/protobuf/proto"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flytestdlib/contextutils"
Expand Down Expand Up @@ -54,6 +56,41 @@ func (m *ResourceManager) GetResource(ctx context.Context, request interfaces.Re
}, nil
}

func (m *ResourceManager) createOrMergeUpdateWorkflowAttributes(
ctx context.Context, request admin.WorkflowAttributesUpdateRequest, model models.Resource,
resourceType admin.MatchableResource) (*admin.WorkflowAttributesUpdateResponse, error) {
resourceID := repo_interface.ResourceID{
Project: model.Project,
Domain: model.Domain,
Workflow: model.Workflow,
LaunchPlan: model.LaunchPlan,
ResourceType: model.ResourceType,
}
existing, err := m.db.ResourceRepo().GetRaw(ctx, resourceID)
if err != nil {
ec, ok := err.(errors.FlyteAdminError)
if ok && ec.Code() == codes.NotFound {
// Proceed with the default CreateOrUpdate call since there's no existing model to update.
err = m.db.ResourceRepo().CreateOrUpdate(ctx, model)
if err != nil {
return nil, err
}
return &admin.WorkflowAttributesUpdateResponse{}, nil
}
return nil, err
}
updatedModel, err := transformers.MergeUpdateWorkflowAttributes(
ctx, existing, resourceType, &resourceID, request.Attributes)
if err != nil {
return nil, err
}
err = m.db.ResourceRepo().CreateOrUpdate(ctx, updatedModel)
if err != nil {
return nil, err
}
return &admin.WorkflowAttributesUpdateResponse{}, nil
}

func (m *ResourceManager) UpdateWorkflowAttributes(
ctx context.Context, request admin.WorkflowAttributesUpdateRequest) (
*admin.WorkflowAttributesUpdateResponse, error) {
Expand All @@ -67,6 +104,9 @@ func (m *ResourceManager) UpdateWorkflowAttributes(
if err != nil {
return nil, err
}
if request.Attributes.GetMatchingAttributes().GetPluginOverrides() != nil {
return m.createOrMergeUpdateWorkflowAttributes(ctx, request, model, admin.MatchableResource_PLUGIN_OVERRIDE)
}
err = m.db.ResourceRepo().CreateOrUpdate(ctx, model)
if err != nil {
return nil, err
Expand Down Expand Up @@ -109,6 +149,41 @@ func (m *ResourceManager) DeleteWorkflowAttributes(ctx context.Context,
return &admin.WorkflowAttributesDeleteResponse{}, nil
}

func (m *ResourceManager) createOrMergeUpdateProjectDomainAttributes(
ctx context.Context, request admin.ProjectDomainAttributesUpdateRequest, model models.Resource,
resourceType admin.MatchableResource) (*admin.ProjectDomainAttributesUpdateResponse, error) {
resourceID := repo_interface.ResourceID{
Project: model.Project,
Domain: model.Domain,
Workflow: model.Workflow,
LaunchPlan: model.LaunchPlan,
ResourceType: model.ResourceType,
}
existing, err := m.db.ResourceRepo().GetRaw(ctx, resourceID)
if err != nil {
ec, ok := err.(errors.FlyteAdminError)
if ok && ec.Code() == codes.NotFound {
// Proceed with the default CreateOrUpdate call since there's no existing model to update.
err = m.db.ResourceRepo().CreateOrUpdate(ctx, model)
if err != nil {
return nil, err
}
return &admin.ProjectDomainAttributesUpdateResponse{}, nil
}
return nil, err
}
updatedModel, err := transformers.MergeUpdateProjectDomainAttributes(
ctx, existing, resourceType, &resourceID, request.Attributes)
if err != nil {
return nil, err
}
err = m.db.ResourceRepo().CreateOrUpdate(ctx, updatedModel)
if err != nil {
return nil, err
}
return &admin.ProjectDomainAttributesUpdateResponse{}, nil
}

func (m *ResourceManager) UpdateProjectDomainAttributes(
ctx context.Context, request admin.ProjectDomainAttributesUpdateRequest) (
*admin.ProjectDomainAttributesUpdateResponse, error) {
Expand All @@ -123,6 +198,9 @@ func (m *ResourceManager) UpdateProjectDomainAttributes(
if err != nil {
return nil, err
}
if request.Attributes.GetMatchingAttributes().GetPluginOverrides() != nil {
return m.createOrMergeUpdateProjectDomainAttributes(ctx, request, model, admin.MatchableResource_PLUGIN_OVERRIDE)
}
err = m.db.ResourceRepo().CreateOrUpdate(ctx, model)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit c5425dd

Please sign in to comment.