Skip to content
This repository has been archived by the owner on Dec 20, 2023. It is now read-only.

Commit

Permalink
Retrieve launch plan interfaces from admin (#103)
Browse files Browse the repository at this point in the history
# TL;DR
When Propeller comes across a launch plan node in a `DynamicJobSpec` it will hit Admin to retrieve the interface for the LP to do the interface check.

## Type
 - [ ] Bug Fix
 - [x] Feature
 - [ ] Plugin

## Are all requirements met?

 - [x] Code completed
 - [x] Smoke tested
 - [x] Unit tests added
 - [x] Code documentation added
 - [x] Any pending items have an associated Issue

## Complete description
Please see the issue linked below and also the SDK PR flyteorg/flytekit#92 for more information.  A sample dynamic job spec object has been uploaded here as well.  Please see the text file for the type of dynamic job spec this PR is meant to support.
[dynamic_job_spec.txt](https://github.com/lyft/flytepropeller/files/4430252/dynamic_job_spec.txt)

* Added a `GetLaunchPlan` function to a `launchplan/Reader` interface which sits alongside the `launchplan/Executor` interface.  Admin client wrapper now satisfies both interfaces.
* Added a call to that function in the dynamic job handler `buildContextualDynamicWorkflow` function.

## Tracking Issue
flyteorg/flyte#139

## Follow-up issue
flyteorg/flyte#246
This PR will be deprecated upon completion of this issue.
  • Loading branch information
wild-endeavor authored Apr 8, 2020
1 parent 61ee3c8 commit 41a756f
Show file tree
Hide file tree
Showing 18 changed files with 647 additions and 83 deletions.
18 changes: 10 additions & 8 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@ go 1.13

require (
cloud.google.com/go v0.54.0 // indirect
github.com/Azure/azure-sdk-for-go v40.2.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go v40.3.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.10.0 // indirect
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/aws/aws-sdk-go v1.29.20 // indirect
github.com/aws/aws-sdk-go v1.29.23 // indirect
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/fatih/color v1.9.0
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.4
github.com/golang/protobuf v1.3.5
github.com/google/uuid v1.1.1
github.com/graymeta/stow v0.2.5 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.14.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.24
github.com/lyft/flyteplugins v0.3.20
Expand All @@ -34,19 +36,19 @@ require (
github.com/spf13/cobra v0.0.6
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 // indirect
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
google.golang.org/genproto v0.0.0-20200309141739-5b75447e413d // indirect
google.golang.org/grpc v1.27.1
google.golang.org/genproto v0.0.0-20200312145019-da6875a35672 // indirect
google.golang.org/grpc v1.28.0
gopkg.in/ini.v1 v1.54.0 // indirect
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
k8s.io/client-go v11.0.0+incompatible
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20200204173128-addea2498afe // indirect
k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab // indirect
sigs.k8s.io/controller-runtime v0.5.0
sigs.k8s.io/controller-runtime v0.5.1
)

// Pin the version of client-go to something that's compatible with katrogan's fork of api and apimachinery
Expand Down
55 changes: 26 additions & 29 deletions flytepropeller/go.sum

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions flytepropeller/pkg/compiler/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package compiler

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
)

// This object is meant to satisfy github.com/lyft/flytepropeller/pkg/compiler/common.InterfaceProvider
// This file is pretty much copied from Admin, (sorry for the link, a real link made go mod import admin)
// github-dot-com/lyft/flyteadmin/blob/1acce744b8c7839ab77a0eb1ed922905af15baa5/pkg/workflowengine/impl/interface_provider.go
// but that implementation relies on the internal Admin Gorm model. We should consider deprecating that one in favor
// of this one as Admin already has a dependency on the Propeller compiler.
type LaunchPlanInterfaceProvider struct {
expectedInputs core.ParameterMap
expectedOutputs core.VariableMap
identifier *core.Identifier
}

func (p *LaunchPlanInterfaceProvider) GetID() *core.Identifier {
return p.identifier
}
func (p *LaunchPlanInterfaceProvider) GetExpectedInputs() *core.ParameterMap {
return &p.expectedInputs

}
func (p *LaunchPlanInterfaceProvider) GetExpectedOutputs() *core.VariableMap {
return &p.expectedOutputs
}

func NewLaunchPlanInterfaceProvider(launchPlan admin.LaunchPlan) *LaunchPlanInterfaceProvider {
return &LaunchPlanInterfaceProvider{
expectedInputs: *launchPlan.Closure.ExpectedInputs,
expectedOutputs: *launchPlan.Closure.ExpectedOutputs,
identifier: launchPlan.Id,
}
}
72 changes: 72 additions & 0 deletions flytepropeller/pkg/compiler/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package compiler

import (
"testing"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/utils"
"github.com/stretchr/testify/assert"
)

var launchPlanIdentifier = core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Project: "project",
Domain: "domain",
Name: "name",
Version: "version",
}

var inputs = core.ParameterMap{
Parameters: map[string]*core.Parameter{
"foo": {
Var: &core.Variable{
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
Behavior: &core.Parameter_Default{
Default: utils.MustMakeLiteral("foo-value"),
},
},
},
}
var outputs = core.VariableMap{
Variables: map[string]*core.Variable{
"foo": {
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
},
}

func getDummyLaunchPlan() admin.LaunchPlan {
launchPlanClosure := admin.LaunchPlanClosure{
ExpectedInputs: &inputs,
ExpectedOutputs: &outputs,
}
return admin.LaunchPlan{
Id: &launchPlanIdentifier,
Spec: nil,
Closure: &launchPlanClosure,
}
}

func TestGetId(t *testing.T) {
launchPlan := getDummyLaunchPlan()
provider := NewLaunchPlanInterfaceProvider(launchPlan)
assert.Equal(t, &core.Identifier{ResourceType: 3, Project: "project", Domain: "domain", Name: "name", Version: "version"}, provider.GetID())
}

func TestGetExpectedInputs(t *testing.T) {
launchPlan := getDummyLaunchPlan()
provider := NewLaunchPlanInterfaceProvider(launchPlan)
assert.Contains(t, (*provider.GetExpectedInputs()).Parameters, "foo")
assert.NotNil(t, (*provider.GetExpectedInputs()).Parameters["foo"].Var.Type.GetSimple())
assert.EqualValues(t, "STRING", (*provider.GetExpectedInputs()).Parameters["foo"].Var.Type.GetSimple().String())
assert.NotNil(t, (*provider.GetExpectedInputs()).Parameters["foo"].GetDefault())
}

func TestGetExpectedOutputs(t *testing.T) {
launchPlan := getDummyLaunchPlan()
provider := NewLaunchPlanInterfaceProvider(launchPlan)
assert.EqualValues(t, outputs.Variables["foo"].GetType().GetType(),
provider.GetExpectedOutputs().Variables["foo"].GetType().GetType())
}
6 changes: 3 additions & 3 deletions flytepropeller/pkg/compiler/workflow_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.Work

compiledWf := &core.CompiledWorkflow{Template: wf}

gb := newWorfklowBuilder(compiledWf, wfIndex, c.NewTaskIndex(taskBuilders...), toInterfaceProviderMap(launchPlans))
gb := newWorkflowBuilder(compiledWf, wfIndex, c.NewTaskIndex(taskBuilders...), toInterfaceProviderMap(launchPlans))
// Terminate early if there are some required component not present.
if !gb.validateAllRequirements(errs.NewScope()) {
return nil, errs
Expand All @@ -312,10 +312,10 @@ func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.Work
}

func (w workflowBuilder) newWorkflowBuilder(fg *flyteWorkflow) workflowBuilder {
return newWorfklowBuilder(fg, w.allSubWorkflows, w.allTasks, w.allLaunchPlans)
return newWorkflowBuilder(fg, w.allSubWorkflows, w.allTasks, w.allLaunchPlans)
}

func newWorfklowBuilder(fg *flyteWorkflow, wfIndex c.WorkflowIndex, tasks c.TaskIndex,
func newWorkflowBuilder(fg *flyteWorkflow, wfIndex c.WorkflowIndex, tasks c.TaskIndex,
workflows map[string]c.InterfaceProvider) workflowBuilder {

return workflowBuilder{
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/compiler/workflow_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestValidateUnderlyingInterface(parentT *testing.T) {
}
}

g := newWorfklowBuilder(
g := newWorkflowBuilder(
&core.CompiledWorkflow{Template: inputWorkflow},
mustBuildWorkflowIndex(inputWorkflow),
common.NewTaskIndex(compiledTasks...),
Expand Down
11 changes: 6 additions & 5 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,26 @@ func newK8sEventRecorder(ctx context.Context, kubeclientset kubernetes.Interface
func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {

var wfLauncher launchplan.Executor
var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
adminClient, err := admin.InitializeAdminClientFromConfig(ctx)
if err != nil {
logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error())
return nil, err
}
wfLauncher, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration,
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration,
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"))
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
return nil, err
}

if err := wfLauncher.Initialize(ctx); err != nil {
if err := launchPlanActor.Initialize(ctx); err != nil {
logger.Errorf(ctx, "failed to initialize Admin workflow Launcher, err: %v", err.Error())
return nil, err
}
} else {
wfLauncher = launchplan.NewFailFastLaunchPlanExecutor()
launchPlanActor = launchplan.NewFailFastLaunchPlanExecutor()
}

logger.Info(ctx, "Setting up event sink and recorder")
Expand Down Expand Up @@ -298,7 +298,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes,
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
Expand Down
41 changes: 38 additions & 3 deletions flytepropeller/pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
Expand Down Expand Up @@ -60,6 +62,7 @@ type dynamicNodeTaskNodeHandler struct {
TaskNodeHandler
metrics metrics
nodeExecutor executors.Node
lpReader launchplan.Reader
}

func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevState handler.DynamicNodeState, nCtx handler.NodeExecutionContext) (handler.Transition, handler.DynamicNodeState, error) {
Expand Down Expand Up @@ -377,8 +380,23 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return nil, true, err
}

// TODO: This will currently fail if the WF references any launch plans
closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, []common2.InterfaceProvider{})
// Get the requirements, that is, a list of all the task IDs and the launch plan IDs that will be called as part of this dynamic task.
// The definition of these will need to be fetched from Admin (in order to get the interface).
requirements, err := compiler.GetRequirements(wf, djSpec.Subworkflows)
if err != nil {
return nil, true, err
}

launchPlanInterfaces, err := d.getLaunchPlanInterfaces(ctx, requirements.GetRequiredLaunchPlanIds())
if err != nil {
return nil, true, err
}

// TODO: In addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec.
// The reason they might be missing is because if a user yields a task that is SdkTask.fetch'ed, it should not be included
// See https://github.com/lyft/flyte/issues/219 for more information.

closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces)
if err != nil {
return nil, true, err
}
Expand All @@ -395,6 +413,22 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return newContextualWorkflow(nCtx.Workflow(), subwf, nStatus, subwf.Tasks, subwf.SubWorkflows, nCtx.DataStore()), true, nil
}

func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, launchPlanIDs []compiler.LaunchPlanRefIdentifier) (
[]common2.InterfaceProvider, error) {

var launchPlanInterfaces = make([]common2.InterfaceProvider, len(launchPlanIDs))
for idx, id := range launchPlanIDs {
lp, err := d.lpReader.GetLaunchPlan(ctx, &id)
if err != nil {
logger.Debugf(ctx, "Error fetching launch plan definition from admin")
return nil, err
}
launchPlanInterfaces[idx] = compiler.NewLaunchPlanInterfaceProvider(*lp)
}

return launchPlanInterfaces, nil
}

func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, dynamicWorkflow v1alpha1.ExecutableWorkflow,
nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) {

Expand Down Expand Up @@ -470,11 +504,12 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context,
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), prevState, nil
}

func New(underlying TaskNodeHandler, nodeExecutor executors.Node, scope promutils.Scope) handler.Node {
func New(underlying TaskNodeHandler, nodeExecutor executors.Node, launchPlanReader launchplan.Reader, scope promutils.Scope) handler.Node {

return &dynamicNodeTaskNodeHandler{
TaskNodeHandler: underlying,
metrics: newMetrics(scope),
nodeExecutor: nodeExecutor,
lpReader: launchPlanReader,
}
}
Loading

0 comments on commit 41a756f

Please sign in to comment.