This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

support for string slice values in batch ingest
slight retry improvements
jaffee committed Oct 8, 2019
1 parent 9b81c1a commit 4129aee
Showing 4 changed files with 244 additions and 16 deletions.
7 changes: 5 additions & 2 deletions client.go
@@ -227,7 +227,7 @@ func newClientWithOptions(options *ClientOptions) *Client {
c.tracer = options.tracer
}
c.retries = *options.retries
- c.minRetrySleepTime = 1 * time.Second
+ c.minRetrySleepTime = 100 * time.Millisecond
c.maxRetrySleepTime = 2 * time.Minute
c.importManager = newRecordImportManager(c)
go c.runChangeDetection()
@@ -1124,7 +1124,10 @@ func (c *Client) doRequest(host *URI, method, path string, headers map[string]st
}
err = errors.New(strings.TrimSpace(string(content)))
}
c.logger.Printf("request failed with: %s, retrying (%d)", err.Error(), tries)
if tries == 0 {
break
}
c.logger.Printf("request failed with: %s status: %d, retrying after %d more time(s) after %v ", err.Error(), resp.StatusCode, tries, sleepTime)
time.Sleep(sleepTime)
sleepTime *= 2
if sleepTime > c.maxRetrySleepTime {
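Taken together, the client.go hunks tighten the retry loop: the first back-off drops from one second to 100ms, the client no longer sleeps or logs a retry message after its final attempt, and the log line now reports the HTTP status code and the upcoming sleep. A minimal standalone sketch of that shape — retry and its parameters are illustrative stand-ins, not the client's actual internals:

package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

// retry sketches the back-off implied by the diff: start at the minimum
// sleep, double after each failure, cap at the maximum, and break before
// sleeping once no tries remain.
func retry(attempt func() error, tries int, minSleep, maxSleep time.Duration) error {
	sleep := minSleep
	var err error
	for ; ; tries-- {
		if err = attempt(); err == nil {
			return nil
		}
		if tries == 0 {
			break // final attempt: no sleep, no retry log line
		}
		log.Printf("request failed with: %s, retrying %d more time(s) after %v", err, tries, sleep)
		time.Sleep(sleep)
		if sleep *= 2; sleep > maxSleep {
			sleep = maxSleep
		}
	}
	return err
}

func main() {
	n := 0
	err := retry(func() error {
		if n++; n < 3 {
			return errors.New("connection refused")
		}
		return nil
	}, 5, 100*time.Millisecond, 2*time.Minute)
	fmt.Println("final:", err) // succeeds on the third attempt
}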
2 changes: 2 additions & 0 deletions go.mod
@@ -13,3 +13,5 @@ require (
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)

go 1.12
143 changes: 129 additions & 14 deletions gpexp/importbatch.go
@@ -65,6 +65,11 @@ type Batch struct {
// which contain row IDs.
rowIDs map[string][]uint64

// rowIDSets is a map from field name to a batchSize slice of
// slices of row IDs. When a given record can have more than one
// value for a field, rowIDSets stores that information.
rowIDSets map[string][][]uint64

// values holds the values for each record of an int field
values map[string][]int64

@@ -80,6 +85,11 @@ type Batch struct {
// for each field, keep a map of key to which record indexes that key mapped to
toTranslate map[string]map[string][]int

// toTranslateSets is a map from field name to a map of string
// keys that need to be translated to sets of record indexes which
// those keys map to.
toTranslateSets map[string]map[string][]int

// for string ids which we weren't able to immediately translate,
// keep a map of which record(s) each string id maps to.
//
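The two new fields mirror the existing single-value bookkeeping, but per record: rowIDSets holds each record's already-translated row IDs for a field, while toTranslateSets records, for each key still awaiting translation, which record indexes are waiting on it. A rough sketch of the shapes these maps take — the field name and values are invented for illustration:

package main

import "fmt"

func main() {
	// Suppose record 0 carried []string{"a"} and record 1 carried
	// []string{"a", "b", "c"}, with "c" already cached as row 9.
	rowIDSets := map[string][][]uint64{
		"strslice": {{}, {9}}, // per-record translated row IDs
	}
	toTranslateSets := map[string]map[string][]int{
		"strslice": {"a": {0, 1}, "b": {1}}, // key -> waiting record indexes
	}
	fmt.Println(rowIDSets, toTranslateSets)
}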
@@ -117,6 +127,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
rowIDs := make(map[string][]uint64)
values := make(map[string][]int64)
tt := make(map[string]map[string][]int)
ttSets := make(map[string]map[string][]int)
hasTime := false
for _, field := range fields {
headerMap[field.Name()] = field
@@ -125,6 +136,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
case pilosa.FieldTypeDefault, pilosa.FieldTypeSet, pilosa.FieldTypeTime:
if opts.Keys() {
tt[field.Name()] = make(map[string][]int)
ttSets[field.Name()] = make(map[string][]int)
}
rowIDs[field.Name()] = make([]uint64, 0, size)
hasTime = typ == pilosa.FieldTypeTime || hasTime
@@ -135,17 +147,19 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
}
}
b := &Batch{
- client: client,
- header: fields,
- headerMap: headerMap,
- index: index,
- ids: make([]uint64, 0, size),
- rowIDs: rowIDs,
- values: values,
- nullIndices: make(map[string][]uint64),
- toTranslate: tt,
- toTranslateID: make(map[string][]int),
- transCache: NewMapTranslator(),
+ client: client,
+ header: fields,
+ headerMap: headerMap,
+ index: index,
+ ids: make([]uint64, 0, size),
+ rowIDs: rowIDs,
+ rowIDSets: make(map[string][][]uint64),
+ values: values,
+ nullIndices: make(map[string][]uint64),
+ toTranslate: tt,
+ toTranslateSets: ttSets,
+ toTranslateID: make(map[string][]int),
+ transCache: NewMapTranslator(),
}
if hasTime {
b.times = make([]QuantizedTime, 0, size)
@@ -291,6 +305,9 @@ func (b *Batch) Add(rec Row) error {
return errors.Errorf("unsupported id type %T value %v", rid, rid)
}

// curPos is the current position in b.ids, rowIDs[*], etc.
curPos := len(b.ids) - 1

if b.times != nil {
b.times = append(b.times, rec.Time)
}
@@ -310,22 +327,46 @@
if !ok {
ints = make([]int, 0)
}
- ints = append(ints, len(rowIDs))
+ ints = append(ints, curPos)
b.toTranslate[field.Name()][val] = ints
b.rowIDs[field.Name()] = append(rowIDs, 0)
}
case uint64:
b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], val)
case int64:
b.values[field.Name()] = append(b.values[field.Name()], val)
case []string:
rowIDSets, ok := b.rowIDSets[field.Name()]
if !ok {
rowIDSets = make([][]uint64, len(b.ids)-1, cap(b.ids))
} else {
rowIDSets = rowIDSets[:len(b.ids)-1] // grow this field's rowIDSets if necessary
}

rowIDs := make([]uint64, 0, len(val))
for _, k := range val {
if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), k); err != nil {
return errors.Wrap(err, "translating row from []string")
} else if ok {
rowIDs = append(rowIDs, rowID)
} else {
ints, ok := b.toTranslateSets[field.Name()][k]
if !ok {
ints = make([]int, 0, 1)
}
ints = append(ints, curPos)
b.toTranslateSets[field.Name()][k] = ints
}
}
b.rowIDSets[field.Name()] = append(rowIDSets, rowIDs)
case nil:
if field.Opts().Type() == pilosa.FieldTypeInt {
b.values[field.Name()] = append(b.values[field.Name()], 0)
nullIndices, ok := b.nullIndices[field.Name()]
if !ok {
nullIndices = make([]uint64, 0)
}
- nullIndices = append(nullIndices, uint64(len(b.ids)-1))
+ nullIndices = append(nullIndices, uint64(curPos))
b.nullIndices[field.Name()] = nullIndices

} else {
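The reslice at the top of the new []string case is what keeps a field's rowIDSets aligned with b.ids even when the field first appears partway through a batch: every earlier record gets an empty leading slot. A standalone sketch of that growth, with invented sizes:

package main

import "fmt"

func main() {
	// Three records are already in the batch; this field appears for the
	// first time on the third record.
	ids := []uint64{10, 11, 12}
	var rowIDSets [][]uint64

	// Mirrors the lazy growth in Add: records seen before this field's
	// first appearance get empty leading slots.
	if rowIDSets == nil {
		rowIDSets = make([][]uint64, len(ids)-1, cap(ids))
	} else {
		rowIDSets = rowIDSets[:len(ids)-1]
	}
	rowIDSets = append(rowIDSets, []uint64{9}) // current record's translated IDs

	fmt.Println(len(rowIDSets), rowIDSets) // 3 [[] [] [9]]
}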
@@ -430,6 +471,33 @@ func (b *Batch) doTranslation() error {
}
}

for fieldName, tt := range b.toTranslateSets {
keys = keys[:0]

for k := range tt {
keys = append(keys, k)
}

if len(keys) == 0 {
continue
}
// translate keys from Pilosa
ids, err := b.client.TranslateRowKeys(b.headerMap[fieldName], keys)
if err != nil {
return errors.Wrap(err, "translating row keys")
}
if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil {
return errors.Wrap(err, "adding rows to cache")
}
rowIDSets := b.rowIDSets[fieldName]
for j, key := range keys {
rowID := ids[j]
for _, recordIdx := range tt[key] {
rowIDSets[recordIdx] = append(rowIDSets[recordIdx], rowID)
}
}
}

return nil
}
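The new loop batches all of a field's pending keys into a single TranslateRowKeys round trip, then fans each returned ID out to every record index that was waiting on that key. A self-contained sketch of the fan-out, with a local stand-in for the server call:

package main

import "fmt"

func main() {
	// Pending keys for one field and the records waiting on them, as
	// toTranslateSets would hold them (values invented).
	toTranslateSets := map[string][]int{"a": {0, 1}, "b": {1}}
	rowIDSets := [][]uint64{{}, {9}} // IDs already resolved during Add

	// Stand-in for client.TranslateRowKeys: one round trip per field.
	translate := func(keys []string) []uint64 {
		assigned := map[string]uint64{"a": 1, "b": 2}
		out := make([]uint64, len(keys))
		for i, k := range keys {
			out[i] = assigned[k]
		}
		return out
	}

	keys := make([]string, 0, len(toTranslateSets))
	for k := range toTranslateSets {
		keys = append(keys, k)
	}
	ids := translate(keys)
	// Fan each translated ID out to every waiting record.
	for j, key := range keys {
		for _, recordIdx := range toTranslateSets[key] {
			rowIDSets[recordIdx] = append(rowIDSets[recordIdx], ids[j])
		}
	}
	fmt.Println(rowIDSets) // e.g. [[1] [9 1 2]]; order follows map iteration
}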

@@ -469,6 +537,9 @@ func (b *Batch) makeFragments() (fragments, error) {
}
frags := make(fragments)
for fname, rowIDs := range b.rowIDs {
if len(rowIDs) == 0 {
continue // this can happen when the values that came in for this field were string slices
}
field := b.headerMap[fname]
opts := field.Opts()
curShard := ^uint64(0) // impossible sentinel value for shard.
@@ -502,6 +573,45 @@ }
}
}
}

for fname, rowIDSets := range b.rowIDSets {
field := b.headerMap[fname]
opts := field.Opts()
curShard := ^uint64(0) // impossible sentinel value for shard.
var curBM *roaring.Bitmap
for j := range b.ids {
col, rowIDs := b.ids[j], rowIDSets[j]
if len(rowIDs) == 0 {
continue
}
if col/shardWidth != curShard {
curShard = col / shardWidth
curBM = frags.GetOrCreate(curShard, fname, "")
}
// TODO this is super ugly, but we want to avoid setting
// bits on the standard view in the specific case when
// there isn't one. Should probably refactor this whole
// loop to be more general w.r.t. views. Also... tests for
// the NoStandardView case would be great.
if !(opts.Type() == pilosa.FieldTypeTime && opts.NoStandardView()) {
for _, row := range rowIDs {
curBM.DirectAdd(row*shardWidth + (col % shardWidth))
}
}
if opts.Type() == pilosa.FieldTypeTime {
views, err := b.times[j].views(opts.TimeQuantum())
if err != nil {
return nil, errors.Wrap(err, "calculating views")
}
for _, view := range views {
tbm := frags.GetOrCreate(curShard, fname, view)
for _, row := range rowIDs {
tbm.DirectAdd(row*shardWidth + (col % shardWidth))
}
}
}
}
}
return frags, nil
}
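Both fragment loops place bits with the same arithmetic: the column selects the shard (col / shardWidth), and the bit's position inside that shard's bitmap is row*shardWidth + col%shardWidth. A quick worked check, assuming the default shard width of 2^20:

package main

import "fmt"

func main() {
	const shardWidth = 1 << 20 // assumed value of pilosa.DefaultShardWidth

	col, row := uint64(3145730), uint64(13)
	shard := col / shardWidth              // 3: which fragment gets the bit
	pos := row*shardWidth + col%shardWidth // 13*2^20 + 2
	fmt.Println(shard, pos)                // 3 13631490
}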

@@ -512,7 +622,6 @@ func (b *Batch) importValueData() error {
shardWidth = pilosa.DefaultShardWidth
}
eg := errgroup.Group{}
-
ids := make([]uint64, len(b.ids))
for field, values := range b.values {
// grow our temp ids slice to full length
@@ -584,10 +693,16 @@ func (b *Batch) reset() {
b.times = b.times[:0]
for fieldName, rowIDs := range b.rowIDs {
b.rowIDs[fieldName] = rowIDs[:0]
rowIDSet := b.rowIDSets[fieldName]
b.rowIDSets[fieldName] = rowIDSet[:0]
m := b.toTranslate[fieldName]
for k := range m {
delete(m, k) // TODO pool these slices
}
m = b.toTranslateSets[fieldName]
for k := range m {
delete(m, k)
}
}
for k := range b.toTranslateID {
delete(b.toTranslateID, k) // TODO pool these slices
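reset truncates with [:0] and deletes map keys in place instead of reallocating, so a long-lived Batch reuses its backing arrays from one flush to the next. A tiny illustration of why [:0] preserves the allocation:

package main

import "fmt"

func main() {
	rowIDs := make([]uint64, 0, 4)
	rowIDs = append(rowIDs, 1, 2, 3)
	rowIDs = rowIDs[:0] // length resets; capacity (the allocation) survives
	fmt.Println(len(rowIDs), cap(rowIDs)) // 0 4
}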
108 changes: 108 additions & 0 deletions gpexp/importbatch_test.go
@@ -72,6 +72,114 @@ func TestImportBatchInts(t *testing.T) {
}
}

func TestStringSlice(t *testing.T) {
client := pilosa.DefaultClient()
schema := pilosa.NewSchema()
idx := schema.Index("test-string-slice")
fields := make([]*pilosa.Field, 1)
fields[0] = idx.Field("strslice", pilosa.OptFieldKeys(true), pilosa.OptFieldTypeSet(pilosa.CacheTypeRanked, 100))
err := client.SyncSchema(schema)
if err != nil {
t.Fatalf("syncing schema: %v", err)
}
defer func() {
err := client.DeleteIndex(idx)
if err != nil {
t.Logf("problem cleaning up from test: %v", err)
}
}()

trans := NewMapTranslator()
err = trans.AddRows("test-string-slice", "strslice", []string{"c", "d", "f"}, []uint64{9, 10, 13})
if err != nil {
t.Fatalf("adding to translator: %v", err)
}

b, err := NewBatch(client, 3, idx, fields, OptTranslator(trans))
if err != nil {
t.Fatalf("creating new batch: %v", err)
}

r := Row{Values: make([]interface{}, len(fields))}
r.ID = uint64(0)
r.Values[0] = []string{"a"}
err = b.Add(r)
if err != nil {
t.Fatalf("adding to batch: %v", err)
}
if got := b.toTranslateSets["strslice"]["a"]; !reflect.DeepEqual(got, []int{0}) {
t.Fatalf("expected []int{0}, got: %v", got)
}

r.ID = uint64(1)
r.Values[0] = []string{"a", "b", "c"}
err = b.Add(r)
if err != nil {
t.Fatalf("adding to batch: %v", err)
}
if got := b.toTranslateSets["strslice"]["a"]; !reflect.DeepEqual(got, []int{0, 1}) {
t.Fatalf("expected []int{0,1}, got: %v", got)
}
if got := b.toTranslateSets["strslice"]["b"]; !reflect.DeepEqual(got, []int{1}) {
t.Fatalf("expected []int{1}, got: %v", got)
}
if got, ok := b.toTranslateSets["strslice"]["c"]; ok {
t.Fatalf("should be nothing at c, got: %v", got)
}
if got := b.rowIDSets["strslice"][1]; !reflect.DeepEqual(got, []uint64{9}) {
t.Fatalf("expected c to map to rowID 9 but got %v", got)
}

r.ID = uint64(2)
r.Values[0] = []string{"d", "e", "f"}
err = b.Add(r)
if err != ErrBatchNowFull {
t.Fatalf("adding to batch: %v", err)
}
if got, ok := b.toTranslateSets["strslice"]["d"]; ok {
t.Fatalf("should be nothing at d, got: %v", got)
}
if got, ok := b.toTranslateSets["strslice"]["f"]; ok {
t.Fatalf("should be nothing at f, got: %v", got)
}
if got := b.toTranslateSets["strslice"]["e"]; !reflect.DeepEqual(got, []int{2}) {
t.Fatalf("expected []int{2}, got: %v", got)
}
if got := b.rowIDSets["strslice"][2]; !reflect.DeepEqual(got, []uint64{10, 13}) {
t.Fatalf("expected c to map to rowID 9 but got %v", got)
}

err = b.doTranslation()
if err != nil {
t.Fatalf("translating: %v", err)
}

if got := b.rowIDSets["strslice"][0]; !reflect.DeepEqual(got, []uint64{1}) {
t.Fatalf("after translation, rec 0: %v", got)
}
if got := b.rowIDSets["strslice"][1]; !reflect.DeepEqual(got, []uint64{9, 1, 2}) {
t.Fatalf("after translation, rec 1: %v", got)
}
if got := b.rowIDSets["strslice"][2]; !reflect.DeepEqual(got, []uint64{10, 13, 3}) {
t.Fatalf("after translation, rec 2: %v", got)
}

err = b.doImport()
if err != nil {
t.Fatalf("doing import: %v", err)
}

resp, err := client.Query(idx.BatchQuery(fields[0].Row("a")))
if err != nil {
t.Fatalf("querying: %v", err)
}
result := resp.Result()
if !reflect.DeepEqual(result.Row().Columns, []uint64{0, 1}) {
t.Fatalf("expected a to be [0,1], got %v", result.Row().Columns)
}

}

func TestBatches(t *testing.T) {
client := pilosa.DefaultClient()
schema := pilosa.NewSchema()
