From b48f94cc36568ce7c624409c2aa40fb587b0afde Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Wed, 7 Feb 2018 21:53:06 -0600 Subject: [PATCH] implement concurrent imports with errgroup --- client.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index 6a87d54..a9c29e0 100644 --- a/client.go +++ b/client.go @@ -48,6 +48,7 @@ import ( "github.com/golang/protobuf/proto" pbuf "github.com/pilosa/go-pilosa/gopilosa_pbuf" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" ) const maxHosts = 10 @@ -528,21 +529,25 @@ func (c *Client) importBits(indexName string, frameName string, slice uint64, bi sort.Sort(bitsForSort(bits)) nodes, err := c.fetchFragmentNodes(indexName, slice) if err != nil { - return err + return errors.Wrap(err, "fetching fragment nodes") } + + eg := errgroup.Group{} for _, node := range nodes { uri, err := NewURIFromAddress(node.Host) if err != nil { - return err + return errors.Wrap(err, "getting uri from address") } - uri.SetScheme(node.Scheme) - err = c.importNode(uri, bitsToImportRequest(indexName, frameName, slice, bits)) + err = uri.SetScheme(node.Scheme) if err != nil { - return err + return errors.Wrap(err, "setting scheme on uri") } + eg.Go(func() error { + return c.importNode(uri, bitsToImportRequest(indexName, frameName, slice, bits)) + }) } - - return nil + err = eg.Wait() + return errors.Wrap(err, "importing to nodes") } func (c *Client) importBitsK(indexName string, frameName string, bits []Bit) error {