Skip to content

Commit

Permalink
Specify cluster pool when creating execution (flyteorg#355)
Browse files Browse the repository at this point in the history
* Create execution with specifying clusterPool

Signed-off-by: Iaroslav Ciupin <[email protected]>

* format

Signed-off-by: Iaroslav Ciupin <[email protected]>

* update flyteidl

Signed-off-by: Iaroslav Ciupin <[email protected]>

* refactor

Signed-off-by: Iaroslav Ciupin <[email protected]>

* Address comments

Signed-off-by: Iaroslav Ciupin <[email protected]>

* update flyteidl

Signed-off-by: Iaroslav Ciupin <[email protected]>

* increase coverage

Signed-off-by: Iaroslav Ciupin <[email protected]>

Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin authored Sep 30, 2022
1 parent 5a331a4 commit d2baa09
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 108 deletions.
29 changes: 19 additions & 10 deletions cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"fmt"

"github.com/flyteorg/flytectl/cmd/config"
cmdCore "github.com/flyteorg/flytectl/cmd/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flytectl/cmd/config"
cmdCore "github.com/flyteorg/flytectl/cmd/core"
)

const (
Expand Down Expand Up @@ -132,6 +133,13 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
9. If you have configured a plugin that implements github.com/flyteorg/flyteadmin/pkg/workflowengine/interfaces/WorkflowExecutor
that supports cluster pools, then when creating a new execution, you can assign it to a specific cluster pool:
::
flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d development --clusterPool my-gpu-cluster
Usage
`
)
Expand All @@ -150,6 +158,7 @@ type ExecutionConfig struct {
Recover string `json:"recover" pflag:",execution id to be recreated from the last known failure point."`
DryRun bool `json:"dryRun" pflag:",execute command without making any modifications."`
Version string `json:"version" pflag:",specify version of execution workflow/task."`
ClusterPool string `json:"clusterPool" pflag:",specify which cluster pool to assign execution to."`
// Non plfag section is read from the execution config generated by get task/launch plan
Workflow string `json:"workflow,omitempty"`
Task string `json:"task,omitempty"`
Expand All @@ -170,13 +179,9 @@ type ExecutionParams struct {
execType ExecutionType
}

var (
executionConfig = &ExecutionConfig{}
)
var executionConfig = &ExecutionConfig{}

func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
var execParams ExecutionParams
var err error
sourceProject := config.GetConfig().Project
sourceDomain := config.GetConfig().Domain

Expand All @@ -185,7 +190,8 @@ func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.C
targetExecName = args[0]
}

if execParams, err = readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain); err != nil {
execParams, err := readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain)
if err != nil {
return err
}
var executionRequest *admin.ExecutionCreateRequest
Expand All @@ -195,16 +201,19 @@ func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.C
case Recover:
return recoverExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
case Task:
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName); err != nil {
executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
if err != nil {
return err
}
case Workflow:
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName); err != nil {
executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid execution type %v", execParams.execType)
}

if executionConfig.DryRun {
logger.Debugf(ctx, "skipping CreateExecution request (DryRun)")
} else {
Expand Down
Loading

0 comments on commit d2baa09

Please sign in to comment.