Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Batch ingest #253

Merged
merged 26 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a3d15a2
WIP: proof of concept for batch based ingest
jaffee Aug 20, 2019
62bc87f
support for string record IDs, parity with old ingest
jaffee Aug 21, 2019
edf6cb8
implement int field support
jaffee Aug 26, 2019
3146a31
add many config options to picsv, test, benchmark
jaffee Aug 27, 2019
802f882
support for empty values in importbatch
jaffee Aug 27, 2019
9634954
documentation for batch importer
jaffee Aug 27, 2019
2fa57bd
gofmt -s importbatch.go
jaffee Aug 27, 2019
eed88d3
add locks on translator cache, shardNodes invalidation
jaffee Aug 27, 2019
4cc60a1
gofmt -s
jaffee Aug 28, 2019
4d7a11e
move Translator out of client to ImportBatch
jaffee Aug 28, 2019
c33123a
interfacify Translator, add errors and batch addition
jaffee Aug 28, 2019
6d88b17
export client KeyTranslation and ImportRoaring methods
jaffee Aug 28, 2019
994cf36
move batch stuff to subpackage
jaffee Aug 28, 2019
a930004
update circle CI to Go 1.12/13-rc
jaffee Aug 28, 2019
8e2f334
use shardnode cluster change detector w/ cleanup
jaffee Aug 28, 2019
c2b019c
remove picsv command (moving to pdk)
jaffee Aug 31, 2019
b811c3b
start adding time quantum support to batch import
jaffee Sep 9, 2019
e40d84a
complete per-record timestamp support
jaffee Sep 9, 2019
2dfc4b7
test and fix clearValues bug
jaffee Sep 10, 2019
bfe8680
handle byte slice batch record IDs
jaffee Oct 4, 2019
9b81c1a
fix and comment for importvalues
jaffee Oct 4, 2019
4129aee
support for string slice values in batch ingest
jaffee Oct 8, 2019
6791c14
fix importbatch bug, wrap some errs
jaffee Oct 8, 2019
1d062fd
support multiple of the same field in batch
jaffee Oct 9, 2019
58c2816
map from int instead of slice
jaffee Oct 9, 2019
d00044f
fix flaky test due to unpredictable translation order
jaffee Oct 11, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2
defaults: &defaults
working_directory: /go/src/github.com/pilosa/go-pilosa
docker:
- image: circleci/golang:1.11
- image: circleci/golang:1.12
environment:
GO111MODULE: "on"
fast-checkout: &fast-checkout
Expand Down Expand Up @@ -30,18 +30,18 @@ jobs:
- *fast-checkout
- run: make install-gometalinter
- run: make gometalinter
test-golang-1.12-rc: &base-test
test-golang-1.13: &base-test
<<: *defaults
steps:
- *fast-checkout
- run: make test-all
docker:
- image: circleci/golang:1.12-rc
- image: circleci/golang:1.13
- image: pilosa/pilosa:master
test-golang-1.11:
test-golang-1.12:
<<: *base-test
docker:
- image: circleci/golang:1.11
- image: circleci/golang:1.12
- image: pilosa/pilosa:master
workflows:
version: 2
Expand All @@ -51,9 +51,9 @@ workflows:
- linter:
requires:
- build
- test-golang-1.12-rc:
- test-golang-1.13:
requires:
- build
- test-golang-1.11:
- test-golang-1.12:
requires:
- build
176 changes: 166 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,81 @@ type Client struct {

importLogEncoder encoder
logLock sync.Mutex

shardNodes shardNodes
tick *time.Ticker
done chan struct{}
}

// getURIsForShard returns the URIs of the nodes that hold the given
// shard of the given index. The shardNodes cache is consulted first;
// on a miss the fragment nodes are fetched, converted to URIs, stored
// in the cache, and returned.
func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
	if cached, hit := c.shardNodes.Get(index, shard); hit {
		return cached, nil
	}

	nodes, err := c.fetchFragmentNodes(index, shard)
	if err != nil {
		return nil, errors.Wrap(err, "trying to look up nodes for shard")
	}

	fetched := make([]*URI, 0, len(nodes))
	for _, node := range nodes {
		fetched = append(fetched, node.URI())
	}
	c.shardNodes.Put(index, shard, fetched)
	return fetched, nil
}

