This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

support multiple of the same field in batch
needed to convert rowIDs and toTranslate to map from field index
rather than field name... going to continue refactoring this to use a
map from index rather than a slice
jaffee committed Oct 9, 2019
1 parent 6791c14 commit 1d062fd
Showing 2 changed files with 59 additions and 41 deletions.
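
For orientation, a minimal sketch of the data-structure change this commit makes (illustrative only; the Field type below is a hypothetical stand-in for *pilosa.Field, and these are not the real gpexp types): per-field state that was keyed by field name is now indexed by the field's position in the batch header, so two header fields with the same name no longer share a slot.

package gpexpsketch

// Field is a hypothetical stand-in for *pilosa.Field in this sketch.
type Field struct{ Name string }

// Before this commit: per-field state keyed by field name, so two header
// fields with the same name would overwrite each other's entries.
type batchByName struct {
	rowIDs      map[string][]uint64         // field name -> one row ID per record
	toTranslate map[string]map[string][]int // field name -> key -> record indexes awaiting translation
}

// After this commit: per-field state indexed by the field's position in the
// header slice, so slot i always belongs to header[i], even for duplicate names.
type batchByIndex struct {
	header      []Field
	rowIDs      [][]uint64         // rowIDs[i] holds row IDs for header[i]
	toTranslate []map[string][]int // toTranslate[i] holds pending translation keys for header[i]
}
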
52 changes: 27 additions & 25 deletions gpexp/importbatch.go
@@ -61,9 +61,9 @@ type Batch struct {
// ids is a slice of length batchSize of record IDs
ids []uint64

- // rowIDs is a map of field names to slices of length batchSize
- // which contain row IDs.
- rowIDs map[string][]uint64
+ // rowIDs is a map of field index (in the header) to slices of
+ // length batchSize which contain row IDs.
+ rowIDs [][]uint64

// rowIDSets is a map from field name to a batchSize slice of
// slices of row IDs. When a given record can have more than one
@@ -83,7 +83,7 @@ type Batch struct {
// TODO support mutex and bool fields.

// for each field, keep a map of key to which record indexes that key mapped to
- toTranslate map[string]map[string][]int
+ toTranslate []map[string][]int

// toTranslateSets is a map from field name to a map of string
// keys that need to be translated to sets of record indexes which
@@ -124,21 +124,21 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
return nil, errors.New("can't batch with no fields or batch size")
}
headerMap := make(map[string]*pilosa.Field, len(fields))
- rowIDs := make(map[string][]uint64)
+ rowIDs := make([][]uint64, len(fields))
values := make(map[string][]int64)
- tt := make(map[string]map[string][]int)
+ tt := make([]map[string][]int, len(fields))
ttSets := make(map[string]map[string][]int)
hasTime := false
- for _, field := range fields {
+ for i, field := range fields {
headerMap[field.Name()] = field
opts := field.Opts()
switch typ := opts.Type(); typ {
case pilosa.FieldTypeDefault, pilosa.FieldTypeSet, pilosa.FieldTypeTime:
if opts.Keys() {
- tt[field.Name()] = make(map[string][]int)
+ tt[i] = make(map[string][]int)
ttSets[field.Name()] = make(map[string][]int)
}
- rowIDs[field.Name()] = make([]uint64, 0, size)
+ rowIDs[i] = make([]uint64, 0, size) // TODO make this on-demand when it gets used. could be a string array field.
hasTime = typ == pilosa.FieldTypeTime || hasTime
case pilosa.FieldTypeInt:
values[field.Name()] = make([]int64, 0, size)
@@ -316,23 +316,23 @@ func (b *Batch) Add(rec Row) error {
field := b.header[i]
switch val := rec.Values[i].(type) {
case string:
- rowIDs := b.rowIDs[field.Name()]
+ rowIDs := b.rowIDs[i]
// translate val and append to b.rowIDs[i]
if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), val); err != nil {
return errors.Wrap(err, "translating row")
} else if ok {
- b.rowIDs[field.Name()] = append(rowIDs, rowID)
+ b.rowIDs[i] = append(rowIDs, rowID)
} else {
- ints, ok := b.toTranslate[field.Name()][val]
+ ints, ok := b.toTranslate[i][val]
if !ok {
ints = make([]int, 0)
}
ints = append(ints, curPos)
- b.toTranslate[field.Name()][val] = ints
- b.rowIDs[field.Name()] = append(rowIDs, 0)
+ b.toTranslate[i][val] = ints
+ b.rowIDs[i] = append(rowIDs, 0)
}
case uint64:
- b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], val)
+ b.rowIDs[i] = append(b.rowIDs[i], val)
case int64:
b.values[field.Name()] = append(b.values[field.Name()], val)
case []string:
@@ -370,7 +370,7 @@ func (b *Batch) Add(rec Row) error {
b.nullIndices[field.Name()] = nullIndices

} else {
- b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], nilSentinel)
+ b.rowIDs[i] = append(b.rowIDs[i], nilSentinel)
}
default:
return errors.Errorf("Val %v Type %[1]T is not currently supported. Use string, uint64 (row id), or int64 (integer value)", val)
@@ -440,7 +440,8 @@ func (b *Batch) doTranslation() error {
}

// translate row keys
- for fieldName, tt := range b.toTranslate {
+ for i, tt := range b.toTranslate {
+ fieldName := b.header[i].Name()
keys = keys[:0]

// make a slice of keys
@@ -462,7 +463,7 @@
}

// fill out missing IDs in local batch records with translated IDs
- rows := b.rowIDs[fieldName]
+ rows := b.rowIDs[i]
for j, key := range keys {
id := ids[j]
for _, recordIdx := range tt[key] {
@@ -536,11 +537,11 @@ func (b *Batch) makeFragments() (fragments, error) {
shardWidth = pilosa.DefaultShardWidth
}
frags := make(fragments)
- for fname, rowIDs := range b.rowIDs {
+ for i, rowIDs := range b.rowIDs {
if len(rowIDs) == 0 {
continue // this can happen when the values that came in for this field were string slices
}
- field := b.headerMap[fname]
+ field := b.header[i]
opts := field.Opts()
curShard := ^uint64(0) // impossible sentinel value for shard.
var curBM *roaring.Bitmap
@@ -551,7 +552,7 @@
}
if col/shardWidth != curShard {
curShard = col / shardWidth
- curBM = frags.GetOrCreate(curShard, fname, "")
+ curBM = frags.GetOrCreate(curShard, field.Name(), "")
}
// TODO this is super ugly, but we want to avoid setting
// bits on the standard view in the specific case when
@@ -567,7 +568,7 @@
return nil, errors.Wrap(err, "calculating views")
}
for _, view := range views {
- tbm := frags.GetOrCreate(curShard, fname, view)
+ tbm := frags.GetOrCreate(curShard, field.Name(), view)
tbm.DirectAdd(row*shardWidth + (col % shardWidth))
}
}
@@ -694,11 +695,12 @@ func (b *Batch) importValueData() error {
func (b *Batch) reset() {
b.ids = b.ids[:0]
b.times = b.times[:0]
- for fieldName, rowIDs := range b.rowIDs {
- b.rowIDs[fieldName] = rowIDs[:0]
+ for i, rowIDs := range b.rowIDs {
+ fieldName := b.header[i].Name()
+ b.rowIDs[i] = rowIDs[:0]
rowIDSet := b.rowIDSets[fieldName]
b.rowIDSets[fieldName] = rowIDSet[:0]
- m := b.toTranslate[fieldName]
+ m := b.toTranslate[i]
for k := range m {
delete(m, k) // TODO pool these slices
}
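
To make the new indexing concrete: integer fields keep their values in the separate, name-keyed values map, so their rowIDs slot is never filled, and the per-field loops above simply skip empty slots. A minimal sketch of that pattern (illustrative only; walkRowIDs and fieldNames are invented names, not part of the package):

package gpexpsketch

import "fmt"

// walkRowIDs shows the post-refactor iteration pattern: rowIDs is indexed by
// field position, and slots that never received row IDs (int fields, or set
// fields whose values arrived as []string) are skipped, mirroring the
// len(rowIDs) == 0 check in makeFragments.
func walkRowIDs(fieldNames []string, rowIDs [][]uint64) {
	for i, ids := range rowIDs {
		if len(ids) == 0 {
			continue // e.g. an int field: its values live in a separate map
		}
		fmt.Printf("field %d (%q): %d row IDs\n", i, fieldNames[i], len(ids))
	}
}
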
48 changes: 32 additions & 16 deletions gpexp/importbatch_test.go
@@ -237,10 +237,10 @@ func TestBatches(t *testing.T) {

}

if len(b.toTranslate["zero"]) != 2 {
if len(b.toTranslate[0]) != 2 {
t.Fatalf("wrong number of keys in toTranslate[0]")
}
for k, ints := range b.toTranslate["zero"] {
for k, ints := range b.toTranslate[0] {
if k == "a" {
if !reflect.DeepEqual(ints, []int{0, 2, 4, 6}) {
t.Fatalf("wrong ints for key a in field zero: %v", ints)
@@ -262,10 +262,10 @@
t.Fatalf("unexpected nullIndices: %v", b.nullIndices["three"])
}

if len(b.toTranslate["one"]) != 2 {
t.Fatalf("wrong number of keys in toTranslate[\"one\"]")
if len(b.toTranslate[1]) != 2 {
t.Fatalf("wrong number of keys in toTranslate[1]")
}
for k, ints := range b.toTranslate["one"] {
for k, ints := range b.toTranslate[1] {
if k == "b" {
if !reflect.DeepEqual(ints, []int{0, 2, 4, 6, 8}) {
t.Fatalf("wrong ints for key b in field one: %v", ints)
@@ -280,10 +280,10 @@
}
}

if len(b.toTranslate["two"]) != 2 {
if len(b.toTranslate[2]) != 2 {
t.Fatalf("wrong number of keys in toTranslate[2]")
}
for k, ints := range b.toTranslate["two"] {
for k, ints := range b.toTranslate[2] {
if k == "c" {
if !reflect.DeepEqual(ints, []int{0, 2, 4, 6, 8}) {
t.Fatalf("wrong ints for key c in field two: %v", ints)
@@ -317,21 +317,25 @@ func TestBatches(t *testing.T) {
t.Fatalf("doing translation: %v", err)
}

- for fname, rowIDs := range b.rowIDs {
+ for fidx, rowIDs := range b.rowIDs {
// we don't know which key will get translated first, but we do know the pattern
if fname == "zero" {
if fidx == 0 {
if !reflect.DeepEqual(rowIDs, []uint64{1, 2, 1, 2, 1, 2, 1, 2, nilSentinel, nilSentinel}) &&
!reflect.DeepEqual(rowIDs, []uint64{2, 1, 2, 1, 2, 1, 2, 1, nilSentinel, nilSentinel}) {
t.Fatalf("unexpected row ids for field %s: %v", fname, rowIDs)
t.Fatalf("unexpected row ids for field %d: %v", fidx, rowIDs)
}

- } else if fname == "four" {
+ } else if fidx == 4 {
if !reflect.DeepEqual(rowIDs, []uint64{1, 1, 1, 1, 1, 1, 1, 1, nilSentinel, nilSentinel}) {
t.Fatalf("unexpected rowids for time field")
}
+ } else if fidx == 3 {
+ if len(rowIDs) != 0 {
+ t.Fatalf("expected no rowIDs for int field, but got: %v", rowIDs)
+ }
} else {
if !reflect.DeepEqual(rowIDs, []uint64{1, 2, 1, 2, 1, 2, 1, 2, 1, 1}) && !reflect.DeepEqual(rowIDs, []uint64{2, 1, 2, 1, 2, 1, 2, 1, 2, 2}) {
t.Fatalf("unexpected row ids for field %s: %v", fname, rowIDs)
t.Fatalf("unexpected row ids for field %d: %v", fidx, rowIDs)
}
}
}
@@ -378,10 +382,16 @@ func TestBatches(t *testing.T) {
t.Fatalf("doing import: %v", err)
}

- for fname, rowIDs := range b.rowIDs {
+ for fidx, rowIDs := range b.rowIDs {
+ if fidx == 3 {
+ if len(rowIDs) != 0 {
+ t.Fatalf("expected no rowIDs for int field, but got: %v", rowIDs)
+ }
+ continue
+ }
// we don't know which key will get translated first, but we do know the pattern
if !reflect.DeepEqual(rowIDs, []uint64{1, 2, 1, 2, 1, 2, 1, 2, 1, 2}) && !reflect.DeepEqual(rowIDs, []uint64{2, 1, 2, 1, 2, 1, 2, 1, 2, 1}) {
t.Fatalf("unexpected row ids for field %s: %v", fname, rowIDs)
t.Fatalf("unexpected row ids for field %d: %v", fidx, rowIDs)
}
}

@@ -421,10 +431,16 @@ func TestBatches(t *testing.T) {
t.Fatalf("doing import: %v", err)
}

- for fname, rowIDs := range b.rowIDs {
+ for fidx, rowIDs := range b.rowIDs {
// we don't know which key will get translated first, but we do know the pattern
+ if fidx == 3 {
+ if len(rowIDs) != 0 {
+ t.Fatalf("expected no rowIDs for int field, but got: %v", rowIDs)
+ }
+ continue
+ }
if !reflect.DeepEqual(rowIDs, []uint64{3, 4, 3, 4, 3, 4, 3, 4, 3, 4}) && !reflect.DeepEqual(rowIDs, []uint64{4, 3, 4, 3, 4, 3, 4, 3, 4, 3}) {
t.Fatalf("unexpected row ids for field %s: %v", fname, rowIDs)
t.Fatalf("unexpected row ids for field %d: %v", fidx, rowIDs)
}
}

