Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
transform wfClosure to crd fields
Browse files Browse the repository at this point in the history
Signed-off-by: Babis Kiosidis <[email protected]>
  • Loading branch information
ckiosidis committed Aug 9, 2022
1 parent b85ea6a commit f7a0c13
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func buildNodes(nodes []*core.Node, tasks []*core.CompiledTask, errs errors.Comp
return res, !errs.HasErrors()
}

func buildTasks(tasks []*core.CompiledTask, errs errors.CompileErrors) map[common.TaskIDKey]*v1alpha1.TaskSpec {
func BuildTasks(tasks []*core.CompiledTask, errs errors.CompileErrors) map[common.TaskIDKey]*v1alpha1.TaskSpec {
res := make(map[common.TaskIDKey]*v1alpha1.TaskSpec, len(tasks))
for _, flyteTask := range tasks {
if flyteTask == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compiler/transformers/k8s/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestBuildTasks(t *testing.T) {
errs := errors.NewCompileErrors()

t.Run("Tasks with annotations", func(t *testing.T) {
taskMap := buildTasks(tasks, errs)
taskMap := BuildTasks(tasks, errs)

annInputTask := taskMap[(&core.Identifier{Name: "annotatedInput"}).String()]
assert.Nil(t, annInputTask.Interface.Inputs.Variables["a"].Type.Annotation)
Expand Down
14 changes: 7 additions & 7 deletions pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func WorkflowNameFromID(id string) string {
return tokens[2]
}

func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) (
func BuildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) (
spec *v1alpha1.WorkflowSpec, err error) {
wf.Template.Interface = StripInterfaceTypeMetadata(wf.Template.Interface)

Expand Down Expand Up @@ -172,15 +172,15 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
t.Template.Interface = StripInterfaceTypeMetadata(t.Template.Interface)
}

primarySpec, err := buildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope())
primarySpec, err := BuildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
return nil, errs
}

subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows))
for _, subWf := range wfClosure.SubWorkflows {
spec, err := buildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope())
spec, err := BuildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
} else {
Expand Down Expand Up @@ -221,7 +221,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
Inputs: &v1alpha1.Inputs{LiteralMap: inputs},
WorkflowSpec: primarySpec,
SubWorkflows: subwfs,
Tasks: buildTasks(tasks, errs.NewScope()),
Tasks: BuildTasks(tasks, errs.NewScope()),
NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible},
}

Expand Down Expand Up @@ -269,15 +269,15 @@ func MergeWorkflowClosure(flyteWf *v1alpha1.FlyteWorkflow, wfClosure *core.Compi
return errs
}

primarySpec, err := buildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope())
primarySpec, err := BuildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
return errs
}

subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows))
for _, subWf := range wfClosure.SubWorkflows {
spec, err := buildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope())
spec, err := BuildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
} else {
Expand All @@ -289,7 +289,7 @@ func MergeWorkflowClosure(flyteWf *v1alpha1.FlyteWorkflow, wfClosure *core.Compi
return errs
}

tasks := buildTasks(wfClosure.Tasks, errs.NewScope())
tasks := BuildTasks(wfClosure.Tasks, errs.NewScope())

flyteWf.WorkflowSpec = primarySpec
flyteWf.SubWorkflows = subwfs
Expand Down
41 changes: 37 additions & 4 deletions pkg/controller/crdfieldsstore/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package crdfieldsstore

import (
"context"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytepropeller/pkg/compiler/errors"
"github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytestdlib/storage"
Expand All @@ -17,14 +20,44 @@ func (p *passthroughWorkflowClosureStore) Get(ctx context.Context, dataReference
return nil, ErrLocationEmpty
}

workflowClosure := &core.CompiledWorkflowClosure{}
err := p.dataStore.ReadProtobuf(ctx, dataReference, workflowClosure)
wfClosure := &core.CompiledWorkflowClosure{}
err := p.dataStore.ReadProtobuf(ctx, dataReference, wfClosure)
if err != nil {
return nil, err
}

// FIXME transform workflowClosure to crd fields
wfClosureCrdFields := &WfClosureCrdFields{}
errs := errors.NewCompileErrors()
if wfClosure == nil {
errs.Collect(errors.NewValueRequiredErr("root", "wfClosure"))
return nil, errs
}

primarySpec, err := k8s.BuildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
return nil, errs
}

for _, t := range wfClosure.Tasks {
t.Template.Interface = k8s.StripInterfaceTypeMetadata(t.Template.Interface)
}
tasks := k8s.BuildTasks(wfClosure.Tasks, errs.NewScope())

subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows))
for _, subWf := range wfClosure.SubWorkflows {
spec, err := k8s.BuildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope())
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
} else {
subwfs[subWf.Template.Id.String()] = spec
}
}

wfClosureCrdFields := &WfClosureCrdFields{
WorkflowSpec: primarySpec,
SubWorkflows: subwfs,
Tasks: tasks,
}
return wfClosureCrdFields, nil
}

Expand Down

0 comments on commit f7a0c13

Please sign in to comment.