This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Merge pull request #253 from jaffee/batch-ingest
Batch ingest
tgruben authored Oct 11, 2019
2 parents b745504 + d00044f commit 29aaccb
Showing 12 changed files with 2,076 additions and 78 deletions.
14 changes: 7 additions & 7 deletions .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
defaults: &defaults
working_directory: /go/src/github.com/pilosa/go-pilosa
docker:
- image: circleci/golang:1.11
- image: circleci/golang:1.12
environment:
GO111MODULE: "on"
fast-checkout: &fast-checkout
@@ -30,18 +30,18 @@ jobs:
- *fast-checkout
- run: make install-gometalinter
- run: make gometalinter
test-golang-1.12-rc: &base-test
test-golang-1.13: &base-test
<<: *defaults
steps:
- *fast-checkout
- run: make test-all
docker:
- image: circleci/golang:1.12-rc
- image: circleci/golang:1.13
- image: pilosa/pilosa:master
test-golang-1.11:
test-golang-1.12:
<<: *base-test
docker:
- image: circleci/golang:1.11
- image: circleci/golang:1.12
- image: pilosa/pilosa:master
workflows:
version: 2
@@ -51,9 +51,9 @@ workflows:
- linter:
requires:
- build
- test-golang-1.12-rc:
- test-golang-1.13:
requires:
- build
- test-golang-1.11:
- test-golang-1.12:
requires:
- build
176 changes: 166 additions & 10 deletions client.go
@@ -89,6 +89,81 @@ type Client struct {

importLogEncoder encoder
logLock sync.Mutex

shardNodes shardNodes
tick *time.Ticker
done chan struct{}
}

func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
uris, ok := c.shardNodes.Get(index, shard)
if ok {
return uris, nil
}
fragmentNodes, err := c.fetchFragmentNodes(index, shard)
if err != nil {
return nil, errors.Wrap(err, "trying to look up nodes for shard")
}
uris = make([]*URI, 0, len(fragmentNodes))
for _, fn := range fragmentNodes {
uris = append(uris, fn.URI())
}
c.shardNodes.Put(index, shard, uris)
return uris, nil
}

func (c *Client) runChangeDetection() {
for {
select {
case <-c.tick.C:
c.detectClusterChanges()
case <-c.done:
return
}
}
}

func (c *Client) Close() error {
c.tick.Stop()
close(c.done)
return nil
}

// detectClusterChanges chooses a random index and shard from the
// shardNodes cache and deletes it. It then looks it up from Pilosa to
// see if it still matches, and if not it drops the whole cache.
func (c *Client) detectClusterChanges() {
c.shardNodes.mu.Lock()
// we rely on Go's random map iteration order to get a random
// element. If it doesn't end up being random, it shouldn't
// actually matter.
for index, shardMap := range c.shardNodes.data {
for shard, uris := range shardMap {
delete(shardMap, shard)
c.shardNodes.data[index] = shardMap
c.shardNodes.mu.Unlock()
newURIs, err := c.getURIsForShard(index, shard) // refetch URIs from server.
if err != nil {
c.logger.Printf("problem invalidating shard node cache: %v", err)
return
}
if len(uris) != len(newURIs) {
c.logger.Printf("invalidating shard node cache old: %s, new: %s", URIs(uris), URIs(newURIs))
c.shardNodes.Invalidate()
return
}
for i := range uris {
u1, u2 := uris[i], newURIs[i]
if *u1 != *u2 {
c.logger.Printf("invalidating shard node cache, uri mismatch at %d old: %s, new: %s", i, URIs(uris), URIs(newURIs))
c.shardNodes.Invalidate()
return
}
}
break
}
break
}
}
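
The shardNodes type used by getURIsForShard and detectClusterChanges is defined elsewhere in the PR and is not visible in this diff. Below is a minimal sketch that is consistent with the call sites above (Get, Put, Invalidate, and the mu/data fields); the PR's actual implementation may differ:

// Sketch only, inferred from the call sites in this diff, not copied
// from the PR: a mutex-guarded two-level cache mapping index name and
// shard to the URIs of the nodes that own that shard.
type shardNodes struct {
	mu   sync.Mutex
	data map[string]map[uint64][]*URI
}

func newShardNodes() shardNodes {
	return shardNodes{data: make(map[string]map[uint64][]*URI)}
}

func (s *shardNodes) Get(index string, shard uint64) ([]*URI, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if shards, ok := s.data[index]; ok {
		uris, ok := shards[shard]
		return uris, ok
	}
	return nil, false
}

func (s *shardNodes) Put(index string, shard uint64, uris []*URI) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.data[index]; !ok {
		s.data[index] = make(map[uint64][]*URI)
	}
	s.data[index][shard] = uris
}

func (s *shardNodes) Invalidate() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = make(map[string]map[uint64][]*URI)
}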

// DefaultClient creates a client with the default address and options.
@@ -138,6 +213,10 @@ func newClientWithOptions(options *ClientOptions) *Client {
client: newHTTPClient(options.withDefaults()),
logger: log.New(os.Stderr, "go-pilosa ", log.Flags()),
coordinatorLock: &sync.RWMutex{},

shardNodes: newShardNodes(),
tick: time.NewTicker(time.Minute),
done: make(chan struct{}, 0),
}
if options.importLogWriter != nil {
c.importLogEncoder = newImportLogEncoder(options.importLogWriter)
@@ -148,9 +227,10 @@ func newClientWithOptions(options *ClientOptions) *Client {
c.tracer = options.tracer
}
c.retries = *options.retries
c.minRetrySleepTime = 1 * time.Second
c.minRetrySleepTime = 100 * time.Millisecond
c.maxRetrySleepTime = 2 * time.Minute
c.importManager = newRecordImportManager(c)
go c.runChangeDetection()
return c

}
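
Since newClientWithOptions now starts a background goroutine (go c.runChangeDetection()), long-lived callers should close the client so the ticker and goroutine are released. A usage sketch, assuming the default constructor:

// Sketch: Close stops the minute ticker and ends the background
// change-detection goroutine added in this PR.
client := pilosa.DefaultClient()
defer client.Close()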
@@ -282,7 +362,7 @@ func (c *Client) EnsureIndex(index *Index) error {
if err == ErrIndexExists {
return nil
}
return err
return errors.Wrap(err, "creating index")
}

// EnsureField creates a field on the server if it doesn't exist.
@@ -296,13 +376,17 @@ func (c *Client) EnsureField(field *Field) error {

// DeleteIndex deletes an index on the server.
func (c *Client) DeleteIndex(index *Index) error {
return c.DeleteIndexByName(index.Name())
}

// DeleteIndexByName deletes the named index on the server.
func (c *Client) DeleteIndexByName(index string) error {
span := c.tracer.StartSpan("Client.DeleteIndex")
defer span.Finish()

path := fmt.Sprintf("/index/%s", index.name)
path := fmt.Sprintf("/index/%s", index)
_, _, err := c.httpRequest("DELETE", path, nil, nil, false)
return err

}
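
DeleteIndexByName is a convenience for callers that know only the index's name and don't want to construct an *Index first. A sketch, assuming an existing client (the index name is illustrative):

// Sketch: delete an index by name alone.
if err := client.DeleteIndexByName("users"); err != nil {
	return errors.Wrap(err, "deleting index")
}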

// DeleteField deletes a field on the server.
@@ -340,7 +424,7 @@ func (c *Client) syncSchema(schema *Schema, serverSchema *Schema) error {
if _, ok := serverSchema.indexes[indexName]; !ok {
err = c.EnsureIndex(index)
if err != nil {
return err
return errors.Wrap(err, "ensuring index")
}
}
for _, field := range index.fields {
@@ -535,7 +619,7 @@ func (c *Client) translateRecordsRowKeys(rowKeyIDMap *lru.LRU, field *Field, col
}
if len(keys) > 0 {
// translate missing keys
ids, err := c.translateRowKeys(field, keys)
ids, err := c.TranslateRowKeys(field, keys)
if err != nil {
return err
}
@@ -572,7 +656,7 @@ func (c *Client) translateRecordsColumnKeys(columnKeyIDMap *lru.LRU, index *Inde
}
if len(keys) > 0 {
// translate missing keys
ids, err := c.translateColumnKeys(index, keys)
ids, err := c.TranslateColumnKeys(index, keys)
if err != nil {
return err
}
@@ -641,6 +725,63 @@ func (c *Client) importValues(field *Field,
return errors.Wrap(err, "importing values to nodes")
}

// ImportValues takes the given integer values and column ids (which
// must all be in the given shard) and imports them into the given
// index, field, and shard on all nodes which should hold that shard.
// It assumes that the ids have already been translated from keys if
// necessary, and so tells Pilosa to skip checking whether the index
// uses column keys. ImportValues wraps EncodeImportValues and
// DoImportValues; these are broken out and exported so that
// performance-conscious users can re-use the same vals and ids
// buffers for local encoding while performing the imports
// concurrently.
func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) error {
path, data, err := c.EncodeImportValues(index, field, shard, vals, ids, clear)
if err != nil {
return errors.Wrap(err, "encoding import-values request")
}
err = c.DoImportValues(index, shard, path, data)
return errors.Wrap(err, "doing import values")
}

// EncodeImportValues computes the HTTP path and payload for an
// import-values request. It is typically followed by a call to
// DoImportValues.
func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) (path string, data []byte, err error) {
msg := &pbuf.ImportValueRequest{
Index: index,
Field: field,
Shard: shard,
ColumnIDs: ids,
Values: vals,
}
data, err = proto.Marshal(msg)
if err != nil {
return "", nil, errors.Wrap(err, "marshaling to protobuf")
}
path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
return path, data, nil
}

// DoImportValues takes a path and data payload (normally from
// EncodeImportValues), logs the import, finds all nodes which own
// this shard, and concurrently imports to those nodes.
func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error {
c.logImport(index, path, shard, false, data)

uris, err := c.getURIsForShard(index, shard)
if err != nil {
return errors.Wrap(err, "getting uris")
}

eg := errgroup.Group{}
for _, uri := range uris {
eg.Go(func() error {
return c.importData(uri, path, data)
})
}
return errors.Wrap(eg.Wait(), "importing values to nodes")
}
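
As the ImportValues doc comment above explains, encoding and importing are exported separately so that callers can reuse buffers. A hedged sketch of that pattern; recordsByShard and its record fields are hypothetical, and a concurrency-minded caller could run each DoImportValues call in its own goroutine:

// Sketch: reuse vals/ids across shards, encode locally, then import.
// DoImportValues itself already fans out to all nodes owning the shard.
vals := make([]int64, 0, 1024)
ids := make([]uint64, 0, 1024)
for shard, recs := range recordsByShard { // hypothetical per-shard batches
	vals, ids = vals[:0], ids[:0]
	for _, r := range recs {
		ids = append(ids, r.ID)      // hypothetical field
		vals = append(vals, r.Value) // hypothetical field
	}
	path, data, err := client.EncodeImportValues("myindex", "myfield", shard, vals, ids, false)
	if err != nil {
		return errors.Wrap(err, "encoding values")
	}
	if err := client.DoImportValues("myindex", shard, path, data); err != nil {
		return errors.Wrap(err, "importing values")
	}
}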

func importPathData(field *Field, shard uint64, msg proto.Message, options *ImportOptions) (path string, data []byte, err error) {
data, err = proto.Marshal(msg)
if err != nil {
@@ -704,6 +845,18 @@ func (c *Client) importData(uri *URI, path string, data []byte) error {
return nil
}

// ImportRoaringBitmap can import pre-made bitmaps for a number of
// different views into the given field/shard. If the view name in the
// map is an empty string, the standard view will be used.
func (c *Client) ImportRoaringBitmap(field *Field, shard uint64, views map[string]*roaring.Bitmap, clear bool) error {
uris, err := c.getURIsForShard(field.index.Name(), shard)
if err != nil {
return errors.Wrap(err, "getting URIs for import")
}
err = c.importRoaringBitmap(uris[0], field, shard, views, &ImportOptions{clear: clear})
return errors.Wrap(err, "importing bitmap")
}
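
A usage sketch for ImportRoaringBitmap; the bitmap itself is assumed to be built elsewhere with the roaring package go-pilosa uses, and the empty-string view name selects the standard view, as the comment above describes:

// Sketch: import a pre-built *roaring.Bitmap (bmp, assumed built
// elsewhere) into shard 0's standard view.
views := map[string]*roaring.Bitmap{
	"": bmp, // "" routes the bitmap to the standard view
}
if err := client.ImportRoaringBitmap(field, 0, views, false); err != nil {
	return errors.Wrap(err, "importing roaring bitmap")
}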

func (c *Client) importRoaringBitmap(uri *URI, field *Field, shard uint64, views viewImports, options *ImportOptions) error {
protoViews := []*pbuf.ImportRoaringRequestView{}
for name, bmp := range views {
@@ -971,7 +1124,10 @@ func (c *Client) doRequest(host *URI, method, path string, headers map[string]st
}
err = errors.New(strings.TrimSpace(string(content)))
}
c.logger.Printf("request failed with: %s, retrying (%d)", err.Error(), tries)
if tries == 0 {
break
}
c.logger.Printf("request failed with: %s (status: %d), retrying %d more time(s) after %v", err.Error(), resp.StatusCode, tries, sleepTime)
time.Sleep(sleepTime)
sleepTime *= 2
if sleepTime > c.maxRetrySleepTime {
Expand Down Expand Up @@ -1023,7 +1179,7 @@ func (c *Client) augmentHeaders(headers map[string]string) map[string]string {
return headers
}

func (c *Client) translateRowKeys(field *Field, keys []string) ([]uint64, error) {
func (c *Client) TranslateRowKeys(field *Field, keys []string) ([]uint64, error) {
req := &pbuf.TranslateKeysRequest{
Index: field.index.name,
Field: field.name,
@@ -1032,7 +1188,7 @@ func (c *Client) translateColumnKeys(index *Index, keys []string) ([]uint64, error)
return c.translateKeys(req, keys)
}

func (c *Client) translateColumnKeys(index *Index, keys []string) ([]uint64, error) {
func (c *Client) TranslateColumnKeys(index *Index, keys []string) ([]uint64, error) {
req := &pbuf.TranslateKeysRequest{
Index: index.name,
Keys: keys,
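translateRowKeys and translateColumnKeys become exported here (TranslateRowKeys/TranslateColumnKeys), so external ingest code can pre-translate string keys to IDs before using shard-oriented import methods such as ImportValues above. A sketch using the signatures visible in this diff; index is an *Index from the schema and the keys are illustrative:

// Sketch: translate column keys up front; ids[i] corresponds to
// keys[i], so records can then be grouped into shards by ID.
keys := []string{"user-a", "user-b"}
ids, err := client.TranslateColumnKeys(index, keys)
if err != nil {
	return errors.Wrap(err, "translating column keys")
}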
9 changes: 9 additions & 0 deletions client_internal_it_test.go
@@ -171,3 +171,12 @@ func TestImportWithReplayErrors(t *testing.T) {
t.Fatal("import replay hanging when no schema created")
}
}

func TestDetectClusterChanges(t *testing.T) {
c := getClient()
defer c.Close()
c.shardNodes.data["blah"] = make(map[uint64][]*URI)
c.shardNodes.data["blah"][1] = []*URI{{scheme: "zzz"}}

c.detectClusterChanges()
}
