Skip to content

Commit

Permalink
Added capability to relaunch an execution (flyteorg#52)
Browse files Browse the repository at this point in the history
* Added capability to relaunch an execution

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Added more coverage

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Moved around test function

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Added few more tests

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Using RelaunchExecution directly

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* added comments for interface and renamed the file

Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored and robert-ulbrich-mercedes-benz committed Jul 2, 2024
1 parent 74a149d commit 3a60db7
Show file tree
Hide file tree
Showing 17 changed files with 413 additions and 46 deletions.
28 changes: 24 additions & 4 deletions flytectl/cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ The root project and domain flags of -p and -d should point to task/launch plans
flytectl create execution --execFile execution_spec.yaml -p flytectldemo -d development --targetProject flytesnacks
Also an execution can be relaunched by passing in current execution id.
::
flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytectldemo -d development
Usage
`
)
Expand All @@ -86,16 +92,25 @@ type ExecutionConfig struct {
TargetProject string `json:"targetProject" pflag:",project where execution needs to be created.If not specified configured project would be used."`
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."`
// Non plfag section is read from the execution config generated by get task/launchplan
Workflow string `json:"workflow,omitempty"`
Task string `json:"task,omitempty"`
Version string `json:"version"`
Inputs map[string]interface{} `json:"inputs"`
}

type ExecutionType int

const (
Task ExecutionType = iota
Workflow
Relaunch
)

type ExecutionParams struct {
name string
isTask bool
name string
execType ExecutionType
}

var (
Expand All @@ -111,14 +126,19 @@ func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.C
return err
}
var executionRequest *admin.ExecutionCreateRequest
if execParams.isTask {
switch execParams.execType {
case Relaunch:
return relaunchExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx)
case Task:
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
return err
}
} else {
case Workflow:
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
return err
}
default:
return fmt.Errorf("invalid execution type %v", execParams.execType)
}
exec, _err := cmdCtx.AdminClient().CreateExecution(ctx, executionRequest)
if _err != nil {
Expand Down
71 changes: 64 additions & 7 deletions flytectl/cmd/create/execution_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package create

import (
"fmt"
"testing"

"github.com/flyteorg/flytectl/cmd/config"
Expand Down Expand Up @@ -129,6 +130,36 @@ func createExecutionSetup() {
}
mockClient.OnGetLaunchPlanMatch(ctx, objectGetRequest).Return(launchPlan1, nil)
}

func TestCreateTaskExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
executionCreateResponseTask := &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Name: "ff513c0e44b5b4a35aa5",
},
}
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseTask, nil)
executionConfig.ExecFile = testDataFolder + "task_execution_spec.yaml"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"ff513c0e44b5b4a35aa5" `)
}

func TestCreateTaskExecutionFuncError(t *testing.T) {
setup()
createExecutionSetup()
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(nil, fmt.Errorf("error launching task"))
executionConfig.ExecFile = testDataFolder + "task_execution_spec.yaml"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("error launching task"), err)
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
}

func TestCreateLaunchPlanExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
Expand All @@ -147,20 +178,46 @@ func TestCreateLaunchPlanExecutionFunc(t *testing.T) {
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`)
}

func TestCreateTaskExecutionFunc(t *testing.T) {
func TestCreateRelaunchExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
executionCreateResponseTask := &admin.ExecutionCreateResponse{
relaunchExecResponse := &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Name: "ff513c0e44b5b4a35aa5",
Name: "f652ea3596e7f4d80a0e",
},
}
mockClient.OnCreateExecutionMatch(ctx, mock.Anything).Return(executionCreateResponseTask, nil)
executionConfig.ExecFile = testDataFolder + "task_execution_spec.yaml"

executionConfig.Relaunch = relaunchExecResponse.Id.Name
relaunchRequest := &admin.ExecutionRelaunchRequest{
Id: &core.WorkflowExecutionIdentifier{
Name: executionConfig.Relaunch,
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
},
}
mockClient.OnRelaunchExecutionMatch(ctx, relaunchRequest).Return(relaunchExecResponse, nil)
err = createExecutionCommand(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "CreateExecution", ctx, mock.Anything)
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"ff513c0e44b5b4a35aa5" `)
mockClient.AssertCalled(t, "RelaunchExecution", ctx, relaunchRequest)
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`)
}

func TestCreateExecutionFuncInvalid(t *testing.T) {
setup()
createExecutionSetup()
executionConfig.Relaunch = ""
executionConfig.ExecFile = ""
err = createExecutionCommand(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("executionConfig or relaunch can't be empty. Run the flytectl get task/launchplan to generate the config"), err)
executionConfig.ExecFile = "Invalid-file"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("unable to read from %v yaml file", executionConfig.ExecFile), err)
executionConfig.ExecFile = testDataFolder + "invalid_execution_spec.yaml"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("either one of task or workflow name should be specified to launch an execution"), err)
}
47 changes: 33 additions & 14 deletions flytectl/cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package create

