Skip to content

Commit

Permalink
fix: recursive export
Browse files Browse the repository at this point in the history
  • Loading branch information
Duologic committed Jan 9, 2021
1 parent cbec6fa commit c15952a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
14 changes: 13 additions & 1 deletion cmd/tk/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func exportCmd() *cli.Command {
}

for _, env := range envs {
paths = append(paths, filepath.Join(rootDir, env.Metadata.Namespace))
path := filepath.Join(rootDir, env.Metadata.Namespace)
if !hasPath(paths, path) {
paths = append(paths, path)
}
}
continue
}
Expand All @@ -89,3 +92,12 @@ func exportCmd() *cli.Command {
}
return cmd
}

func hasPath(paths []string, path string) bool {
for _, p := range paths {
if p == path {
return true
}
}
return false
}
28 changes: 19 additions & 9 deletions pkg/tanka/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tanka

import (
"fmt"
"sync"

"k8s.io/apimachinery/pkg/labels"

Expand All @@ -11,44 +12,52 @@ import (
const defaultParallelism = 8

type parallelOpts struct {
JsonnetOpts JsonnetOpts
JsonnetOpts
Selector labels.Selector
Parallelism int
}

// parallelLoadEnvironments evaluates multiple environments in parallel
func parallelLoadEnvironments(paths []string, opts parallelOpts) ([]*v1alpha1.Environment, error) {
wg := sync.WaitGroup{}
jobsCh := make(chan parallelJob)
outCh := make(chan parallelOut)

if opts.Parallelism <= 0 {
opts.Parallelism = defaultParallelism
}

for i := 0; i < opts.Parallelism; i++ {
go parallelWorker(jobsCh, outCh)
wg.Add(1)
go func() {
defer wg.Done()
parallelWorker(jobsCh)
}()
}

var results []*parallelOut
for _, path := range paths {
out := &parallelOut{}
results = append(results, out)
jobsCh <- parallelJob{
path: path,
opts: Opts{JsonnetOpts: opts.JsonnetOpts},
out: out,
}
}
close(jobsCh)

var envs []*v1alpha1.Environment
var errors []error
for i := 0; i < len(paths); i++ {
out := <-outCh
for _, out := range results {
if out.err != nil {
errors = append(errors, out.err)
continue
}
if opts.Selector == nil || opts.Selector.Empty() || opts.Selector.Matches(out.env.Metadata) {
envs = append(envs, out.env)
envs = append(envs, &out.env)
}
}
wg.Wait()

if len(errors) != 0 {
return envs, ErrParallel{errors: errors}
Expand All @@ -60,19 +69,20 @@ func parallelLoadEnvironments(paths []string, opts parallelOpts) ([]*v1alpha1.En
type parallelJob struct {
path string
opts Opts
out *parallelOut
}

type parallelOut struct {
env *v1alpha1.Environment
env v1alpha1.Environment
err error
}

func parallelWorker(jobsCh <-chan parallelJob, outCh chan parallelOut) {
func parallelWorker(jobsCh <-chan parallelJob) {
for job := range jobsCh {
env, err := LoadEnvironment(job.path, job.opts)
if err != nil {
err = fmt.Errorf("%s:\n %w", job.path, err)
}
outCh <- parallelOut{env: env, err: err}
*job.out = parallelOut{*env, err}
}
}

0 comments on commit c15952a

Please sign in to comment.