Skip to content

Commit

Permalink
Fixed validation logic for launch plan schedule registration (flyteor…
Browse files Browse the repository at this point in the history
  • Loading branch information
pmahindrakar-oss authored Feb 19, 2022
1 parent c62de8c commit e45aabf
Show file tree
Hide file tree
Showing 3 changed files with 442 additions and 119 deletions.
119 changes: 95 additions & 24 deletions cmd/register/register_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,39 +275,94 @@ func hydrateTaskSpec(task *admin.TaskSpec, sourceCode, sourceUploadPath, version
return nil
}

func validateLaunchSpec(lpSpec *admin.LaunchPlanSpec) error {
if lpSpec == nil {
return nil
}
if lpSpec.EntityMetadata != nil && lpSpec.EntityMetadata.Schedule != nil {
schedule := lpSpec.EntityMetadata.Schedule
var scheduleFixedParams []string
if lpSpec.DefaultInputs != nil {
for paramKey := range lpSpec.DefaultInputs.Parameters {
if paramKey != schedule.KickoffTimeInputArg {
scheduleFixedParams = append(scheduleFixedParams, paramKey)
}
func validateLPWithSchedule(lpSpec *admin.LaunchPlanSpec, wf *admin.Workflow) error {
schedule := lpSpec.EntityMetadata.Schedule
var scheduleRequiredParams []string
if wf != nil && wf.Closure != nil && wf.Closure.CompiledWorkflow != nil &&
wf.Closure.CompiledWorkflow.Primary != nil && wf.Closure.CompiledWorkflow.Primary.Template != nil &&
wf.Closure.CompiledWorkflow.Primary.Template.Interface != nil &&
wf.Closure.CompiledWorkflow.Primary.Template.Interface.Inputs != nil {
variables := wf.Closure.CompiledWorkflow.Primary.Template.Interface.Inputs.Variables
for varName := range variables {
if varName != schedule.KickoffTimeInputArg {
scheduleRequiredParams = append(scheduleRequiredParams, varName)
}
}
if (lpSpec.FixedInputs == nil && len(scheduleFixedParams) > 0) ||
(len(scheduleFixedParams) > len(lpSpec.FixedInputs.Literals)) {
fixedInputLen := 0
if lpSpec.FixedInputs != nil {
fixedInputLen = len(lpSpec.FixedInputs.Literals)

}
// Either the scheduled param should have default or fixed values
var scheduleParamsWithValues []string
// Check for default values
if lpSpec.DefaultInputs != nil {
for paramName := range lpSpec.DefaultInputs.Parameters {
if paramName != schedule.KickoffTimeInputArg {
scheduleParamsWithValues = append(scheduleParamsWithValues, paramName)
}
return fmt.Errorf("param values are missing on scheduled workflow."+
"additional args other than %v on scheduled workflow are %v > %v fixed values", schedule.KickoffTimeInputArg,
len(scheduleFixedParams), fixedInputLen)
}
}
// Check for fixed values
if lpSpec.FixedInputs != nil && lpSpec.FixedInputs.Literals != nil {
for fixedLiteralName := range lpSpec.FixedInputs.Literals {
scheduleParamsWithValues = append(scheduleParamsWithValues, fixedLiteralName)
}
}

diffSet := leftDiff(scheduleRequiredParams, scheduleParamsWithValues)
if len(diffSet) > 0 {
return fmt.Errorf("param values are missing on scheduled workflow "+
"for the following params %v. Either specify them having a default or fixed value", diffSet)
}
return nil
}

func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccount string, configOutputLocationPrefix string, lpSpec *admin.LaunchPlanSpec) error {

if err := validateLaunchSpec(lpSpec); err != nil {
func validateLaunchSpec(ctx context.Context, lpSpec *admin.LaunchPlanSpec, cmdCtx cmdCore.CommandContext) error {
if lpSpec == nil || lpSpec.WorkflowId == nil || lpSpec.EntityMetadata == nil ||
lpSpec.EntityMetadata.Schedule == nil {
return nil
}
// Fetch the workflow spec using the identifier
workflowID := lpSpec.WorkflowId
wf, err := cmdCtx.AdminFetcherExt().FetchWorkflowVersion(ctx, workflowID.Name, workflowID.Version,
workflowID.Project, workflowID.Domain)
if err != nil {
return err
}

return validateLPWithSchedule(lpSpec, wf)
}

// Finds the left diff between to two string slices
// If a and b are two sets then the o/p c is defined as :
// c = a - a ^ b
// where ^ is intersection slice of a and b
// and - removes all the common elements and returns a new slice
// a= {1,2,3}
// b = {3,4,5}
// o/p c = {1,2}
func leftDiff(a, b []string) []string {
m := make(map[string]bool)

for _, item := range a {
m[item] = true
}

for _, item := range b {
delete(m, item)
}
// nil semantics on return
if len(m) == 0 {
return nil
}
c := make([]string, len(m))
index := 0
for item := range m {
c[index] = item
index++
}
return c
}

func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccount string, configOutputLocationPrefix string, lpSpec *admin.LaunchPlanSpec) error {
assumableIamRole := len(configAssumableIamRole) > 0
k8sServiceAcct := len(configK8sServiceAccount) > 0
outputLocationPrefix := len(configOutputLocationPrefix) > 0
Expand All @@ -325,6 +380,18 @@ func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccoun
return nil
}

// Validate the spec before sending it to admin.
func validateSpec(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext) error {
switch v := message.(type) {
case *admin.LaunchPlan:
launchPlan := v
if err := validateLaunchSpec(ctx, launchPlan.Spec, cmdCtx); err != nil {
return err
}
}
return nil
}

func hydrateSpec(message proto.Message, sourceCode string, config rconfig.FilesConfig) error {
switch v := message.(type) {
case *admin.LaunchPlan:
Expand Down Expand Up @@ -479,7 +546,11 @@ func registerFile(ctx context.Context, fileName, sourceCode string, registerResu
}

logger.Debugf(ctx, "Hydrated spec : %v", getJSONSpec(spec))

if err = validateSpec(ctx, spec, cmdCtx); err != nil {
registerResult = Result{Name: fileName, Status: "Failed", Info: fmt.Sprintf("Error hydrating spec due to %v", err)}
registerResults = append(registerResults, registerResult)
return registerResults, err
}
if err := register(ctx, spec, cmdCtx, config.DryRun); err != nil {
// If error is AlreadyExists then dont consider this to be an error but just a warning state
if grpcError := status.Code(err); grpcError == codes.AlreadyExists {
Expand Down
Loading

0 comments on commit e45aabf

Please sign in to comment.