Skip to content

Commit

Permalink
Fixed the version bug and also added force flag (#246)
Browse files Browse the repository at this point in the history
Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Dec 22, 2021
1 parent 2d93fcf commit 51226d5
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 24 deletions.
3 changes: 2 additions & 1 deletion flytectl/cmd/config/subcommand/register/files_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ var (

// FilesConfig containing flags used for registration
type FilesConfig struct {
Version string `json:"version" pflag:",version of the entity to be registered with flyte."`
Version string `json:"version" pflag:",version of the entity to be registered with flyte which are un-versioned after serialization."`
Force bool `json:"force" pflag:",force use of version number on entities registered with flyte."`
ContinueOnError bool `json:"continueOnError" pflag:",continue on error when registering files."`
Archive bool `json:"archive" pflag:",pass in archive file either an http link or local path."`
AssumableIamRole string `json:"assumableIamRole" pflag:", custom assumable iam auth role to register launch plans with."`
Expand Down
3 changes: 2 additions & 1 deletion flytectl/cmd/config/subcommand/register/filesconfig_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytectl/cmd/config/subcommand/register/filesconfig_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 21 additions & 20 deletions flytectl/cmd/register/register_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func unMarshalContents(ctx context.Context, fileContents []byte, fname string) (

}

func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext, dryRun bool, version string) error {
func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext, dryRun bool) error {
switch v := message.(type) {
case *admin.LaunchPlan:
launchPlan := message.(*admin.LaunchPlan)
Expand All @@ -112,7 +112,7 @@ func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.Command
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: launchPlan.Id.Name,
Version: version,
Version: launchPlan.Id.Version,
},
Spec: launchPlan.Spec,
})
Expand All @@ -130,7 +130,7 @@ func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.Command
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: workflowSpec.Template.Id.Name,
Version: version,
Version: workflowSpec.Template.Id.Version,
},
Spec: workflowSpec,
})
Expand All @@ -148,7 +148,7 @@ func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.Command
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: taskSpec.Template.Id.Name,
Version: version,
Version: taskSpec.Template.Id.Version,
},
Spec: taskSpec,
})
Expand All @@ -158,41 +158,41 @@ func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.Command
}
}

