Skip to content
This repository has been archived by the owner on May 6, 2022. It is now read-only.

Commit

Permalink
reimpl batching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
srinandan committed Aug 5, 2019
1 parent a7d6e56 commit 9a34227
Showing 1 changed file with 92 additions and 50 deletions.
142 changes: 92 additions & 50 deletions cmd/apis/impapis/impapis.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package impapis

import (
"fmt"
"net/url"
"os"
"path"
Expand All @@ -11,7 +10,6 @@ import (

"github.com/spf13/cobra"
"github.com/srinandan/apigeecli/cmd/shared"
"github.com/srinandan/apigeecli/cmd/types"
)

var Cmd = &cobra.Command{
Expand All @@ -37,7 +35,7 @@ func init() {
_ = Cmd.MarkFlagRequired("folder")
}

func createAsyncAPI(u *url.URL, bundlePath string, wg *sync.WaitGroup, errChan chan<- *types.ImportError) {
func createAsyncAPI(u *url.URL, bundlePath string, wg *sync.WaitGroup) {

defer wg.Done()
_, fileName := filepath.Split(bundlePath)
Expand All @@ -49,24 +47,38 @@ func createAsyncAPI(u *url.URL, bundlePath string, wg *sync.WaitGroup, errChan c
u.RawQuery = q.Encode()
err := shared.ReadBundle(bundlePath)
if err != nil {
errChan <- &types.ImportError{Err: err}
shared.Error.Fatalln(err)
return
}

_, err = shared.PostHttpOctet(true, u.String(), bundlePath)
if err != nil {
errChan <- &types.ImportError{Err: err}
shared.Error.Fatalln(err)
return
}

errChan <- &types.ImportError{Err: nil}
shared.Info.Printf("Completed entity: %s", u.String())
}

//batch creates a batch of proxies to import
func batch(u *url.URL, entities []string, pwg *sync.WaitGroup) {

defer pwg.Done()
//batch workgroup
var bwg sync.WaitGroup

bwg.Add(len(entities))

for _, entity := range entities {
go createAsyncAPI(u, entity, &bwg)
}
bwg.Wait()
}

func createAPIs(u *url.URL) error {

var errChan = make(chan *types.ImportError)
var wg sync.WaitGroup
var proxyBundles []string
var pwg sync.WaitGroup
var entities []string

u.Path = path.Join(u.Path, shared.RootArgs.Org, "apis")

Expand All @@ -77,64 +89,94 @@ func createAPIs(u *url.URL) error {
if filepath.Ext(path) != ".zip" {
return nil
}
proxyBundles = append(proxyBundles, path)
entities = append(entities, path)
return nil
})

if err != nil {
return err
}

numAPIs := len(proxyBundles)
shared.Info.Printf("Found %d bundles in the folder\n", numAPIs)
numEntities := len(entities)
shared.Info.Printf("Found %d proxy bundles in the folder\n", numEntities)
shared.Info.Printf("Create proxies with %d connections\n", conn)

if numAPIs < conn {
wg.Add(numAPIs)
for i := 0; i < numAPIs; i++ {
go createAsyncAPI(u, proxyBundles[i], &wg, errChan)
}
numOfLoops, remaining := numEntities/conn, numEntities%conn

go func() {
wg.Wait()
close(errChan)
}()

} else {
numOfLoops, remaining := numAPIs/conn, numAPIs%conn
for i := 0; i < numOfLoops; i++ {
shared.Info.Printf("Create %d batch of proxies\n", i)
wg.Add(conn)
for j := 0; j < conn; j++ {
go createAsyncAPI(u, proxyBundles[j+(i*conn)], &wg, errChan)
//ensure connections aren't greater than entities
if conn > numEntities {
conn = numEntities
}

start := 0

for i, end := 0, 0; i < numOfLoops; i++ {
pwg.Add(1)
end = (i * conn) + conn
shared.Info.Printf("Creating batch %d of bundles\n", (i + 1))
go batch(u, entities[start:end], &pwg)
start = end
pwg.Wait()
}

if remaining > 0 {
pwg.Add(1)
shared.Info.Printf("Creating remaining %d bundles\n", remaining)
go batch(u, entities[start:numEntities], &pwg)
pwg.Wait()
}

return nil
/*
shared.Info.Printf("Found %d bundles in the folder\n", numAPIs)
shared.Info.Printf("Create proxies with %d connections\n", conn)
if numAPIs < conn {
wg.Add(numAPIs)
for i := 0; i < numAPIs; i++ {
go createAsyncAPI(u, proxyBundles[i], &wg, errChan)
}
go func() {
wg.Wait()
close(errChan)
}()
}
wg.Add(remaining)
shared.Info.Printf("Create remaining %d proxies\n", remaining)
for i := (numAPIs - remaining); i < numAPIs; i++ {
go createAsyncAPI(u, proxyBundles[i], &wg, errChan)
} else {
numOfLoops, remaining := numAPIs/conn, numAPIs%conn
for i := 0; i < numOfLoops; i++ {
shared.Info.Printf("Create %d batch of proxies\n", i)
wg.Add(conn)
for j := 0; j < conn; j++ {
go createAsyncAPI(u, proxyBundles[j+(i*conn)], &wg, errChan)
}
go func() {
wg.Wait()
}()
}
wg.Add(remaining)
shared.Info.Printf("Create remaining %d proxies\n", remaining)
for i := (numAPIs - remaining); i < numAPIs; i++ {
go createAsyncAPI(u, proxyBundles[i], &wg, errChan)
}
go func() {
wg.Wait()
close(errChan)
}()
}
go func() {
wg.Wait()
close(errChan)
}()
}
//print any errors and return an err
var errs = false
for errAPIs := range errChan {
if errAPIs.Err != nil {
shared.Error.Fatalln(errAPIs.Err)
errs = true
//print any errors and return an err
var errs = false
for errAPIs := range errChan {
if errAPIs.Err != nil {
shared.Error.Fatalln(errAPIs.Err)
errs = true
}
}
}
if errs {
return fmt.Errorf("problem creating one of more products")
}
return nil
if errs {
return fmt.Errorf("problem creating one of more products")
}
return nil*/
}

0 comments on commit 9a34227

Please sign in to comment.