diff --git a/flytectl/cmd/register/register_util.go b/flytectl/cmd/register/register_util.go index 9f3b678794..f81c438fec 100644 --- a/flytectl/cmd/register/register_util.go +++ b/flytectl/cmd/register/register_util.go @@ -275,39 +275,94 @@ func hydrateTaskSpec(task *admin.TaskSpec, sourceCode, sourceUploadPath, version return nil } -func validateLaunchSpec(lpSpec *admin.LaunchPlanSpec) error { - if lpSpec == nil { - return nil - } - if lpSpec.EntityMetadata != nil && lpSpec.EntityMetadata.Schedule != nil { - schedule := lpSpec.EntityMetadata.Schedule - var scheduleFixedParams []string - if lpSpec.DefaultInputs != nil { - for paramKey := range lpSpec.DefaultInputs.Parameters { - if paramKey != schedule.KickoffTimeInputArg { - scheduleFixedParams = append(scheduleFixedParams, paramKey) - } +func validateLPWithSchedule(lpSpec *admin.LaunchPlanSpec, wf *admin.Workflow) error { + schedule := lpSpec.EntityMetadata.Schedule + var scheduleRequiredParams []string + if wf != nil && wf.Closure != nil && wf.Closure.CompiledWorkflow != nil && + wf.Closure.CompiledWorkflow.Primary != nil && wf.Closure.CompiledWorkflow.Primary.Template != nil && + wf.Closure.CompiledWorkflow.Primary.Template.Interface != nil && + wf.Closure.CompiledWorkflow.Primary.Template.Interface.Inputs != nil { + variables := wf.Closure.CompiledWorkflow.Primary.Template.Interface.Inputs.Variables + for varName := range variables { + if varName != schedule.KickoffTimeInputArg { + scheduleRequiredParams = append(scheduleRequiredParams, varName) } } - if (lpSpec.FixedInputs == nil && len(scheduleFixedParams) > 0) || - (len(scheduleFixedParams) > len(lpSpec.FixedInputs.Literals)) { - fixedInputLen := 0 - if lpSpec.FixedInputs != nil { - fixedInputLen = len(lpSpec.FixedInputs.Literals) + + } + // Either the scheduled param should have default or fixed values + var scheduleParamsWithValues []string + // Check for default values + if lpSpec.DefaultInputs != nil { + for paramName := range lpSpec.DefaultInputs.Parameters { + if paramName != schedule.KickoffTimeInputArg { + scheduleParamsWithValues = append(scheduleParamsWithValues, paramName) } - return fmt.Errorf("param values are missing on scheduled workflow."+ - "additional args other than %v on scheduled workflow are %v > %v fixed values", schedule.KickoffTimeInputArg, - len(scheduleFixedParams), fixedInputLen) } } + // Check for fixed values + if lpSpec.FixedInputs != nil && lpSpec.FixedInputs.Literals != nil { + for fixedLiteralName := range lpSpec.FixedInputs.Literals { + scheduleParamsWithValues = append(scheduleParamsWithValues, fixedLiteralName) + } + } + + diffSet := leftDiff(scheduleRequiredParams, scheduleParamsWithValues) + if len(diffSet) > 0 { + return fmt.Errorf("param values are missing on scheduled workflow "+ + "for the following params %v. Either specify them having a default or fixed value", diffSet) + } return nil } -func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccount string, configOutputLocationPrefix string, lpSpec *admin.LaunchPlanSpec) error { - - if err := validateLaunchSpec(lpSpec); err != nil { +func validateLaunchSpec(ctx context.Context, lpSpec *admin.LaunchPlanSpec, cmdCtx cmdCore.CommandContext) error { + if lpSpec == nil || lpSpec.WorkflowId == nil || lpSpec.EntityMetadata == nil || + lpSpec.EntityMetadata.Schedule == nil { + return nil + } + // Fetch the workflow spec using the identifier + workflowID := lpSpec.WorkflowId + wf, err := cmdCtx.AdminFetcherExt().FetchWorkflowVersion(ctx, workflowID.Name, workflowID.Version, + workflowID.Project, workflowID.Domain) + if err != nil { return err } + + return validateLPWithSchedule(lpSpec, wf) +} + +// Finds the left diff between to two string slices +// If a and b are two sets then the o/p c is defined as : +// c = a - a ^ b +// where ^ is intersection slice of a and b +// and - removes all the common elements and returns a new slice +// a= {1,2,3} +// b = {3,4,5} +// o/p c = {1,2} +func leftDiff(a, b []string) []string { + m := make(map[string]bool) + + for _, item := range a { + m[item] = true + } + + for _, item := range b { + delete(m, item) + } + // nil semantics on return + if len(m) == 0 { + return nil + } + c := make([]string, len(m)) + index := 0 + for item := range m { + c[index] = item + index++ + } + return c +} + +func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccount string, configOutputLocationPrefix string, lpSpec *admin.LaunchPlanSpec) error { assumableIamRole := len(configAssumableIamRole) > 0 k8sServiceAcct := len(configK8sServiceAccount) > 0 outputLocationPrefix := len(configOutputLocationPrefix) > 0 @@ -325,6 +380,18 @@ func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccoun return nil } +// Validate the spec before sending it to admin. +func validateSpec(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext) error { + switch v := message.(type) { + case *admin.LaunchPlan: + launchPlan := v + if err := validateLaunchSpec(ctx, launchPlan.Spec, cmdCtx); err != nil { + return err + } + } + return nil +} + func hydrateSpec(message proto.Message, sourceCode string, config rconfig.FilesConfig) error { switch v := message.(type) { case *admin.LaunchPlan: @@ -479,7 +546,11 @@ func registerFile(ctx context.Context, fileName, sourceCode string, registerResu } logger.Debugf(ctx, "Hydrated spec : %v", getJSONSpec(spec)) - + if err = validateSpec(ctx, spec, cmdCtx); err != nil { + registerResult = Result{Name: fileName, Status: "Failed", Info: fmt.Sprintf("Error hydrating spec due to %v", err)} + registerResults = append(registerResults, registerResult) + return registerResults, err + } if err := register(ctx, spec, cmdCtx, config.DryRun); err != nil { // If error is AlreadyExists then dont consider this to be an error but just a warning state if grpcError := status.Code(err); grpcError == codes.AlreadyExists { diff --git a/flytectl/cmd/register/register_util_test.go b/flytectl/cmd/register/register_util_test.go index a711495293..15a14a8887 100644 --- a/flytectl/cmd/register/register_util_test.go +++ b/flytectl/cmd/register/register_util_test.go @@ -1,11 +1,13 @@ package register import ( + "context" "errors" "fmt" "net/http" "os" "path/filepath" + "sort" "strings" "testing" @@ -13,7 +15,6 @@ import ( v1 "k8s.io/api/core/v1" - "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" @@ -224,6 +225,61 @@ func TestRegisterFile(t *testing.T) { assert.Equal(t, 1, len(results)) assert.Nil(t, err) }) + t.Run("Failed Scheduled launch plan registration", func(t *testing.T) { + setup() + registerFilesSetup() + mockAdminClient.OnCreateLaunchPlanMatch(mock.Anything, mock.Anything).Return(nil, nil) + variableMap := map[string]*core.Variable{ + "var1": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + Description: "var1", + }, + "var2": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + Description: "var2 long descriptions probably needs truncate", + }, + } + wf := &admin.Workflow{ + Closure: &admin.WorkflowClosure{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Template: &core.WorkflowTemplate{ + Interface: &core.TypedInterface{ + Inputs: &core.VariableMap{ + Variables: variableMap, + }, + }, + }, + }, + }, + }, + } + mockAdminClient.OnGetWorkflowMatch(mock.Anything, mock.Anything).Return(wf, nil) + args = []string{"testdata/152_my_cron_scheduled_lp_3.pb"} + var registerResults []Result + results, err := registerFile(ctx, args[0], "", registerResults, cmdCtx, *rconfig.DefaultFilesConfig) + assert.Equal(t, 1, len(results)) + assert.Equal(t, "Failed", results[0].Status) + assert.Equal(t, "Error hydrating spec due to param values are missing on scheduled workflow for"+ + " the following params [var1 var2]. Either specify them having a default or fixed value", results[0].Info) + assert.NotNil(t, err) + }) t.Run("Non existent file", func(t *testing.T) { setup() registerFilesSetup() @@ -313,100 +369,6 @@ func TestHydrateLaunchPlanSpec(t *testing.T) { assert.Nil(t, err) assert.Equal(t, &admin.RawOutputDataConfig{OutputLocationPrefix: "prefix"}, lpSpec.RawOutputDataConfig) }) - t.Run("Validation successful", func(t *testing.T) { - lpSpec := &admin.LaunchPlanSpec{ - EntityMetadata: &admin.LaunchPlanMetadata{ - Schedule: &admin.Schedule{ - ScheduleExpression: &admin.Schedule_CronExpression{ - CronExpression: "foo", - }, - KickoffTimeInputArg: "kickoff_time_arg", - }, - }, - FixedInputs: &core.LiteralMap{ - Literals: map[string]*core.Literal{}, - }, - } - err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec) - assert.Nil(t, err) - }) - t.Run("Validation failure", func(t *testing.T) { - lpSpec := &admin.LaunchPlanSpec{ - EntityMetadata: &admin.LaunchPlanMetadata{ - Schedule: &admin.Schedule{ - ScheduleExpression: &admin.Schedule_CronExpression{ - CronExpression: "expr", - }, - KickoffTimeInputArg: "kickoff_time_arg", - }, - }, - DefaultInputs: &core.ParameterMap{ - Parameters: map[string]*core.Parameter{ - "bar": { - Var: &core.Variable{ - Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, - }, - }, - }, - }, - FixedInputs: &core.LiteralMap{ - Literals: map[string]*core.Literal{}, - }, - } - err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec) - assert.NotNil(t, err) - }) - t.Run("Validation failed with fixed inputs empty", func(t *testing.T) { - lpSpec := &admin.LaunchPlanSpec{ - EntityMetadata: &admin.LaunchPlanMetadata{ - Schedule: &admin.Schedule{ - ScheduleExpression: &admin.Schedule_CronExpression{ - CronExpression: "expr", - }, - KickoffTimeInputArg: "kickoff_time_arg", - }, - }, - DefaultInputs: &core.ParameterMap{ - Parameters: map[string]*core.Parameter{ - "bar": { - Var: &core.Variable{ - Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, - }, - }, - }, - }, - } - err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec) - assert.NotNil(t, err) - }) - t.Run("Validation success with fixed", func(t *testing.T) { - lpSpec := &admin.LaunchPlanSpec{ - EntityMetadata: &admin.LaunchPlanMetadata{ - Schedule: &admin.Schedule{ - ScheduleExpression: &admin.Schedule_CronExpression{ - CronExpression: "expr", - }, - KickoffTimeInputArg: "kickoff_time_arg", - }, - }, - DefaultInputs: &core.ParameterMap{ - Parameters: map[string]*core.Parameter{ - "bar": { - Var: &core.Variable{ - Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, - }, - }, - }, - }, - FixedInputs: &core.LiteralMap{ - Literals: map[string]*core.Literal{ - "bar": coreutils.MustMakeLiteral("bar-value"), - }, - }, - } - err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec) - assert.Nil(t, err) - }) } func TestUploadFastRegisterArtifact(t *testing.T) { @@ -550,3 +512,293 @@ func TestHydrateTaskSpec(t *testing.T) { assert.Len(t, hydratedPodSpec.Containers[1].Args, 2) assert.True(t, strings.HasSuffix(hydratedPodSpec.Containers[1].Args[1], "sourcey")) } + +func TestLeftDiff(t *testing.T) { + t.Run("empty slices", func(t *testing.T) { + c := leftDiff(nil, nil) + assert.Empty(t, c) + }) + t.Run("right empty slice", func(t *testing.T) { + a := []string{"1", "2", "3"} + c := leftDiff(a, nil) + sort.Strings(a) + sort.Strings(c) + assert.Equal(t, a, c) + }) + t.Run("non empty slices without intersection", func(t *testing.T) { + a := []string{"1", "2", "3"} + b := []string{"5", "6", "7"} + c := leftDiff(a, b) + sort.Strings(a) + sort.Strings(c) + assert.Equal(t, a, c) + }) + t.Run("non empty slices with some intersection", func(t *testing.T) { + a := []string{"1", "2", "3"} + b := []string{"2", "5", "7"} + c := leftDiff(a, b) + expected := []string{"1", "3"} + sort.Strings(expected) + sort.Strings(c) + assert.Equal(t, expected, c) + }) + + t.Run("non empty slices with full intersection same order", func(t *testing.T) { + a := []string{"1", "2", "3"} + b := []string{"1", "2", "3"} + c := leftDiff(a, b) + var expected []string + sort.Strings(c) + assert.Equal(t, expected, c) + }) + + t.Run("non empty slices with full intersection diff order", func(t *testing.T) { + a := []string{"1", "2", "3"} + b := []string{"2", "3", "1"} + c := leftDiff(a, b) + var expected []string + sort.Strings(c) + assert.Equal(t, expected, c) + }) +} + +func TestValidateLaunchSpec(t *testing.T) { + ctx := context.Background() + t.Run("nil launchplan spec", func(t *testing.T) { + registerFilesSetup() + err := validateLaunchSpec(ctx, nil, cmdCtx) + assert.Nil(t, err) + }) + t.Run("launchplan spec with nil workflow id", func(t *testing.T) { + registerFilesSetup() + lpSpec := &admin.LaunchPlanSpec{} + err := validateLaunchSpec(ctx, lpSpec, cmdCtx) + assert.Nil(t, err) + }) + t.Run("launchplan spec with empty metadata", func(t *testing.T) { + registerFilesSetup() + lpSpec := &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "projectValue", + Domain: "domainValue", + Name: "workflowNameValue", + Version: "workflowVersionValue", + }, + } + err := validateLaunchSpec(ctx, lpSpec, cmdCtx) + assert.Nil(t, err) + }) + t.Run("launchplan spec with metadata and empty schedule", func(t *testing.T) { + registerFilesSetup() + lpSpec := &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "projectValue", + Domain: "domainValue", + Name: "workflowNameValue", + Version: "workflowVersionValue", + }, + EntityMetadata: &admin.LaunchPlanMetadata{}, + } + err := validateLaunchSpec(ctx, lpSpec, cmdCtx) + assert.Nil(t, err) + }) + t.Run("validate spec failed to fetch workflow", func(t *testing.T) { + setup() + registerFilesSetup() + + mockAdminClient.OnGetWorkflowMatch(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed")) + lpSpec := &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "projectValue", + Domain: "domainValue", + Name: "workflowNameValue", + Version: "workflowVersionValue", + }, + EntityMetadata: &admin.LaunchPlanMetadata{ + Schedule: &admin.Schedule{ + KickoffTimeInputArg: "kick_off_time_arg", + }, + }, + } + lp := &admin.LaunchPlan{ + Spec: lpSpec, + } + err := validateSpec(ctx, lp, cmdCtx) + assert.NotNil(t, err) + assert.Equal(t, "failed", err.Error()) + }) + t.Run("failed to fetch workflow", func(t *testing.T) { + setup() + registerFilesSetup() + + mockAdminClient.OnGetWorkflowMatch(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed")) + lpSpec := &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "projectValue", + Domain: "domainValue", + Name: "workflowNameValue", + Version: "workflowVersionValue", + }, + EntityMetadata: &admin.LaunchPlanMetadata{ + Schedule: &admin.Schedule{ + KickoffTimeInputArg: "kick_off_time_arg", + }, + }, + } + err := validateLaunchSpec(ctx, lpSpec, cmdCtx) + assert.NotNil(t, err) + assert.Equal(t, "failed", err.Error()) + }) + t.Run("launchplan spec missing required param schedule", func(t *testing.T) { + setup() + registerFilesSetup() + variableMap := map[string]*core.Variable{ + "var1": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + Description: "var1", + }, + "var2": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + Description: "var2 long descriptions probably needs truncate", + }, + } + wf := &admin.Workflow{ + Closure: &admin.WorkflowClosure{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Template: &core.WorkflowTemplate{ + Interface: &core.TypedInterface{ + Inputs: &core.VariableMap{ + Variables: variableMap, + }, + }, + }, + }, + }, + }, + } + mockAdminClient.OnGetWorkflowMatch(mock.Anything, mock.Anything).Return(wf, nil) + lpSpec := &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "projectValue", + Domain: "domainValue", + Name: "workflowNameValue", + Version: "workflowVersionValue", + }, + EntityMetadata: &admin.LaunchPlanMetadata{ + Schedule: &admin.Schedule{ + KickoffTimeInputArg: "kick_off_time_arg", + }, + }, + } + err := validateLaunchSpec(ctx, lpSpec, cmdCtx) + assert.NotNil(t, err) + assert.Equal(t, "param values are missing on scheduled workflow for the following params [var1 var2]."+ + " Either specify them having a default or fixed value", err.Error()) + }) + t.Run("launchplan spec non empty schedule required param success", func(t *testing.T) { + setup() + registerFilesSetup() + variableMap := map[string]*core.Variable{ + "var1": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + Description: "var1", + }, + "var2": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + Description: "var2 long descriptions probably needs truncate", + }, + } + wf := &admin.Workflow{ + Closure: &admin.WorkflowClosure{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Template: &core.WorkflowTemplate{ + Interface: &core.TypedInterface{ + Inputs: &core.VariableMap{ + Variables: variableMap, + }, + }, + }, + }, + }, + }, + } + mockAdminClient.OnGetWorkflowMatch(mock.Anything, mock.Anything).Return(wf, nil) + lpSpec := &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Project: "projectValue", + Domain: "domainValue", + Name: "workflowNameValue", + Version: "workflowVersionValue", + }, + EntityMetadata: &admin.LaunchPlanMetadata{ + Schedule: &admin.Schedule{ + KickoffTimeInputArg: "kick_off_time_arg", + }, + }, + DefaultInputs: &core.ParameterMap{ + Parameters: map[string]*core.Parameter{ + "var1": { + Var: &core.Variable{ + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}, + }, + }, + }, + }, + }, + FixedInputs: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "var2": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_Integer{ + Integer: 10, + }, + }, + }, + }, + }, + }, + }, + }, + } + err := validateLaunchSpec(ctx, lpSpec, cmdCtx) + assert.Nil(t, err) + }) +} diff --git a/flytectl/cmd/register/testdata/152_my_cron_scheduled_lp_3.pb b/flytectl/cmd/register/testdata/152_my_cron_scheduled_lp_3.pb new file mode 100644 index 0000000000..9f56c79ae1 Binary files /dev/null and b/flytectl/cmd/register/testdata/152_my_cron_scheduled_lp_3.pb differ