forked from flyteorg/flyte
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create executions in flytectl (flyteorg#39)
- Loading branch information
1 parent
d0cdbc5
commit f506799
Showing
40 changed files
with
3,000 additions
and
130 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,43 @@ | ||
package create | ||
|
||
import ( | ||
"context" | ||
"sort" | ||
"testing" | ||
|
||
cmdCore "github.com/flyteorg/flytectl/cmd/core" | ||
"github.com/flyteorg/flytectl/cmd/testutils" | ||
"github.com/flyteorg/flyteidl/clients/go/admin/mocks" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
const testDataFolder = "../testdata/" | ||
|
||
var ( | ||
err error | ||
ctx context.Context | ||
mockClient *mocks.AdminServiceClient | ||
args []string | ||
cmdCtx cmdCore.CommandContext | ||
) | ||
var setup = testutils.Setup | ||
var tearDownAndVerify = testutils.TearDownAndVerify | ||
|
||
func TestCreateCommand(t *testing.T) { | ||
createCommand := RemoteCreateCommand() | ||
assert.Equal(t, createCommand.Use, "create") | ||
assert.Equal(t, createCommand.Short, "Used for creating various flyte resources including tasks/workflows/launchplans/executions/project.") | ||
assert.Equal(t, len(createCommand.Commands()), 1) | ||
assert.Equal(t, len(createCommand.Commands()), 2) | ||
cmdNouns := createCommand.Commands() | ||
// Sort by Use value. | ||
sort.Slice(cmdNouns, func(i, j int) bool { | ||
return cmdNouns[i].Use < cmdNouns[j].Use | ||
}) | ||
assert.Equal(t, cmdNouns[0].Use, "project") | ||
assert.Equal(t, cmdNouns[0].Aliases, []string{"projects"}) | ||
assert.Equal(t, cmdNouns[0].Short, "Create project resources") | ||
assert.Equal(t, cmdNouns[0].Use, "execution") | ||
assert.Equal(t, cmdNouns[0].Aliases, []string{"executions"}) | ||
assert.Equal(t, cmdNouns[0].Short, executionShort) | ||
assert.Equal(t, cmdNouns[1].Use, "project") | ||
assert.Equal(t, cmdNouns[1].Aliases, []string{"projects"}) | ||
assert.Equal(t, cmdNouns[1].Short, "Create project resources") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package create | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/flyteorg/flytectl/cmd/config" | ||
cmdCore "github.com/flyteorg/flytectl/cmd/core" | ||
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" | ||
) | ||
|
||
const ( | ||
executionShort = "Create execution resources" | ||
executionLong = ` | ||
Create the executions for given workflow/task in a project and domain. | ||
There are three steps in generating an execution. | ||
- Generate the execution spec file using the get command. | ||
- Update the inputs for the execution if needed. | ||
- Run the execution by passing in the generated yaml file. | ||
The spec file should be generated first and then run the execution using the spec file. | ||
You can reference the flytectl get task for more details | ||
:: | ||
flytectl get tasks -d development -p flytectldemo core.advanced.run_merge_sort.merge --version v2 --execFile execution_spec.yaml | ||
The generated file would look similar to this | ||
.. code-block:: yaml | ||
iamRoleARN: "" | ||
inputs: | ||
sorted_list1: | ||
- 0 | ||
sorted_list2: | ||
- 0 | ||
kubeServiceAcct: "" | ||
targetDomain: "" | ||
targetProject: "" | ||
task: core.advanced.run_merge_sort.merge | ||
version: "v2" | ||
The generated file can be modified to change the input values. | ||
.. code-block:: yaml | ||
iamRoleARN: 'arn:aws:iam::12345678:role/defaultrole' | ||
inputs: | ||
sorted_list1: | ||
- 2 | ||
- 4 | ||
- 6 | ||
sorted_list2: | ||
- 1 | ||
- 3 | ||
- 5 | ||
kubeServiceAcct: "" | ||
targetDomain: "" | ||
targetProject: "" | ||
task: core.advanced.run_merge_sort.merge | ||
version: "v2" | ||
And then can be passed through the command line. | ||
Notice the source and target domain/projects can be different. | ||
The root project and domain flags of -p and -d should point to task/launch plans project/domain. | ||
:: | ||
flytectl create execution --execFile execution_spec.yaml -p flytectldemo -d development --targetProject flytesnacks | ||
Usage | ||
` | ||
) | ||
|
||
//go:generate pflags ExecutionConfig --default-var executionConfig | ||
|
||
// ExecutionConfig hold configuration for create execution flags and configuration of the actual task or workflow to be launched. | ||
type ExecutionConfig struct { | ||
// pflag section | ||
ExecFile string `json:"execFile,omitempty" pflag:",file for the execution params.If not specified defaults to <<workflow/task>_name>.execution_spec.yaml"` | ||
TargetDomain string `json:"targetDomain" pflag:",project where execution needs to be created.If not specified configured domain would be used."` | ||
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created.If not specified configured project would be used."` | ||
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."` | ||
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."` | ||
// Non plfag section is read from the execution config generated by get task/launchplan | ||
Workflow string `json:"workflow,omitempty"` | ||
Task string `json:"task,omitempty"` | ||
Version string `json:"version"` | ||
Inputs map[string]interface{} `json:"inputs"` | ||
} | ||
|
||
type ExecutionParams struct { | ||
name string | ||
isTask bool | ||
} | ||
|
||
var ( | ||
executionConfig = &ExecutionConfig{} | ||
) | ||
|
||
func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error { | ||
var execParams ExecutionParams | ||
var err error | ||
sourceProject := config.GetConfig().Project | ||
sourceDomain := config.GetConfig().Domain | ||
if execParams, err = readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain); err != nil { | ||
return err | ||
} | ||
var executionRequest *admin.ExecutionCreateRequest | ||
if execParams.isTask { | ||
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil { | ||
return err | ||
} | ||
} else { | ||
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil { | ||
return err | ||
} | ||
} | ||
exec, _err := cmdCtx.AdminClient().CreateExecution(ctx, executionRequest) | ||
if _err != nil { | ||
return _err | ||
} | ||
fmt.Printf("execution identifier %v\n", exec.Id) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
package create | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/flyteorg/flytectl/cmd/config" | ||
"github.com/flyteorg/flytectl/cmd/testutils" | ||
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" | ||
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
"google.golang.org/protobuf/types/known/timestamppb" | ||
) | ||
|
||
// This function needs to be called after testutils.Steup() | ||
func createExecutionSetup() { | ||
ctx = testutils.Ctx | ||
cmdCtx = testutils.CmdCtx | ||
mockClient = testutils.MockClient | ||
sortedListLiteralType := core.Variable{ | ||
Type: &core.LiteralType{ | ||
Type: &core.LiteralType_CollectionType{ | ||
CollectionType: &core.LiteralType{ | ||
Type: &core.LiteralType_Simple{ | ||
Simple: core.SimpleType_INTEGER, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
variableMap := map[string]*core.Variable{ | ||
"sorted_list1": &sortedListLiteralType, | ||
"sorted_list2": &sortedListLiteralType, | ||
} | ||
|
||
task1 := &admin.Task{ | ||
Id: &core.Identifier{ | ||
Name: "task1", | ||
Version: "v2", | ||
}, | ||
Closure: &admin.TaskClosure{ | ||
CreatedAt: ×tamppb.Timestamp{Seconds: 1, Nanos: 0}, | ||
CompiledTask: &core.CompiledTask{ | ||
Template: &core.TaskTemplate{ | ||
Interface: &core.TypedInterface{ | ||
Inputs: &core.VariableMap{ | ||
Variables: variableMap, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
mockClient.OnGetTaskMatch(ctx, mock.Anything).Return(task1, nil) | ||
parameterMap := map[string]*core.Parameter{ | ||
"numbers": { | ||
Var: &core.Variable{ | ||
Type: &core.LiteralType{ | ||
Type: &core.LiteralType_CollectionType{ | ||
CollectionType: &core.LiteralType{ | ||
Type: &core.LiteralType_Simple{ | ||
Simple: core.SimpleType_INTEGER, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
"numbers_count": { | ||
Var: &core.Variable{ | ||
Type: &core.LiteralType{ | ||
Type: &core.LiteralType_Simple{ | ||
Simple: core.SimpleType_INTEGER, | ||
}, | ||
}, | ||
}, | ||
}, | ||
"run_local_at_count": { | ||
Var: &core.Variable{ | ||
Type: &core.LiteralType{ | ||
Type: &core.LiteralType_Simple{ | ||
Simple: core.SimpleType_INTEGER, | ||
}, | ||
}, | ||
}, | ||
Behavior: &core.Parameter_Default{ | ||
Default: &core.Literal{ | ||
Value: &core.Literal_Scalar{ | ||
Scalar: &core.Scalar{ | ||
Value: &core.Scalar_Primitive{ | ||
Primitive: &core.Primitive{ | ||
Value: &core.Primitive_Integer{ | ||
Integer: 10, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
launchPlan1 := &admin.LaunchPlan{ | ||
Id: &core.Identifier{ | ||
Name: "core.advanced.run_merge_sort.merge_sort", | ||
Version: "v3", | ||
}, | ||
Spec: &admin.LaunchPlanSpec{ | ||
DefaultInputs: &core.ParameterMap{ | ||
Parameters: parameterMap, | ||
}, | ||
}, | ||
Closure: &admin.LaunchPlanClosure{ | ||
CreatedAt: ×tamppb.Timestamp{Seconds: 0, Nanos: 0}, | ||
ExpectedInputs: &core.ParameterMap{ | ||
Parameters: parameterMap, | ||
}, | ||
}, | ||
} | ||
objectGetRequest := &admin.ObjectGetRequest{ | ||
Id: &core.Identifier{ | ||
ResourceType: core.ResourceType_LAUNCH_PLAN, | ||
Project: config.GetConfig().Project, | ||
Domain: config.GetConfig().Domain, | ||
Name: "core.advanced.run_merge_sort.merge_sort", | ||
Version: "v3", | ||
}, | ||
} | ||
mockClient.OnGetLaunchPlanMatch(ctx, objectGetRequest).Return(launchPlan1, nil) | ||
} | ||
func TestCreateLaunchPlanExecutionFunc(t *testing.T) { | ||
setup() | ||
createExecutionSetup() | ||
executionCreateResponseLP := &admin.ExecutionCreateResponse{ | ||
Id: &core.WorkflowExecutionIdentifier{ | ||
Project: "flytesnacks", | ||
Domain: "development", | ||
Name: "f652ea3596e7f4d80a0e", | ||
}, | ||
} | ||
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseLP, nil) | ||
executionConfig.ExecFile = testDataFolder + "launchplan_execution_spec.yaml" | ||
err = createExecutionCommand(ctx, args, cmdCtx) | ||
assert.Nil(t, err) | ||
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything) | ||
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`) | ||
} | ||
|
||
func TestCreateTaskExecutionFunc(t *testing.T) { | ||
setup() | ||
createExecutionSetup() | ||
executionCreateResponseTask := &admin.ExecutionCreateResponse{ | ||
Id: &core.WorkflowExecutionIdentifier{ | ||
Project: "flytesnacks", | ||
Domain: "development", | ||
Name: "ff513c0e44b5b4a35aa5", | ||
}, | ||
} | ||
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseTask, nil) | ||
executionConfig.ExecFile = testDataFolder + "task_execution_spec.yaml" | ||
err = createExecutionCommand(ctx, args, cmdCtx) | ||
assert.Nil(t, err) | ||
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything) | ||
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"ff513c0e44b5b4a35aa5" `) | ||
} |
Oops, something went wrong.