// runChangeDetection loops until the done channel is closed or
// receives a value, calling detectClusterChanges each time the
// client's ticker fires.
func (c *Client) runChangeDetection() {
	for {
		select {
		case <-c.done:
			return
		case <-c.tick.C:
			c.detectClusterChanges()
		}
	}
}

// Close stops the client's ticker and closes its done channel, which
// is the signal runChangeDetection returns on. It always returns nil.
func (c *Client) Close() error {
	// The deferred close runs after the ticker is stopped, preserving
	// the stop-then-close order.
	defer close(c.done)
	c.tick.Stop()
	return nil
}

// detectClusterChanges chooses a random index and shard from the
// shardNodes cache and deletes it. It then looks it up from Pilosa to
// see if it still matches, and if not it drops the whole cache.
//
// If the cache holds no entries the function returns without doing
// anything. Lookup errors are logged and otherwise ignored.
func (c *Client) detectClusterChanges() {
	index, shard, uris, ok := c.popRandomShardNodes()
	if !ok {
		return
	}

	// The cache mutex is no longer held here: getURIsForShard goes
	// through the cache's own Get/Put, and because the sampled entry was
	// just removed this lookup has to refetch the URIs from the server.
	newURIs, err := c.getURIsForShard(index, shard)
	if err != nil {
		c.logger.Printf("problem invalidating shard node cache: %v", err)
		return
	}
	if len(uris) != len(newURIs) {
		c.logger.Printf("invalidating shard node cache old: %s, new: %s", URIs(uris), URIs(newURIs))
		c.shardNodes.Invalidate()
		return
	}
	for i := range uris {
		if *uris[i] != *newURIs[i] {
			c.logger.Printf("invalidating shard node cache, uri mismatch at %d old: %s, new: %s", i, URIs(uris), URIs(newURIs))
			c.shardNodes.Invalidate()
			return
		}
	}
}

// popRandomShardNodes removes one arbitrary (index, shard) entry from
// the shardNodes cache and returns it. ok is false when the cache
// holds no entries. The cache mutex is held for the duration of the
// call and is always released on return, including when nothing is
// found.
func (c *Client) popRandomShardNodes() (index string, shard uint64, uris []*URI, ok bool) {
	c.shardNodes.mu.Lock()
	defer c.shardNodes.mu.Unlock()

	// we rely on Go's random map iteration order to get a random
	// element. If it doesn't end up being random, it shouldn't
	// actually matter.
	for idx, shardMap := range c.shardNodes.data {
		for sh, cached := range shardMap {
			// Maps are reference types, so deleting here updates the
			// cache in place; no re-assignment into data is needed.
			delete(shardMap, sh)
			return idx, sh, cached, true
		}
	}
	return "", 0, nil, false
}

// DefaultClient creates a client with the default address and options.
Expand Down Expand Up @@ -138,6 +213,10 @@ func newClientWithOptions(options *ClientOptions) *Client {
client: newHTTPClient(options.withDefaults()),
logger: log.New(os.Stderr, "go-pilosa ", log.Flags()),
coordinatorLock: &sync.RWMutex{},

shardNodes: newShardNodes(),
tick: time.NewTicker(time.Minute),
done: make(chan struct{}, 0),
}
if options.importLogWriter != nil {
c.importLogEncoder = newImportLogEncoder(options.importLogWriter)
Expand All @@ -148,9 +227,10 @@ func newClientWithOptions(options *ClientOptions) *Client {
c.tracer = options.tracer
}
c.retries = *options.retries
c.minRetrySleepTime = 1 * time.Second
c.minRetrySleepTime = 100 * time.Millisecond
c.maxRetrySleepTime = 2 * time.Minute
c.importManager = newRecordImportManager(c)
go c.runChangeDetection()
return c

}
Expand Down Expand Up @@ -282,7 +362,7 @@ func (c *Client) EnsureIndex(index *Index) error {
if err == ErrIndexExists {
return nil
}
return err
return errors.Wrap(err, "creating index")
}

