From 1eda51e57a45b66e708e0854cb778ba77ae8a9b1 Mon Sep 17 00:00:00 2001 From: Honnix Date: Mon, 23 Oct 2023 22:25:58 +0200 Subject: [PATCH] Use proto file name to infer message type (#436) Signed-off-by: Hongxin Liang --- cmd/register/register_util.go | 75 ++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 19 deletions(-) diff --git a/cmd/register/register_util.go b/cmd/register/register_util.go index 87d8be43db..18ecede8ba 100644 --- a/cmd/register/register_util.go +++ b/cmd/register/register_util.go @@ -92,37 +92,74 @@ var projectColumns = []printer.Column{ {Header: "Additional Info", JSONPath: "$.Info"}, } +// Regex to match file name like xxx_1.pb, xxx_2.pb, or xxx_3.pb, and the subgroup catches the number 1, 2 or 3 +// This is used to match proto files created by pyflyte, where xxx_1.pb is a task spec, xxx_2.pb is a workflow spec, and xxx_3.pb is launch plan +var fnameRegex = regexp.MustCompile(`^.*_(?P[1-3])\.pb$`) + +type unMarshalFunc = func(ctx context.Context, fileContents []byte, fname string, errCollection errors2.ErrorCollection) (proto.Message, error) + +// Order matters here +var unMarshalFuncs = []unMarshalFunc{ + unMarshalTask, + unMarshalWorkflow, + unMarshalLaunchPlan, +} + func UnMarshalContents(ctx context.Context, fileContents []byte, fname string) (proto.Message, error) { - workflowSpec := &admin.WorkflowSpec{} errCollection := errors2.ErrorCollection{} - err := proto.Unmarshal(fileContents, workflowSpec) - if err == nil { - return workflowSpec, nil + + for _, f := range reorderUnMarshalFuncs(fname) { + if m, err := f(ctx, fileContents, fname, errCollection); err == nil { + return m, nil + } } - errCollection.Append(fmt.Errorf("as a Workflow: %w", err)) + return nil, fmt.Errorf("failed unmarshalling file %v. Errors: %w", fname, errCollection.ErrorOrDefault()) +} - logger.Debugf(ctx, "Failed to unmarshal file %v for workflow type", fname) - taskSpec := &admin.TaskSpec{} - err = proto.Unmarshal(fileContents, taskSpec) - if err == nil { - return taskSpec, nil - } +func unMarshalTask(ctx context.Context, fileContents []byte, fname string, errCollection errors2.ErrorCollection) (proto.Message, error) { + return unMarshal(ctx, fileContents, fname, errCollection, "Task", "task", &admin.TaskSpec{}) +} - errCollection.Append(fmt.Errorf("as a Task: %w", err)) +func unMarshalWorkflow(ctx context.Context, fileContents []byte, fname string, errCollection errors2.ErrorCollection) (proto.Message, error) { + return unMarshal(ctx, fileContents, fname, errCollection, "Workflow", "workflow", &admin.WorkflowSpec{}) +} + +func unMarshalLaunchPlan(ctx context.Context, fileContents []byte, fname string, errCollection errors2.ErrorCollection) (proto.Message, error) { + return unMarshal(ctx, fileContents, fname, errCollection, "Launchplan", "launch plan", &admin.LaunchPlan{}) +} - logger.Debugf(ctx, "Failed to unmarshal file %v for task type", fname) - launchPlan := &admin.LaunchPlan{} - err = proto.Unmarshal(fileContents, launchPlan) +func unMarshal(ctx context.Context, fileContents []byte, fname string, errCollection errors2.ErrorCollection, tpe string, typeAlt string, m proto.Message) (proto.Message, error) { + err := proto.Unmarshal(fileContents, m) if err == nil { - return launchPlan, nil + return m, nil } - errCollection.Append(fmt.Errorf("as a Launchplan: %w", err)) + errCollection.Append(fmt.Errorf("as a %s type: %w", tpe, err)) + logger.Debugf(ctx, "Failed to unmarshal file %s for %v type", fname, typeAlt) + return nil, err +} - logger.Debugf(ctx, "Failed to unmarshal file %v for launch plan type", fname) - return nil, fmt.Errorf("failed unmarshalling file %v. Errors: %w", fname, errCollection.ErrorOrDefault()) +func reorderUnMarshalFuncs(fname string) []unMarshalFunc { + if match := fnameRegex.FindStringSubmatch(fname); match != nil { + indexStr := match[fnameRegex.SubexpIndex("index")] + index, err := strconv.Atoi(indexStr) + if err != nil { + panic(fmt.Sprintf("unexpected error when coverting [%s] to int, file name [%s]", indexStr, fname)) + } + + var reordered []unMarshalFunc + for i, f := range unMarshalFuncs { + if i == index-1 { + reordered = append([]unMarshalFunc{f}, reordered...) + } else { + reordered = append(reordered, f) + } + } + return reordered + } + return unMarshalFuncs } func register(ctx context.Context, message proto.Message, cmdCtx cmdCore.CommandContext, dryRun, enableSchedule bool) error {