Skip to content

Commit

Permalink
Add default input default behavior check for scheduler workflow regis…
Browse files Browse the repository at this point in the history
…tration (flyteorg#368)
  • Loading branch information
pmahindrakar-oss authored and austin362667 committed May 7, 2024
1 parent 9872775 commit 7520331
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 3 deletions.
4 changes: 2 additions & 2 deletions flytectl/cmd/register/register_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ func validateLPWithSchedule(lpSpec *admin.LaunchPlanSpec, wf *admin.Workflow) er
var scheduleParamsWithValues []string
// Check for default values
if lpSpec.DefaultInputs != nil {
for paramName := range lpSpec.DefaultInputs.Parameters {
if paramName != schedule.KickoffTimeInputArg {
for paramName, paramValue := range lpSpec.DefaultInputs.Parameters {
if paramName != schedule.KickoffTimeInputArg && paramValue.GetDefault() != nil {
scheduleParamsWithValues = append(scheduleParamsWithValues, paramName)
}
}
Expand Down
79 changes: 78 additions & 1 deletion flytectl/cmd/register/register_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func TestValidateLaunchSpec(t *testing.T) {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "param values are missing on scheduled workflow for the following params")
})
t.Run("launchplan spec non empty schedule required param success", func(t *testing.T) {
t.Run("launchplan spec non empty schedule default param success", func(t *testing.T) {
s := setup()
registerFilesSetup()
variableMap := map[string]*core.Variable{
Expand Down Expand Up @@ -820,6 +820,21 @@ func TestValidateLaunchSpec(t *testing.T) {
Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER},
},
},
Behavior: &core.Parameter_Default{
Default: &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Primitive{
Primitive: &core.Primitive{
Value: &core.Primitive_Integer{
Integer: 10,
},
},
},
},
},
},
},
},
},
},
Expand All @@ -844,4 +859,66 @@ func TestValidateLaunchSpec(t *testing.T) {
err := validateLaunchSpec(ctx, lpSpec, s.CmdCtx)
assert.Nil(t, err)
})

t.Run("launchplan spec non empty schedule required param without value fail", func(t *testing.T) {
s := setup()
registerFilesSetup()
variableMap := map[string]*core.Variable{
"var1": {
Type: &core.LiteralType{
Type: &core.LiteralType_CollectionType{
CollectionType: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_INTEGER,
},
},
},
},
Description: "var1",
},
}
wf := &admin.Workflow{
Closure: &admin.WorkflowClosure{
CompiledWorkflow: &core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Interface: &core.TypedInterface{
Inputs: &core.VariableMap{
Variables: variableMap,
},
},
},
},
},
},
}
s.FetcherExt.OnFetchWorkflowVersionMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(wf, nil)
lpSpec := &admin.LaunchPlanSpec{
WorkflowId: &core.Identifier{
Project: "projectValue",
Domain: "domainValue",
Name: "workflowNameValue",
Version: "workflowVersionValue",
},
EntityMetadata: &admin.LaunchPlanMetadata{
Schedule: &admin.Schedule{
KickoffTimeInputArg: "kick_off_time_arg",
},
},
DefaultInputs: &core.ParameterMap{
Parameters: map[string]*core.Parameter{
"var1": {
Var: &core.Variable{
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER},
},
},
},
},
},
}
err := validateLaunchSpec(ctx, lpSpec, s.CmdCtx)
assert.NotNil(t, err)
assert.Equal(t, fmt.Errorf("param values are missing on scheduled workflow for the following params [var1]. Either specify them having a default or fixed value"), err)
})
}

0 comments on commit 7520331

Please sign in to comment.