diff --git a/pkg/compiler/transformers/k8s/node.go b/pkg/compiler/transformers/k8s/node.go index aece64a9a..7331cb058 100644 --- a/pkg/compiler/transformers/k8s/node.go +++ b/pkg/compiler/transformers/k8s/node.go @@ -210,7 +210,7 @@ func buildNodes(nodes []*core.Node, tasks []*core.CompiledTask, errs errors.Comp return res, !errs.HasErrors() } -func buildTasks(tasks []*core.CompiledTask, errs errors.CompileErrors) map[common.TaskIDKey]*v1alpha1.TaskSpec { +func BuildTasks(tasks []*core.CompiledTask, errs errors.CompileErrors) map[common.TaskIDKey]*v1alpha1.TaskSpec { res := make(map[common.TaskIDKey]*v1alpha1.TaskSpec, len(tasks)) for _, flyteTask := range tasks { if flyteTask == nil { diff --git a/pkg/compiler/transformers/k8s/node_test.go b/pkg/compiler/transformers/k8s/node_test.go index a5b7ef275..d2eefa472 100644 --- a/pkg/compiler/transformers/k8s/node_test.go +++ b/pkg/compiler/transformers/k8s/node_test.go @@ -263,7 +263,7 @@ func TestBuildTasks(t *testing.T) { errs := errors.NewCompileErrors() t.Run("Tasks with annotations", func(t *testing.T) { - taskMap := buildTasks(tasks, errs) + taskMap := BuildTasks(tasks, errs) annInputTask := taskMap[(&core.Identifier{Name: "annotatedInput"}).String()] assert.Nil(t, annInputTask.Interface.Inputs.Variables["a"].Type.Annotation) diff --git a/pkg/compiler/transformers/k8s/workflow.go b/pkg/compiler/transformers/k8s/workflow.go index 32396ee41..167db9e97 100644 --- a/pkg/compiler/transformers/k8s/workflow.go +++ b/pkg/compiler/transformers/k8s/workflow.go @@ -80,7 +80,7 @@ func WorkflowNameFromID(id string) string { return tokens[2] } -func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) ( +func BuildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) ( spec *v1alpha1.WorkflowSpec, err error) { wf.Template.Interface = StripInterfaceTypeMetadata(wf.Template.Interface) @@ -172,7 +172,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li t.Template.Interface = StripInterfaceTypeMetadata(t.Template.Interface) } - primarySpec, err := buildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope()) + primarySpec, err := BuildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope()) if err != nil { errs.Collect(errors.NewWorkflowBuildError(err)) return nil, errs @@ -180,7 +180,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows)) for _, subWf := range wfClosure.SubWorkflows { - spec, err := buildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope()) + spec, err := BuildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope()) if err != nil { errs.Collect(errors.NewWorkflowBuildError(err)) } else { @@ -221,7 +221,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li Inputs: &v1alpha1.Inputs{LiteralMap: inputs}, WorkflowSpec: primarySpec, SubWorkflows: subwfs, - Tasks: buildTasks(tasks, errs.NewScope()), + Tasks: BuildTasks(tasks, errs.NewScope()), NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible}, } @@ -269,7 +269,7 @@ func MergeWorkflowClosure(flyteWf *v1alpha1.FlyteWorkflow, wfClosure *core.Compi return errs } - primarySpec, err := buildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope()) + primarySpec, err := BuildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope()) if err != nil { errs.Collect(errors.NewWorkflowBuildError(err)) return errs @@ -277,7 +277,7 @@ func MergeWorkflowClosure(flyteWf *v1alpha1.FlyteWorkflow, wfClosure *core.Compi subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows)) for _, subWf := range wfClosure.SubWorkflows { - spec, err := buildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope()) + spec, err := BuildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope()) if err != nil { errs.Collect(errors.NewWorkflowBuildError(err)) } else { @@ -289,7 +289,7 @@ func MergeWorkflowClosure(flyteWf *v1alpha1.FlyteWorkflow, wfClosure *core.Compi return errs } - tasks := buildTasks(wfClosure.Tasks, errs.NewScope()) + tasks := BuildTasks(wfClosure.Tasks, errs.NewScope()) flyteWf.WorkflowSpec = primarySpec flyteWf.SubWorkflows = subwfs diff --git a/pkg/controller/crdfieldsstore/passthrough.go b/pkg/controller/crdfieldsstore/passthrough.go index c682843a3..a617f6f59 100644 --- a/pkg/controller/crdfieldsstore/passthrough.go +++ b/pkg/controller/crdfieldsstore/passthrough.go @@ -2,7 +2,10 @@ package crdfieldsstore import ( "context" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytepropeller/pkg/compiler/errors" + "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flytestdlib/storage" @@ -17,14 +20,44 @@ func (p *passthroughWorkflowClosureStore) Get(ctx context.Context, dataReference return nil, ErrLocationEmpty } - workflowClosure := &core.CompiledWorkflowClosure{} - err := p.dataStore.ReadProtobuf(ctx, dataReference, workflowClosure) + wfClosure := &core.CompiledWorkflowClosure{} + err := p.dataStore.ReadProtobuf(ctx, dataReference, wfClosure) if err != nil { return nil, err } - // FIXME transform workflowClosure to crd fields - wfClosureCrdFields := &WfClosureCrdFields{} + errs := errors.NewCompileErrors() + if wfClosure == nil { + errs.Collect(errors.NewValueRequiredErr("root", "wfClosure")) + return nil, errs + } + + primarySpec, err := k8s.BuildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope()) + if err != nil { + errs.Collect(errors.NewWorkflowBuildError(err)) + return nil, errs + } + + for _, t := range wfClosure.Tasks { + t.Template.Interface = k8s.StripInterfaceTypeMetadata(t.Template.Interface) + } + tasks := k8s.BuildTasks(wfClosure.Tasks, errs.NewScope()) + + subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows)) + for _, subWf := range wfClosure.SubWorkflows { + spec, err := k8s.BuildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope()) + if err != nil { + errs.Collect(errors.NewWorkflowBuildError(err)) + } else { + subwfs[subWf.Template.Id.String()] = spec + } + } + + wfClosureCrdFields := &WfClosureCrdFields{ + WorkflowSpec: primarySpec, + SubWorkflows: subwfs, + Tasks: tasks, + } return wfClosureCrdFields, nil }