From 4129aee12032289abdc85fe923dd10b527598f2f Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Tue, 8 Oct 2019 14:27:29 -0500 Subject: [PATCH] support for string slice values in batch ingest slight retry improvements --- client.go | 7 +- go.mod | 2 + gpexp/importbatch.go | 143 ++++++++++++++++++++++++++++++++++---- gpexp/importbatch_test.go | 108 ++++++++++++++++++++++++++++ 4 files changed, 244 insertions(+), 16 deletions(-) diff --git a/client.go b/client.go index f20c3a4..f2c2f72 100644 --- a/client.go +++ b/client.go @@ -227,7 +227,7 @@ func newClientWithOptions(options *ClientOptions) *Client { c.tracer = options.tracer } c.retries = *options.retries - c.minRetrySleepTime = 1 * time.Second + c.minRetrySleepTime = 100 * time.Millisecond c.maxRetrySleepTime = 2 * time.Minute c.importManager = newRecordImportManager(c) go c.runChangeDetection() @@ -1124,7 +1124,10 @@ func (c *Client) doRequest(host *URI, method, path string, headers map[string]st } err = errors.New(strings.TrimSpace(string(content))) } - c.logger.Printf("request failed with: %s, retrying (%d)", err.Error(), tries) + if tries == 0 { + break + } + c.logger.Printf("request failed with: %s status: %d, retrying after %d more time(s) after %v ", err.Error(), resp.StatusCode, tries, sleepTime) time.Sleep(sleepTime) sleepTime *= 2 if sleepTime > c.maxRetrySleepTime { diff --git a/go.mod b/go.mod index ac26e55..81c5c19 100644 --- a/go.mod +++ b/go.mod @@ -13,3 +13,5 @@ require ( github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 ) + +go 1.12 diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go index 4269c49..9ee21b6 100644 --- a/gpexp/importbatch.go +++ b/gpexp/importbatch.go @@ -65,6 +65,11 @@ type Batch struct { // which contain row IDs. rowIDs map[string][]uint64 + // rowIDSets is a map from field name to a batchSize slice of + // slices of row IDs. When a given record can have more than one + // value for a field, rowIDSets stores that information. + rowIDSets map[string][][]uint64 + // values holds the values for each record of an int field values map[string][]int64 @@ -80,6 +85,11 @@ type Batch struct { // for each field, keep a map of key to which record indexes that key mapped to toTranslate map[string]map[string][]int + // toTranslateSets is a map from field name to a map of string + // keys that need to be translated to sets of record indexes which + // those keys map to. + toTranslateSets map[string]map[string][]int + // for string ids which we weren't able to immediately translate, // keep a map of which record(s) each string id maps to. // @@ -117,6 +127,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi rowIDs := make(map[string][]uint64) values := make(map[string][]int64) tt := make(map[string]map[string][]int) + ttSets := make(map[string]map[string][]int) hasTime := false for _, field := range fields { headerMap[field.Name()] = field @@ -125,6 +136,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi case pilosa.FieldTypeDefault, pilosa.FieldTypeSet, pilosa.FieldTypeTime: if opts.Keys() { tt[field.Name()] = make(map[string][]int) + ttSets[field.Name()] = make(map[string][]int) } rowIDs[field.Name()] = make([]uint64, 0, size) hasTime = typ == pilosa.FieldTypeTime || hasTime @@ -135,17 +147,19 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi } } b := &Batch{ - client: client, - header: fields, - headerMap: headerMap, - index: index, - ids: make([]uint64, 0, size), - rowIDs: rowIDs, - values: values, - nullIndices: make(map[string][]uint64), - toTranslate: tt, - toTranslateID: make(map[string][]int), - transCache: NewMapTranslator(), + client: client, + header: fields, + headerMap: headerMap, + index: index, + ids: make([]uint64, 0, size), + rowIDs: rowIDs, + rowIDSets: make(map[string][][]uint64), + values: values, + nullIndices: make(map[string][]uint64), + toTranslate: tt, + toTranslateSets: ttSets, + toTranslateID: make(map[string][]int), + transCache: NewMapTranslator(), } if hasTime { b.times = make([]QuantizedTime, 0, size) @@ -291,6 +305,9 @@ func (b *Batch) Add(rec Row) error { return errors.Errorf("unsupported id type %T value %v", rid, rid) } + // curPos is the current position in b.ids, rowIDs[*], etc. + curPos := len(b.ids) - 1 + if b.times != nil { b.times = append(b.times, rec.Time) } @@ -310,7 +327,7 @@ func (b *Batch) Add(rec Row) error { if !ok { ints = make([]int, 0) } - ints = append(ints, len(rowIDs)) + ints = append(ints, curPos) b.toTranslate[field.Name()][val] = ints b.rowIDs[field.Name()] = append(rowIDs, 0) } @@ -318,6 +335,30 @@ func (b *Batch) Add(rec Row) error { b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], val) case int64: b.values[field.Name()] = append(b.values[field.Name()], val) + case []string: + rowIDSets, ok := b.rowIDSets[field.Name()] + if !ok { + rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids)) + } else { + rowIDSets = rowIDSets[:len(b.ids)-1] // grow this field's rowIDSets if necessary + } + + rowIDs := make([]uint64, 0, len(val)) + for _, k := range val { + if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), k); err != nil { + return errors.Wrap(err, "translating row from []string") + } else if ok { + rowIDs = append(rowIDs, rowID) + } else { + ints, ok := b.toTranslateSets[field.Name()][k] + if !ok { + ints = make([]int, 0, 1) + } + ints = append(ints, curPos) + b.toTranslateSets[field.Name()][k] = ints + } + } + b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs) case nil: if field.Opts().Type() == pilosa.FieldTypeInt { b.values[field.Name()] = append(b.values[field.Name()], 0) @@ -325,7 +366,7 @@ func (b *Batch) Add(rec Row) error { if !ok { nullIndices = make([]uint64, 0) } - nullIndices = append(nullIndices, uint64(len(b.ids)-1)) + nullIndices = append(nullIndices, uint64(curPos)) b.nullIndices[field.Name()] = nullIndices } else { @@ -430,6 +471,33 @@ func (b *Batch) doTranslation() error { } } + for fieldName, tt := range b.toTranslateSets { + keys = keys[:0] + + for k := range tt { + keys = append(keys, k) + } + + if len(keys) == 0 { + continue + } + // translate keys from Pilosa + ids, err := b.client.TranslateRowKeys(b.headerMap[fieldName], keys) + if err != nil { + return errors.Wrap(err, "translating row keys") + } + if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil { + return errors.Wrap(err, "adding rows to cache") + } + rowIDSets := b.rowIDSets[fieldName] + for j, key := range keys { + rowID := ids[j] + for _, recordIdx := range tt[key] { + rowIDSets[recordIdx] = append(rowIDSets[recordIdx], rowID) + } + } + } + return nil } @@ -469,6 +537,9 @@ func (b *Batch) makeFragments() (fragments, error) { } frags := make(fragments) for fname, rowIDs := range b.rowIDs { + if len(rowIDs) == 0 { + continue // this can happen when the values that came in for this field were string slices + } field := b.headerMap[fname] opts := field.Opts() curShard := ^uint64(0) // impossible sentinel value for shard. @@ -502,6 +573,45 @@ func (b *Batch) makeFragments() (fragments, error) { } } } + + for fname, rowIDSets := range b.rowIDSets { + field := b.headerMap[fname] + opts := field.Opts() + curShard := ^uint64(0) // impossible sentinel value for shard. + var curBM *roaring.Bitmap + for j := range b.ids { + col, rowIDs := b.ids[j], rowIDSets[j] + if len(rowIDs) == 0 { + continue + } + if col/shardWidth != curShard { + curShard = col / shardWidth + curBM = frags.GetOrCreate(curShard, fname, "") + } + // TODO this is super ugly, but we want to avoid setting + // bits on the standard view in the specific case when + // there isn't one. Should probably refactor this whole + // loop to be more general w.r.t. views. Also... tests for + // the NoStandardView case would be great. + if !(opts.Type() == pilosa.FieldTypeTime && opts.NoStandardView()) { + for _, row := range rowIDs { + curBM.DirectAdd(row*shardWidth + (col % shardWidth)) + } + } + if opts.Type() == pilosa.FieldTypeTime { + views, err := b.times[j].views(opts.TimeQuantum()) + if err != nil { + return nil, errors.Wrap(err, "calculating views") + } + for _, view := range views { + tbm := frags.GetOrCreate(curShard, fname, view) + for _, row := range rowIDs { + tbm.DirectAdd(row*shardWidth + (col % shardWidth)) + } + } + } + } + } return frags, nil } @@ -512,7 +622,6 @@ func (b *Batch) importValueData() error { shardWidth = pilosa.DefaultShardWidth } eg := errgroup.Group{} - ids := make([]uint64, len(b.ids)) for field, values := range b.values { // grow our temp ids slice to full length @@ -584,10 +693,16 @@ func (b *Batch) reset() { b.times = b.times[:0] for fieldName, rowIDs := range b.rowIDs { b.rowIDs[fieldName] = rowIDs[:0] + rowIDSet := b.rowIDSets[fieldName] + b.rowIDSets[fieldName] = rowIDSet[:0] m := b.toTranslate[fieldName] for k := range m { delete(m, k) // TODO pool these slices } + m = b.toTranslateSets[fieldName] + for k := range m { + delete(m, k) + } } for k := range b.toTranslateID { delete(b.toTranslateID, k) // TODO pool these slices diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go index 6e3f6ec..d99f746 100644 --- a/gpexp/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -72,6 +72,114 @@ func TestImportBatchInts(t *testing.T) { } } +func TestStringSlice(t *testing.T) { + client := pilosa.DefaultClient() + schema := pilosa.NewSchema() + idx := schema.Index("test-string-slice") + fields := make([]*pilosa.Field, 1) + fields[0] = idx.Field("strslice", pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100)) + err := client.SyncSchema(schema) + if err != nil { + t.Fatalf("syncing schema: %v", err) + } + defer func() { + err := client.DeleteIndex(idx) + if err != nil { + t.Logf("problem cleaning up from test: %v", err) + } + }() + + trans := NewMapTranslator() + err = trans.AddRows("test-string-slice", "strslice", []string{"c", "d", "f"}, []uint64{9, 10, 13}) + if err != nil { + t.Fatalf("adding to translator: %v", err) + } + + b, err := NewBatch(client, 3, idx, fields, OptTranslator(trans)) + if err != nil { + t.Fatalf("creating new batch: %v", err) + } + + r := Row{Values: make([]interface{}, len(fields))} + r.ID = uint64(0) + r.Values[0] = []string{"a"} + err = b.Add(r) + if err != nil { + t.Fatalf("adding to batch: %v", err) + } + if got := b.toTranslateSets["strslice"]["a"]; !reflect.DeepEqual(got, []int{0}) { + t.Fatalf("expected []int{0}, got: %v", got) + } + + r.ID = uint64(1) + r.Values[0] = []string{"a", "b", "c"} + err = b.Add(r) + if err != nil { + t.Fatalf("adding to batch: %v", err) + } + if got := b.toTranslateSets["strslice"]["a"]; !reflect.DeepEqual(got, []int{0, 1}) { + t.Fatalf("expected []int{0,1}, got: %v", got) + } + if got := b.toTranslateSets["strslice"]["b"]; !reflect.DeepEqual(got, []int{1}) { + t.Fatalf("expected []int{1}, got: %v", got) + } + if got, ok := b.toTranslateSets["strslice"]["c"]; ok { + t.Fatalf("should be nothing at c, got: %v", got) + } + if got := b.rowIDSets["strslice"][1]; !reflect.DeepEqual(got, []uint64{9}) { + t.Fatalf("expected c to map to rowID 9 but got %v", got) + } + + r.ID = uint64(2) + r.Values[0] = []string{"d", "e", "f"} + err = b.Add(r) + if err != ErrBatchNowFull { + t.Fatalf("adding to batch: %v", err) + } + if got, ok := b.toTranslateSets["strslice"]["d"]; ok { + t.Fatalf("should be nothing at d, got: %v", got) + } + if got, ok := b.toTranslateSets["strslice"]["f"]; ok { + t.Fatalf("should be nothing at f, got: %v", got) + } + if got := b.toTranslateSets["strslice"]["e"]; !reflect.DeepEqual(got, []int{2}) { + t.Fatalf("expected []int{2}, got: %v", got) + } + if got := b.rowIDSets["strslice"][2]; !reflect.DeepEqual(got, []uint64{10, 13}) { + t.Fatalf("expected c to map to rowID 9 but got %v", got) + } + + err = b.doTranslation() + if err != nil { + t.Fatalf("translating: %v", err) + } + + if got := b.rowIDSets["strslice"][0]; !reflect.DeepEqual(got, []uint64{1}) { + t.Fatalf("after translation, rec 0: %v", got) + } + if got := b.rowIDSets["strslice"][1]; !reflect.DeepEqual(got, []uint64{9, 1, 2}) { + t.Fatalf("after translation, rec 1: %v", got) + } + if got := b.rowIDSets["strslice"][2]; !reflect.DeepEqual(got, []uint64{10, 13, 3}) { + t.Fatalf("after translation, rec 2: %v", got) + } + + err = b.doImport() + if err != nil { + t.Fatalf("doing import: %v", err) + } + + resp, err := client.Query(idx.BatchQuery(fields[0].Row("a"))) + if err != nil { + t.Fatalf("querying: %v", err) + } + result := resp.Result() + if !reflect.DeepEqual(result.Row().Columns, []uint64{0, 1}) { + t.Fatalf("expected a to be [0,1], got %v", result.Row().Columns) + } + +} + func TestBatches(t *testing.T) { client := pilosa.DefaultClient() schema := pilosa.NewSchema()