func hydrateNode(node *core.Node, version string) error {
func hydrateNode(node *core.Node, version string, force bool) error {
targetNode := node.Target
switch v := targetNode.(type) {
case *core.Node_TaskNode:
taskNodeWrapper := targetNode.(*core.Node_TaskNode)
taskNodeReference := taskNodeWrapper.TaskNode.Reference.(*core.TaskNode_ReferenceId)
hydrateIdentifier(taskNodeReference.ReferenceId, version)
hydrateIdentifier(taskNodeReference.ReferenceId, version, force)
case *core.Node_WorkflowNode:
workflowNodeWrapper := targetNode.(*core.Node_WorkflowNode)
switch workflowNodeWrapper.WorkflowNode.Reference.(type) {
case *core.WorkflowNode_SubWorkflowRef:
subWorkflowNodeReference := workflowNodeWrapper.WorkflowNode.Reference.(*core.WorkflowNode_SubWorkflowRef)
hydrateIdentifier(subWorkflowNodeReference.SubWorkflowRef, version)
hydrateIdentifier(subWorkflowNodeReference.SubWorkflowRef, version, force)
case *core.WorkflowNode_LaunchplanRef:
launchPlanNodeReference := workflowNodeWrapper.WorkflowNode.Reference.(*core.WorkflowNode_LaunchplanRef)
hydrateIdentifier(launchPlanNodeReference.LaunchplanRef, version)
hydrateIdentifier(launchPlanNodeReference.LaunchplanRef, version, force)
default:
return fmt.Errorf("unknown type %T", workflowNodeWrapper.WorkflowNode.Reference)
}
case *core.Node_BranchNode:
branchNodeWrapper := targetNode.(*core.Node_BranchNode)
if err := hydrateNode(branchNodeWrapper.BranchNode.IfElse.Case.ThenNode, version); err != nil {
if err := hydrateNode(branchNodeWrapper.BranchNode.IfElse.Case.ThenNode, version, force); err != nil {
return fmt.Errorf("failed to hydrateNode")
}
if len(branchNodeWrapper.BranchNode.IfElse.Other) > 0 {
for _, ifBlock := range branchNodeWrapper.BranchNode.IfElse.Other {
if err := hydrateNode(ifBlock.ThenNode, version); err != nil {
if err := hydrateNode(ifBlock.ThenNode, version, force); err != nil {
return fmt.Errorf("failed to hydrateNode")
}
}
}
switch branchNodeWrapper.BranchNode.IfElse.Default.(type) {
case *core.IfElseBlock_ElseNode:
elseNodeReference := branchNodeWrapper.BranchNode.IfElse.Default.(*core.IfElseBlock_ElseNode)
if err := hydrateNode(elseNodeReference.ElseNode, version); err != nil {
if err := hydrateNode(elseNodeReference.ElseNode, version, force); err != nil {
return fmt.Errorf("failed to hydrateNode")
}

Expand All @@ -207,14 +207,14 @@ func hydrateNode(node *core.Node, version string) error {
return nil
}

func hydrateIdentifier(identifier *core.Identifier, version string) {
func hydrateIdentifier(identifier *core.Identifier, version string, force bool) {
if identifier.Project == "" || identifier.Project == registrationProjectPattern {
identifier.Project = config.GetConfig().Project
}
if identifier.Domain == "" || identifier.Domain == registrationDomainPattern {
identifier.Domain = config.GetConfig().Domain
}
if identifier.Version == "" || identifier.Version == registrationVersionPattern {
if force || identifier.Version == "" || identifier.Version == registrationVersionPattern {
identifier.Version = version
}
}
Expand Down Expand Up @@ -315,29 +315,30 @@ func hydrateSpec(message proto.Message, sourceCode string, config rconfig.FilesC
switch v := message.(type) {
case *admin.LaunchPlan:
launchPlan := message.(*admin.LaunchPlan)
hydrateIdentifier(launchPlan.Spec.WorkflowId, config.Version)
hydrateIdentifier(launchPlan.Id, config.Version, config.Force)
hydrateIdentifier(launchPlan.Spec.WorkflowId, config.Version, config.Force)
if err := hydrateLaunchPlanSpec(config.AssumableIamRole, config.K8sServiceAccount, config.OutputLocationPrefix, launchPlan.Spec); err != nil {
return err
}
case *admin.WorkflowSpec:
workflowSpec := message.(*admin.WorkflowSpec)
for _, Noderef := range workflowSpec.Template.Nodes {
if err := hydrateNode(Noderef, config.Version); err != nil {
if err := hydrateNode(Noderef, config.Version, config.Force); err != nil {
return err
}
}
hydrateIdentifier(workflowSpec.Template.Id, config.Version)
hydrateIdentifier(workflowSpec.Template.Id, config.Version, config.Force)
for _, subWorkflow := range workflowSpec.SubWorkflows {
for _, Noderef := range subWorkflow.Nodes {
if err := hydrateNode(Noderef, config.Version); err != nil {
if err := hydrateNode(Noderef, config.Version, config.Force); err != nil {
return err
}
}
hydrateIdentifier(subWorkflow.Id, config.Version)
hydrateIdentifier(subWorkflow.Id, config.Version, config.Force)
}
case *admin.TaskSpec:
taskSpec := message.(*admin.TaskSpec)
hydrateIdentifier(taskSpec.Template.Id, config.Version)
hydrateIdentifier(taskSpec.Template.Id, config.Version, config.Force)
// In case of fast serialize input proto also have on additional variable to substitute i.e destination bucket for source code
if err := hydrateTaskSpec(taskSpec, sourceCode, config.SourceUploadPath, config.Version); err != nil {
return err
Expand Down Expand Up @@ -465,7 +466,7 @@ func registerFile(ctx context.Context, fileName, sourceCode string, registerResu

logger.Debugf(ctx, "Hydrated spec : %v", getJSONSpec(spec))

if err := register(ctx, spec, cmdCtx, config.DryRun, config.Version); err != nil {
if err := register(ctx, spec, cmdCtx, config.DryRun); err != nil {
// If error is AlreadyExists then dont consider this to be an error but just a warning state
if grpcError := status.Code(err); grpcError == codes.AlreadyExists {
registerResult = Result{Name: fileName, Status: "Success", Info: fmt.Sprintf("%v", grpcError.String())}
Expand Down
4 changes: 2 additions & 2 deletions flytectl/cmd/register/register_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func TestRegister(t *testing.T) {
setup()
registerFilesSetup()
node := &admin.NodeExecution{}
err := register(ctx, node, cmdCtx, rconfig.DefaultFilesConfig.DryRun, rconfig.DefaultFilesConfig.Version)
err := register(ctx, node, cmdCtx, rconfig.DefaultFilesConfig.DryRun)
assert.NotNil(t, err)
})
}
Expand All @@ -488,7 +488,7 @@ func TestHydrateNode(t *testing.T) {
setup()
registerFilesSetup()
node := &core.Node{}
err := hydrateNode(node, rconfig.DefaultFilesConfig.Version)
err := hydrateNode(node, rconfig.DefaultFilesConfig.Version, true)
assert.NotNil(t, err)
})

Expand Down

0 comments on commit 51226d5

Please sign in to comment.