// EnsureField creates a field on the server if it doesn't exists.
Expand All @@ -296,13 +376,17 @@ func (c *Client) EnsureField(field *Field) error {

// DeleteIndex deletes the given index on the server by delegating to
// DeleteIndexByName with the index's name.
func (c *Client) DeleteIndex(index *Index) error {
	name := index.Name()
	return c.DeleteIndexByName(name)
}

// DeleteIndexByName deletes the named index on the server.
func (c *Client) DeleteIndexByName(index string) error {
span := c.tracer.StartSpan("Client.DeleteIndex")
defer span.Finish()

path := fmt.Sprintf("/index/%s", index.name)
path := fmt.Sprintf("/index/%s", index)
_, _, err := c.httpRequest("DELETE", path, nil, nil, false)
return err

}

// DeleteField deletes a field on the server.
Expand Down Expand Up @@ -340,7 +424,7 @@ func (c *Client) syncSchema(schema *Schema, serverSchema *Schema) error {
if _, ok := serverSchema.indexes[indexName]; !ok {
err = c.EnsureIndex(index)
if err != nil {
return err
return errors.Wrap(err, "ensuring index")
}
}
for _, field := range index.fields {
Expand Down Expand Up @@ -535,7 +619,7 @@ func (c *Client) translateRecordsRowKeys(rowKeyIDMap *lru.LRU, field *Field, col
}
if len(keys) > 0 {
// translate missing keys
ids, err := c.translateRowKeys(field, keys)
ids, err := c.TranslateRowKeys(field, keys)
if err != nil {
return err
}
Expand Down Expand Up @@ -572,7 +656,7 @@ func (c *Client) translateRecordsColumnKeys(columnKeyIDMap *lru.LRU, index *Inde
}
if len(keys) > 0 {
// translate missing keys
ids, err := c.translateColumnKeys(index, keys)
ids, err := c.TranslateColumnKeys(index, keys)
if err != nil {
return err
}
Expand Down Expand Up @@ -641,6 +725,63 @@ func (c *Client) importValues(field *Field,
return errors.Wrap(err, "importing values to nodes")
}

// ImportValues imports the given integer values for the given column
// ids (which must all be in the given shard) into index/field/shard on
// every node that should hold that shard. The ids are assumed to be
// already translated from keys where necessary, so the request tells
// Pilosa to skip the column-key check.
//
// It is a thin wrapper: EncodeImportValues builds the request and
// DoImportValues sends it. Both are exported separately so that
// performance conscious users can re-use the same vals and ids buffers
// for local encoding while performing the imports concurrently.
func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) error {
	reqPath, payload, encErr := c.EncodeImportValues(index, field, shard, vals, ids, clear)
	if encErr != nil {
		return errors.Wrap(encErr, "encoding import-values request")
	}
	// errors.Wrap yields nil for a nil error, so success passes through.
	return errors.Wrap(c.DoImportValues(index, shard, reqPath, payload), "doing import values")
}

// EncodeImportValues builds the HTTP path and the protobuf-encoded
// payload for an import-values request. The clear flag is carried in
// the path's query string rather than in the payload, and the path
// always sets ignoreKeyCheck=true. The result is normally handed to
// DoImportValues.
func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) (path string, data []byte, err error) {
	encoded, marshalErr := proto.Marshal(&pbuf.ImportValueRequest{
		Index:     index,
		Field:     field,
		Shard:     shard,
		ColumnIDs: ids,
		Values:    vals,
	})
	if marshalErr != nil {
		return "", nil, errors.Wrap(marshalErr, "marshaling to protobuf")
	}

	clearParam := strconv.FormatBool(clear)
	reqPath := fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, clearParam)
	return reqPath, encoded, nil
}

// DoImportValues takes a path and data payload (normally from
// EncodeImportValues), logs the import, finds all nodes which own
// this shard, and concurrently imports to those nodes. It waits for
// every import to finish and returns the first error reported by the
// group, wrapped; it returns nil when all imports succeed.
func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error {
	c.logImport(index, path, shard, false, data)

	uris, err := c.getURIsForShard(index, shard)
	if err != nil {
		return errors.Wrap(err, "getting uris")
	}

	var eg errgroup.Group
	for _, uri := range uris {
		// Take a per-iteration copy: before Go 1.22 the range variable is
		// shared across iterations, so without this every goroutine could
		// observe the same (last) URI and some nodes would never receive
		// the import.
		uri := uri
		eg.Go(func() error {
			return c.importData(uri, path, data)
		})
	}
	return errors.Wrap(eg.Wait(), "importing values to nodes")
}

func importPathData(field *Field, shard uint64, msg proto.Message, options *ImportOptions) (path string, data []byte, err error) {
data, err = proto.Marshal(msg)
if err != nil {
Expand Down Expand Up @@ -704,6 +845,18 @@ func (c *Client) importData(uri *URI, path string, data []byte) error {
return nil
}

// ImportRoaringBitmap can import pre-made bitmaps for a number of
// different views into the given field/shard. If the view name in the
// map is an empty string, the standard view will be used.
//
// The request is sent only to the first URI returned for the shard.
// NOTE(review): presumably the receiving node forwards the import to
// the other replicas — confirm against importRoaringBitmap.
//
// An error is returned if the node lookup fails, if no nodes are
// reported for the shard, or if the import itself fails.
func (c *Client) ImportRoaringBitmap(field *Field, shard uint64, views map[string]*roaring.Bitmap, clear bool) error {
	uris, err := c.getURIsForShard(field.index.Name(), shard)
	if err != nil {
		return errors.Wrap(err, "getting URIs for import")
	}
	// Guard the uris[0] access below: an empty node list would otherwise
	// panic with an index-out-of-range.
	if len(uris) == 0 {
		return errors.Errorf("no nodes found for shard %d of index %s", shard, field.index.Name())
	}
	err = c.importRoaringBitmap(uris[0], field, shard, views, &ImportOptions{clear: clear})
	return errors.Wrap(err, "importing bitmap")
}

func (c *Client) importRoaringBitmap(uri *URI, field *Field, shard uint64, views viewImports, options *ImportOptions) error {
protoViews := []*pbuf.ImportRoaringRequestView{}
for name, bmp := range views {
Expand Down Expand Up @@ -971,7 +1124,10 @@ func (c *Client) doRequest(host *URI, method, path string, headers map[string]st
}
err = errors.New(strings.TrimSpace(string(content)))
}
c.logger.Printf("request failed with: %s, retrying (%d)", err.Error(), tries)
if tries == 0 {
break
}
c.logger.Printf("request failed with: %s status: %d, retrying after %d more time(s) after %v ", err.Error(), resp.StatusCode, tries, sleepTime)
time.Sleep(sleepTime)
sleepTime *= 2
if sleepTime > c.maxRetrySleepTime {
Expand Down Expand Up @@ -1023,7 +1179,7 @@ func (c *Client) augmentHeaders(headers map[string]string) map[string]string {
return headers
}

func (c *Client) translateRowKeys(field *Field, keys []string) ([]uint64, error) {
func (c *Client) TranslateRowKeys(field *Field, keys []string) ([]uint64, error) {
req := &pbuf.TranslateKeysRequest{
Index: field.index.name,
Field: field.name,
Expand All @@ -1032,7 +1188,7 @@ func (c *Client) translateRowKeys(field *Field, keys []string) ([]uint64, error)
return c.translateKeys(req, keys)
}

func (c *Client) translateColumnKeys(index *Index, keys []string) ([]uint64, error) {
func (c *Client) TranslateColumnKeys(index *Index, keys []string) ([]uint64, error) {
req := &pbuf.TranslateKeysRequest{
Index: index.name,
Keys: keys,
Expand Down
9 changes: 9 additions & 0 deletions client_internal_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,12 @@ func TestImportWithReplayErrors(t *testing.T) {
t.Fatal("import replay hanging when no schema created")
}
}

// TestDetectClusterChanges seeds the shard-node cache with a single
// bogus entry and runs one change-detection pass. It makes no explicit
// assertions; it only exercises detectClusterChanges against that entry.
func TestDetectClusterChanges(t *testing.T) {
	client := getClient()
	defer client.Close()

	bogus := &URI{scheme: "zzz"}
	client.shardNodes.data["blah"] = make(map[uint64][]*URI)
	client.shardNodes.data["blah"][1] = []*URI{bogus}

	client.detectClusterChanges()
}
Loading