Skip to content

Commit

Permalink
Add ability to recover executions (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Jul 21, 2021
1 parent 0510aaa commit 2d65956
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 10 deletions.
11 changes: 11 additions & 0 deletions flytectl/cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ Also an execution can be relaunched by passing in current execution id.
flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytectldemo -d development
An execution can be recovered, that is recreated from the last known failure point for a previously-run workflow execution.
See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details.
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytectldemo -d development
Generic data types are also supported for execution in similar way.Following is sample of how the inputs need to be specified while creating the execution.
As usual the spec file should be generated first and then run the execution using the spec file.
Expand Down Expand Up @@ -138,6 +145,7 @@ type ExecutionConfig struct {
KubeServiceAcct string `json:"kubeServiceAcct" pflag:",kubernetes service account AuthRole for launching execution."`
IamRoleARN string `json:"iamRoleARN" pflag:",iam role ARN AuthRole for launching execution."`
Relaunch string `json:"relaunch" pflag:",execution id to be relaunched."`
Recover string `json:"recover" pflag:",execution id to be recreated from the last known failure point."`
// Non plfag section is read from the execution config generated by get task/launchplan
Workflow string `json:"workflow,omitempty"`
Task string `json:"task,omitempty"`
Expand All @@ -151,6 +159,7 @@ const (
Task ExecutionType = iota
Workflow
Relaunch
Recover
)

type ExecutionParams struct {
Expand All @@ -174,6 +183,8 @@ func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.C
switch execParams.execType {
case Relaunch:
return relaunchExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx)
case Recover:
return recoverExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx)
case Task:
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx); err != nil {
return err
Expand Down
33 changes: 32 additions & 1 deletion flytectl/cmd/create/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func TestCreateLaunchPlanExecutionFunc(t *testing.T) {
func TestCreateRelaunchExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
defer func() { executionConfig.Relaunch = "" }()
relaunchExecResponse := &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Expand All @@ -206,14 +207,44 @@ func TestCreateRelaunchExecutionFunc(t *testing.T) {
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`)
}

func TestCreateRecoverExecutionFunc(t *testing.T) {
setup()
createExecutionSetup()
defer func() { executionConfig.Recover = "" }()

originalExecutionName := "abc123"
recoverExecResponse := &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Name: "f652ea3596e7f4d80a0e",
},
}

executionConfig.Recover = originalExecutionName
recoverRequest := &admin.ExecutionRecoverRequest{
Id: &core.WorkflowExecutionIdentifier{
Name: originalExecutionName,
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
},
}
mockClient.OnRecoverExecutionMatch(ctx, recoverRequest).Return(recoverExecResponse, nil)
err = createExecutionCommand(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "RecoverExecution", ctx, recoverRequest)
tearDownAndVerify(t, `execution identifier project:"flytesnacks" domain:"development" name:"f652ea3596e7f4d80a0e"`)
executionConfig.Relaunch = ""
}

func TestCreateExecutionFuncInvalid(t *testing.T) {
setup()
createExecutionSetup()
executionConfig.Relaunch = ""
executionConfig.ExecFile = ""
err = createExecutionCommand(ctx, args, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("executionConfig or relaunch can't be empty. Run the flytectl get task/launchplan to generate the config"), err)
assert.Equal(t, fmt.Errorf("executionConfig, relaunch and recover can't be empty. Run the flytectl get task/launchplan to generate the config"), err)
executionConfig.ExecFile = "Invalid-file"
err = createExecutionCommand(ctx, args, cmdCtx)
assert.NotNil(t, err)
Expand Down
24 changes: 22 additions & 2 deletions flytectl/cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ func relaunchExecution(ctx context.Context, executionName string, project string
return nil
}

func recoverExecution(ctx context.Context, executionName string, project string, domain string,
cmdCtx cmdCore.CommandContext) error {
recoveredExec, err := cmdCtx.AdminClient().RecoverExecution(ctx, &admin.ExecutionRecoverRequest{
Id: &core.WorkflowExecutionIdentifier{
Name: executionName,
Project: project,
Domain: domain,
},
})
if err != nil {
return err
}
fmt.Printf("execution identifier %v\n", recoveredExec.Id)
return nil
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, securityContext *core.SecurityContext,
authRole *admin.AuthRole) *admin.ExecutionCreateRequest {

Expand Down Expand Up @@ -164,14 +180,18 @@ func resolveOverrides(toBeOverridden *ExecutionConfig, project string, domain st

func readConfigAndValidate(project string, domain string) (ExecutionParams, error) {
executionParams := ExecutionParams{}
if executionConfig.ExecFile == "" && executionConfig.Relaunch == "" {
return executionParams, fmt.Errorf("executionConfig or relaunch can't be empty." +
if executionConfig.ExecFile == "" && executionConfig.Relaunch == "" && executionConfig.Recover == "" {
return executionParams, fmt.Errorf("executionConfig, relaunch and recover can't be empty." +
" Run the flytectl get task/launchplan to generate the config")
}
if executionConfig.Relaunch != "" {
resolveOverrides(executionConfig, project, domain)
return ExecutionParams{name: executionConfig.Relaunch, execType: Relaunch}, nil
}
if len(executionConfig.Recover) > 0 {
resolveOverrides(executionConfig, project, domain)
return ExecutionParams{name: executionConfig.Recover, execType: Recover}, nil
}
var readExecutionConfig *ExecutionConfig
var err error
if readExecutionConfig, err = readExecConfigFromFile(executionConfig.ExecFile); err != nil {
Expand Down
33 changes: 29 additions & 4 deletions flytectl/cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ import (
)

var (
relaunchExecResponse *admin.ExecutionCreateResponse
relaunchRequest *admin.ExecutionRelaunchRequest
executionCreateResponse *admin.ExecutionCreateResponse
relaunchRequest *admin.ExecutionRelaunchRequest
recoverRequest *admin.ExecutionRecoverRequest
)

// This function needs to be called after testutils.Steup()
func createExecutionUtilSetup() {
ctx = testutils.Ctx
cmdCtx = testutils.CmdCtx
mockClient = testutils.MockClient
relaunchExecResponse = &admin.ExecutionCreateResponse{
executionCreateResponse = &admin.ExecutionCreateResponse{
Id: &core.WorkflowExecutionIdentifier{
Project: "flytesnacks",
Domain: "development",
Expand All @@ -36,12 +37,19 @@ func createExecutionUtilSetup() {
Domain: config.GetConfig().Domain,
},
}
recoverRequest = &admin.ExecutionRecoverRequest{
Id: &core.WorkflowExecutionIdentifier{
Name: "execName",
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
},
}
}

func TestCreateExecutionForRelaunch(t *testing.T) {
setup()
createExecutionUtilSetup()
mockClient.OnRelaunchExecutionMatch(ctx, relaunchRequest).Return(relaunchExecResponse, nil)
mockClient.OnRelaunchExecutionMatch(ctx, relaunchRequest).Return(executionCreateResponse, nil)
err = relaunchExecution(ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, cmdCtx)
assert.Nil(t, err)
}
Expand All @@ -54,3 +62,20 @@ func TestCreateExecutionForRelaunchNotFound(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
}

func TestCreateExecutionForRecovery(t *testing.T) {
setup()
createExecutionUtilSetup()
mockClient.OnRecoverExecutionMatch(ctx, recoverRequest).Return(executionCreateResponse, nil)
err = recoverExecution(ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, cmdCtx)
assert.Nil(t, err)
}

func TestCreateExecutionForRecoveryNotFound(t *testing.T) {
setup()
createExecutionUtilSetup()
mockClient.OnRecoverExecutionMatch(ctx, recoverRequest).Return(nil, errors.New("unknown execution"))
err = recoverExecution(ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, cmdCtx)
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
}
1 change: 1 addition & 0 deletions flytectl/cmd/create/executionconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytectl/cmd/create/executionconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions flytectl/docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
extensions = [
"sphinx.ext.autosummary",
"sphinx.ext.autosectionlabel",
"sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx.ext.viewcode",
"sphinx.ext.doctest",
Expand Down Expand Up @@ -181,3 +182,9 @@
"Miscellaneous",
),
]

# -- Options for intersphinx -------------------------------------------------
# intersphinx configuration
intersphinx_mapping = {
"flyteidl": ("https://docs.flyte.org/projects/flyteidl/en/latest", None),
}
8 changes: 8 additions & 0 deletions flytectl/docs/source/gen/flytectl_create_execution.rst

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flytectl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/docker/docker v20.10.7+incompatible
github.com/docker/go-connections v0.4.0
github.com/enescakir/emoji v1.0.0
github.com/flyteorg/flyteidl v0.19.9
github.com/flyteorg/flyteidl v0.19.14
github.com/flyteorg/flytestdlib v0.3.28
github.com/ghodss/yaml v1.0.0
github.com/golang/protobuf v1.4.3
Expand Down
4 changes: 2 additions & 2 deletions flytectl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.19.9 h1:1j4/YbV/G1m2hrK017F9K0JYZYxCCwf4qtEkiNnUiEw=
github.com/flyteorg/flyteidl v0.19.9/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.14 h1:OLg2eT9uYllcfMMjEZJoXQ+2WXcrNbUxD+yaCrz2AlI=
github.com/flyteorg/flyteidl v0.19.14/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.28 h1:bvyldApjlUy9ETxSFpYvLhYLJxxndnMZTf93rVG6a00=
github.com/flyteorg/flytestdlib v0.3.28/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
Expand Down

0 comments on commit 2d65956

Please sign in to comment.