import (
"context"
"errors"
"fmt"
"io/ioutil"
"strings"
Expand All @@ -11,7 +10,6 @@ import (
cmdGet "github.com/flyteorg/flytectl/cmd/get"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/google/uuid"
"sigs.k8s.io/yaml"
)
Expand All @@ -20,7 +18,7 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName string,
var lp *admin.LaunchPlan
var err error
// Fetch the launch plan
if lp, err = cmdGet.FetchLPVersion(ctx, workflowName, executionConfig.Version, project, domain, cmdCtx); err != nil {
if lp, err = cmdGet.DefaultFetcher.FetchLPVersion(ctx, workflowName, executionConfig.Version, project, domain, cmdCtx); err != nil {
return nil, err
}
// Create workflow params literal map
Expand Down Expand Up @@ -70,6 +68,21 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
return createExecutionRequest(ID, inputs, authRole), nil
}

func relaunchExecution(ctx context.Context, executionName string, project string, domain string, cmdCtx cmdCore.CommandContext) error {
relaunchedExec, err := cmdCtx.AdminClient().RelaunchExecution(ctx, &admin.ExecutionRelaunchRequest{
Id: &core.WorkflowExecutionIdentifier{
Name: executionName,
Project: project,
Domain: domain,
},
})
if err != nil {
return err
}
fmt.Printf("execution identifier %v\n", relaunchedExec.Id)
return nil
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, authRole *admin.AuthRole) *admin.ExecutionCreateRequest {
return &admin.ExecutionCreateRequest{
Project: executionConfig.TargetProject,
Expand Down Expand Up @@ -100,32 +113,36 @@ func readExecConfigFromFile(fileName string) (*ExecutionConfig, error) {
return &executionConfigRead, nil
}

func resolveOverrides(readExecutionConfig *ExecutionConfig, project string, domain string) {
func resolveOverrides(toBeOverridden *ExecutionConfig, project string, domain string) {
if executionConfig.KubeServiceAcct != "" {
readExecutionConfig.KubeServiceAcct = executionConfig.KubeServiceAcct
toBeOverridden.KubeServiceAcct = executionConfig.KubeServiceAcct
}
if executionConfig.IamRoleARN != "" {
readExecutionConfig.IamRoleARN = executionConfig.IamRoleARN
toBeOverridden.IamRoleARN = executionConfig.IamRoleARN
}
if executionConfig.TargetProject != "" {
readExecutionConfig.TargetProject = executionConfig.TargetProject
toBeOverridden.TargetProject = executionConfig.TargetProject
}
if executionConfig.TargetDomain != "" {
readExecutionConfig.TargetDomain = executionConfig.TargetDomain
toBeOverridden.TargetDomain = executionConfig.TargetDomain
}
// Use the root project and domain to launch the task/workflow if target is unspecified
if executionConfig.TargetProject == "" {
readExecutionConfig.TargetProject = project
toBeOverridden.TargetProject = project
}
if executionConfig.TargetDomain == "" {
readExecutionConfig.TargetDomain = domain
toBeOverridden.TargetDomain = domain
}
}

func readConfigAndValidate(project string, domain string) (ExecutionParams, error) {
executionParams := ExecutionParams{}
if executionConfig.ExecFile == "" {
return executionParams, errors.New("executionConfig can't be empty. Run the flytectl get task/launchplan to generate the config")
if executionConfig.ExecFile == "" && executionConfig.Relaunch == "" {
return executionParams, fmt.Errorf("executionConfig or relaunch can't be empty. Run the flytectl get task/launchplan to generate the config")
}
if executionConfig.Relaunch != "" {
resolveOverrides(executionConfig, project, domain)
return ExecutionParams{name: executionConfig.Relaunch, execType: Relaunch}, nil
}
var readExecutionConfig *ExecutionConfig
var err error
Expand All @@ -138,11 +155,13 @@ func readConfigAndValidate(project string, domain string) (ExecutionParams, erro
isTask := readExecutionConfig.Task != ""
isWorkflow := readExecutionConfig.Workflow != ""
if isTask == isWorkflow {
return executionParams, errors.New("either one of task or workflow name should be specified to launch an execution")
return executionParams, fmt.Errorf("either one of task or workflow name should be specified to launch an execution")
}
name := readExecutionConfig.Task
execType := Task
if !isTask {
name = readExecutionConfig.Workflow
execType = Workflow
}
return ExecutionParams{name: name, isTask: isTask}, nil
return ExecutionParams{name: name, execType: execType}, nil
}
56 changes: 56 additions & 0 deletions flytectl/cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package create

import (
"errors"
"testing"

"github.com/flyteorg/flytectl/cmd/config"
"github.com/flyteorg/flytectl/cmd/testutils"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/stretchr/testify/assert"
)

var (
relaunchExecResponse *admin.ExecutionCreateResponse
relaunchRequest *admin.ExecutionRelaunchRequest
)

// This function needs to be called after testutils.Steup()
func createExecutionUtilSetup() {
ctx = testutils.Ctx
cmdCtx = testutils.CmdCtx
mockClient = testutils.MockClient
relaunchExecResponse = &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Name: "f652ea3596e7f4d80a0e",
},
}
relaunchRequest = &admin.ExecutionRelaunchRequest{
Id: &core.WorkflowExecutionIdentifier{
Name: "execName",
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
},
}
}

func TestCreateExecutionForRelaunch(t *testing.T) {
setup()
createExecutionUtilSetup()
mockClient.OnRelaunchExecutionMatch(ctx, relaunchRequest).Return(relaunchExecResponse, nil)
err = relaunchExecution(ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, cmdCtx)
assert.Nil(t, err)
}

func TestCreateExecutionForRelaunchNotFound(t *testing.T) {
setup()
createExecutionUtilSetup()
mockClient.OnRelaunchExecutionMatch(ctx, relaunchRequest).Return(nil, errors.New("unknown execution"))
err = relaunchExecution(ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
}
12 changes: 7 additions & 5 deletions flytectl/cmd/create/executionconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 1 addition & 8 deletions flytectl/cmd/get/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -70,13 +69,7 @@ func getExecutionFunc(ctx context.Context, args []string, cmdCtx cmdCore.Command
var executions []*admin.Execution
if len(args) > 0 {
name := args[0]
execution, err := cmdCtx.AdminClient().GetExecution(ctx, &admin.WorkflowExecutionGetRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: name,
},
})
execution, err := DefaultFetcher.FetchExecution(ctx, name, config.GetConfig().Project, config.GetConfig().Domain, cmdCtx)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 3a60db7

Please sign in to comment.