From df3c00fd2d461d37e9ab4566b47a81bf435e739d Mon Sep 17 00:00:00 2001 From: Prafulla Mahindrakar Date: Tue, 17 May 2022 18:34:00 +0530 Subject: [PATCH] Added enableSchedule option on registration Signed-off-by: Prafulla Mahindrakar --- .../subcommand/register/files_config.go | 1 + .../subcommand/register/filesconfig_flags.go | 1 + .../register/filesconfig_flags_test.go | 14 ++++++++++++ cmd/get/execution.go | 1 + cmd/register/files_test.go | 6 ++++- cmd/register/register_util.go | 22 ++++++++++++++++--- cmd/register/register_util_test.go | 3 ++- 7 files changed, 43 insertions(+), 5 deletions(-) diff --git a/cmd/config/subcommand/register/files_config.go b/cmd/config/subcommand/register/files_config.go index ea233358..3a542034 100644 --- a/cmd/config/subcommand/register/files_config.go +++ b/cmd/config/subcommand/register/files_config.go @@ -26,6 +26,7 @@ type FilesConfig struct { DeprecatedSourceUploadPath string `json:"sourceUploadPath" pflag:",Deprecated: Update flyte admin to avoid having to configure storage access from flytectl."` DestinationDirectory string `json:"destinationDirectory" pflag:",Location of source code in container."` DryRun bool `json:"dryRun" pflag:",Execute command without making any modifications."` + EnableSchedule bool `json:"enableSchedule" pflag:",Enable the schedule if the files contain schedulable launchplan."` } func GetConfig() *FilesConfig { diff --git a/cmd/config/subcommand/register/filesconfig_flags.go b/cmd/config/subcommand/register/filesconfig_flags.go index 9ca76123..10d4756c 100755 --- a/cmd/config/subcommand/register/filesconfig_flags.go +++ b/cmd/config/subcommand/register/filesconfig_flags.go @@ -61,5 +61,6 @@ func (cfg FilesConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.StringVar(&DefaultFilesConfig.DeprecatedSourceUploadPath, fmt.Sprintf("%v%v", prefix, "sourceUploadPath"), DefaultFilesConfig.DeprecatedSourceUploadPath, "Deprecated: Update flyte admin to avoid having to configure storage access from flytectl.") cmdFlags.StringVar(&DefaultFilesConfig.DestinationDirectory, fmt.Sprintf("%v%v", prefix, "destinationDirectory"), DefaultFilesConfig.DestinationDirectory, "Location of source code in container.") cmdFlags.BoolVar(&DefaultFilesConfig.DryRun, fmt.Sprintf("%v%v", prefix, "dryRun"), DefaultFilesConfig.DryRun, "Execute command without making any modifications.") + cmdFlags.BoolVar(&DefaultFilesConfig.EnableSchedule, fmt.Sprintf("%v%v", prefix, "enableSchedule"), DefaultFilesConfig.EnableSchedule, "Enable the schedule if the files contain schedulable launchplan.") return cmdFlags } diff --git a/cmd/config/subcommand/register/filesconfig_flags_test.go b/cmd/config/subcommand/register/filesconfig_flags_test.go index 911df3dc..14fbc00e 100755 --- a/cmd/config/subcommand/register/filesconfig_flags_test.go +++ b/cmd/config/subcommand/register/filesconfig_flags_test.go @@ -253,4 +253,18 @@ func TestFilesConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_enableSchedule", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("enableSchedule", testValue) + if vBool, err := cmdFlags.GetBool("enableSchedule"); err == nil { + testDecodeJson_FilesConfig(t, fmt.Sprintf("%v", vBool), &actual.EnableSchedule) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/cmd/get/execution.go b/cmd/get/execution.go index 657d2834..26ec0b79 100644 --- a/cmd/get/execution.go +++ b/cmd/get/execution.go @@ -93,6 +93,7 @@ var hundredChars = 100 var executionColumns = []printer.Column{ {Header: "Name", JSONPath: "$.id.name"}, {Header: "Launch Plan Name", JSONPath: "$.spec.launchPlan.name"}, + {Header: "Version", JSONPath: "$.spec.launchPlan.version"}, {Header: "Type", JSONPath: "$.spec.launchPlan.resourceType"}, {Header: "Phase", JSONPath: "$.closure.phase"}, {Header: "Scheduled Time", JSONPath: "$.spec.metadata.scheduledAt"}, diff --git a/cmd/register/files_test.go b/cmd/register/files_test.go index ea167f29..0a9eb330 100644 --- a/cmd/register/files_test.go +++ b/cmd/register/files_test.go @@ -31,6 +31,7 @@ func TestRegisterFromFiles(t *testing.T) { s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) err := registerFromFilesFunc(s.Ctx, args, s.CmdCtx) assert.Nil(t, err) }) @@ -52,7 +53,7 @@ func TestRegisterFromFiles(t *testing.T) { s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) - + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) mockDataProxy := s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient) mockDataProxy.OnCreateUploadLocationMatch(s.Ctx, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil) @@ -75,6 +76,7 @@ func TestRegisterFromFiles(t *testing.T) { s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient).OnCreateUploadLocationMatch(mock.Anything, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil) err = Register(s.Ctx, args, config.GetConfig(), s.CmdCtx) assert.Nil(t, err) @@ -95,6 +97,7 @@ func TestRegisterFromFiles(t *testing.T) { s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx) assert.NotNil(t, err) }) @@ -163,6 +166,7 @@ func TestRegisterFromFiles(t *testing.T) { s.MockAdminClient.OnCreateTaskMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateWorkflowMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + s.MockAdminClient.OnUpdateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) s.MockClient.DataProxyClient().(*mocks.DataProxyServiceClient).OnCreateUploadLocationMatch(mock.Anything, mock.Anything).Return(&service.CreateUploadLocationResponse{}, nil) err = registerFromFilesFunc(s.Ctx, args, s.CmdCtx) assert.Nil(t, err) diff --git a/cmd/register/register_util.go b/cmd/register/register_util.go index 82310263..9d61085f 100644 --- a/cmd/register/register_util.go +++ b/cmd/register/register_util.go @@ -125,7 +125,7 @@ func unMarshalContents(ctx context.Context, fileContents []byte, fname string) ( } -func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext, dryRun bool) error { +func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext, dryRun, enableSchedule bool) error { switch v := message.(type) { case *admin.LaunchPlan: launchPlan := message.(*admin.LaunchPlan) @@ -144,7 +144,23 @@ func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.Command }, Spec: launchPlan.Spec, }) - return err + if err != nil { + return err + } + // Activate the launchplan + if enableSchedule { + _, err = cmdCtx.AdminClient().UpdateLaunchPlan(ctx, &admin.LaunchPlanUpdateRequest{ + Id: &core.Identifier{ + Project: config.GetConfig().Project, + Domain: config.GetConfig().Domain, + Name: launchPlan.Id.Name, + Version: launchPlan.Id.Version, + }, + State: admin.LaunchPlanState_ACTIVE, + }) + return err + } + return nil case *admin.WorkflowSpec: workflowSpec := message.(*admin.WorkflowSpec) if dryRun { @@ -581,7 +597,7 @@ func registerFile(ctx context.Context, fileName string, registerResults []Result registerResults = append(registerResults, registerResult) return registerResults, err } - if err := register(ctx, spec, cmdCtx, config.DryRun); err != nil { + if err := register(ctx, spec, cmdCtx, config.DryRun, config.EnableSchedule); err != nil { // If error is AlreadyExists then dont consider this to be an error but just a warning state if grpcError := status.Code(err); grpcError == codes.AlreadyExists { registerResult = Result{Name: fileName, Status: "Success", Info: fmt.Sprintf("%v", grpcError.String())} diff --git a/cmd/register/register_util_test.go b/cmd/register/register_util_test.go index c9d90280..13ae96cd 100644 --- a/cmd/register/register_util_test.go +++ b/cmd/register/register_util_test.go @@ -62,6 +62,7 @@ func registerFilesSetup() { rconfig.DefaultFilesConfig.AssumableIamRole = "" rconfig.DefaultFilesConfig.K8sServiceAccount = "" rconfig.DefaultFilesConfig.OutputLocationPrefix = "" + rconfig.DefaultFilesConfig.EnableSchedule = true } func TestGetSortedArchivedFileWithParentFolderList(t *testing.T) { @@ -476,7 +477,7 @@ func TestRegister(t *testing.T) { s := setup() registerFilesSetup() node := &admin.NodeExecution{} - err := register(s.Ctx, node, s.CmdCtx, rconfig.DefaultFilesConfig.DryRun) + err := register(s.Ctx, node, s.CmdCtx, rconfig.DefaultFilesConfig.DryRun, rconfig.DefaultFilesConfig.EnableSchedule) assert.NotNil(t, err) }) }