From c61fa47876de5d0c7aecf75a9b76405003a9df1a Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Wed, 7 Jun 2023 07:56:36 -0500 Subject: [PATCH] Not stripping structure from literal types (#571) * not stripping structure from literal types Signed-off-by: Daniel Rammer * fixed new lint issues Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- pkg/compiler/transformers/k8s/inputs.go | 4 ++-- pkg/compiler/transformers/k8s/utils.go | 4 +++- pkg/compiler/transformers/k8s/utils_test.go | 1 + pkg/compiler/transformers/k8s/workflow.go | 2 +- pkg/compiler/validators/branch.go | 8 ++++---- .../nodes/dynamic/dynamic_workflow.go | 20 +++++++++---------- 6 files changed, 21 insertions(+), 18 deletions(-) diff --git a/pkg/compiler/transformers/k8s/inputs.go b/pkg/compiler/transformers/k8s/inputs.go index 1886f4906..fe365c7ff 100644 --- a/pkg/compiler/transformers/k8s/inputs.go +++ b/pkg/compiler/transformers/k8s/inputs.go @@ -11,12 +11,12 @@ import ( func validateInputs(nodeID common.NodeID, iface *core.TypedInterface, inputs core.LiteralMap, errs errors.CompileErrors) (ok bool) { if iface == nil { errs.Collect(errors.NewValueRequiredErr(nodeID, "interface")) - return + return false } if iface.Inputs == nil { errs.Collect(errors.NewValueRequiredErr(nodeID, "interface.InputsRef")) - return + return false } varMap := make(map[string]*core.Variable, len(iface.Inputs.Variables)) diff --git a/pkg/compiler/transformers/k8s/utils.go b/pkg/compiler/transformers/k8s/utils.go index 5f8a0f85a..9b4bd6382 100644 --- a/pkg/compiler/transformers/k8s/utils.go +++ b/pkg/compiler/transformers/k8s/utils.go @@ -86,8 +86,10 @@ func StripTypeMetadata(t *core.LiteralType) *core.LiteralType { c := *t c.Metadata = nil - c.Structure = nil c.Annotation = nil + // Note that we cannot strip `Structure` from the type because the dynamic node output type is used to validate the + // interface of the dynamically compiled workflow. `Structure` is used to extend type checking information on + // differnent Flyte types and is therefore required to ensure correct type validation. switch underlyingType := c.Type.(type) { case *core.LiteralType_UnionType: diff --git a/pkg/compiler/transformers/k8s/utils_test.go b/pkg/compiler/transformers/k8s/utils_test.go index f3f924c91..b30146105 100644 --- a/pkg/compiler/transformers/k8s/utils_test.go +++ b/pkg/compiler/transformers/k8s/utils_test.go @@ -240,6 +240,7 @@ func TestStripTypeMetadata(t *testing.T) { Type: &core.LiteralType_Simple{ Simple: core.SimpleType_STRING, }, + Structure: &core.TypeStructure{Tag: "str"}, }, }, }, diff --git a/pkg/compiler/transformers/k8s/workflow.go b/pkg/compiler/transformers/k8s/workflow.go index c06d34d81..2719db93a 100644 --- a/pkg/compiler/transformers/k8s/workflow.go +++ b/pkg/compiler/transformers/k8s/workflow.go @@ -88,7 +88,7 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas if n := wf.Template.GetFailureNode(); n != nil { nodes, ok := buildNodeSpec(n, tasks, errs.NewScope()) if !ok { - return + return nil, errs } failureN = nodes[0] } diff --git a/pkg/compiler/validators/branch.go b/pkg/compiler/validators/branch.go index 5d3849d8a..01bec3c45 100644 --- a/pkg/compiler/validators/branch.go +++ b/pkg/compiler/validators/branch.go @@ -14,22 +14,22 @@ func validateBranchInterface(w c.WorkflowBuilder, node c.NodeBuilder, errs error if branch := node.GetBranchNode(); branch == nil { errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch")) - return + return nil, false } if ifBlock := node.GetBranchNode().IfElse; ifBlock == nil { errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch.IfElse")) - return + return nil, false } if ifCase := node.GetBranchNode().IfElse.Case; ifCase == nil { errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch.IfElse.Case")) - return + return nil, false } if thenNode := node.GetBranchNode().IfElse.Case.ThenNode; thenNode == nil { errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Branch.IfElse.Case.ThenNode")) - return + return nil, false } var outputs map[string]*flyte.Variable diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow.go b/pkg/controller/nodes/dynamic/dynamic_workflow.go index eb891aa27..88b2ff66c 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow.go @@ -196,9 +196,9 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C return dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "unable to read futures file, maybe corrupted") } - closure, dynamicWf, workflowContext, err := d.buildDynamicWorkflow(ctx, nCtx, djSpec, dynamicNodeStatus) + closure, dynamicWf, err := d.buildDynamicWorkflow(ctx, nCtx, djSpec, dynamicNodeStatus) if err != nil { - return workflowContext, err + return dynamicWorkflowContext{}, err } if err := f.Cache(ctx, dynamicWf, closure); err != nil { @@ -222,28 +222,28 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C } func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, - djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, dynamicWorkflowContext, error) { + djSpec *core.DynamicJobSpec, dynamicNodeStatus v1alpha1.ExecutableNodeStatus) (*core.CompiledWorkflowClosure, *v1alpha1.FlyteWorkflow, error) { wf, err := d.buildDynamicWorkflowTemplate(ctx, djSpec, nCtx, dynamicNodeStatus) if err != nil { - return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template") + return nil, nil, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build dynamic workflow template") } compiledTasks, err := compileTasks(ctx, djSpec.Tasks) if err != nil { - return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to compile dynamic tasks") + return nil, nil, errors.Wrapf(utils.ErrorCodeUser, err, "failed to compile dynamic tasks") } // Get the requirements, that is, a list of all the task IDs and the launch plan IDs that will be called as part of this dynamic task. // The definition of these will need to be fetched from Admin (in order to get the interface). requirements, err := compiler.GetRequirements(wf, djSpec.Subworkflows) if err != nil { - return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "failed to Get requirements for subworkflows") + return nil, nil, errors.Wrapf(utils.ErrorCodeUser, err, "failed to Get requirements for subworkflows") } // This method handles user vs system errors internally launchPlanInterfaces, err := d.getLaunchPlanInterfaces(ctx, requirements.GetRequiredLaunchPlanIds()) if err != nil { - return nil, nil, dynamicWorkflowContext{}, err + return nil, nil, err } // TODO: In addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec. @@ -253,15 +253,15 @@ func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflow(ctx context.Context, nC var closure *core.CompiledWorkflowClosure closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces) if err != nil { - return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow") + return nil, nil, errors.Wrapf(utils.ErrorCodeUser, err, "malformed dynamic workflow") } dynamicWf, err := k8s.BuildFlyteWorkflow(closure, &core.LiteralMap{}, nil, "") if err != nil { - return nil, nil, dynamicWorkflowContext{}, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow") + return nil, nil, errors.Wrapf(utils.ErrorCodeSystem, err, "failed to build workflow") } - return closure, dynamicWf, dynamicWorkflowContext{}, nil + return closure, dynamicWf, nil } func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup,