This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

test and fix clearValues bug
The test demonstrates how a nil integer value added to a Batch would
clear an existing integer value instead of leaving it untouched. The
fix tracks which records have nil integer values for each field, and
removes those ids/values before importing.

In order to execute import requests concurrently while re-using the
same ids and values slices for each import, encoding is now exposed
separately from importing in the Client (EncodeImportValues and
DoImportValues): we encode serially, re-using the same slices, and
import the encoded data concurrently.
jaffee committed Sep 10, 2019
1 parent e40d84a commit 2dfc4b7
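
To make the bug concrete, here is a minimal sketch of the behavior the new test pins down. It is illustrative rather than code from this commit: the index name is made up, it assumes the usual go-pilosa import paths, and it needs a reachable Pilosa server.

```go
package main

import (
	"log"

	pilosa "github.com/pilosa/go-pilosa"
	"github.com/pilosa/go-pilosa/gpexp"
)

func main() {
	client := pilosa.DefaultClient() // assumes a locally running Pilosa server
	schema := pilosa.NewSchema()
	idx := schema.Index("example-index") // hypothetical index name
	field := idx.Field("anint", pilosa.OptFieldTypeInt())
	if err := client.SyncSchema(schema); err != nil {
		log.Fatalf("syncing schema: %v", err)
	}
	b, err := gpexp.NewBatch(client, 1, idx, []*pilosa.Field{field})
	if err != nil {
		log.Fatalf("getting batch: %v", err)
	}

	// Set an integer value for record 0 and import it.
	_ = b.Add(gpexp.Row{ID: uint64(0), Values: []interface{}{int64(7)}})
	_ = b.Import()

	// A nil value should mean "leave anint alone for record 0"...
	_ = b.Add(gpexp.Row{ID: uint64(0), Values: []interface{}{nil}})
	_ = b.Import() // ...but before this fix, this import cleared the stored 7.
}
```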
Showing 3 changed files with 132 additions and 90 deletions.
30 changes: 26 additions & 4 deletions client.go
@@ -730,20 +730,42 @@ func (c *Client) importValues(field *Field,
 // index,field,shard on all nodes which should hold that shard. It
 // assumes that the ids have been translated from keys if necessary
 // and so tells Pilosa to ignore checking if the index uses column
-// keys.
+// keys. ImportValues wraps EncodeImportValues and DoImportValues —
+// these are broken out and exported so that performance conscious
+// users can re-use the same vals and ids byte buffers for local
+// encoding, while performing the imports concurrently.
 func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) error {
+	path, data, err := c.EncodeImportValues(index, field, shard, vals, ids, clear)
+	if err != nil {
+		return errors.Wrap(err, "encoding import-values request")
+	}
+	err = c.DoImportValues(index, shard, path, data)
+	return errors.Wrap(err, "doing import values")
+}
+
+// EncodeImportValues computes the HTTP path and payload for an
+// import-values request. It is typically followed by a call to
+// DoImportValues.
+func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) (path string, data []byte, err error) {
 	msg := &pbuf.ImportValueRequest{
 		Index:     index,
 		Field:     field,
 		Shard:     shard,
 		ColumnIDs: ids,
 		Values:    vals,
 	}
-	data, err := proto.Marshal(msg)
+	data, err = proto.Marshal(msg)
 	if err != nil {
-		return errors.Wrap(err, "marshaling to protobuf")
+		return "", nil, errors.Wrap(err, "marshaling to protobuf")
 	}
-	path := fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
+	path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
+	return path, data, nil
+}
+
+// DoImportValues takes a path and data payload (normally from
+// EncodeImportValues), logs the import, finds all nodes which own
+// this shard, and concurrently imports to those nodes.
+func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error {
 	c.logImport(index, path, shard, false, data)
 
 	uris, err := c.getURIsForShard(index, shard)
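The split shows up in the signatures: EncodeImportValues is pure computation over its arguments, while DoImportValues does the logging and network work. Since proto.Marshal copies ids and vals into the returned payload, a caller can encode one shard at a time, re-using the same scratch slices, and let the HTTP requests overlap. A rough sketch of that pattern (the helper and its input layout are hypothetical, not part of this commit):

```go
package example

import (
	pilosa "github.com/pilosa/go-pilosa"
	"golang.org/x/sync/errgroup"
)

// shardChunk is a hypothetical per-shard pair of column IDs and values.
type shardChunk struct {
	ids  []uint64
	vals []int64
}

// encodeThenImport encodes each shard's request serially, then runs the
// imports concurrently.
func encodeThenImport(client *pilosa.Client, index, field string, chunks map[uint64]shardChunk) error {
	eg := errgroup.Group{}
	for shard, chunk := range chunks {
		// Encoding copies the slices into data, so one pair of scratch
		// slices could safely be re-used across iterations.
		path, data, err := client.EncodeImportValues(index, field, shard, chunk.vals, chunk.ids, false)
		if err != nil {
			return err
		}
		shard := shard // capture the loop variable (the diff above does the same with field)
		eg.Go(func() error {
			// Only the network round-trips run concurrently.
			return client.DoImportValues(index, shard, path, data)
		})
	}
	return eg.Wait()
}
```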
128 changes: 44 additions & 84 deletions gpexp/importbatch.go
@@ -61,7 +61,8 @@ type Batch struct {
 	// ids is a slice of length batchSize of record IDs
 	ids []uint64
 
-	// rowIDs is a slice of length len(Batch.header) which contains slices of length batchSize
+	// rowIDs is a map of field names to slices of length batchSize
+	// which contain row IDs.
 	rowIDs map[string][]uint64
 
 	// values holds the values for each record of an int field
@@ -70,19 +71,11 @@ type Batch struct {
 	// times holds a time for each record. (if any of the fields are time fields)
 	times []QuantizedTime
 
-	// clearValues holds a slice of indices into b.ids for each
-	// integer field which has nil values. After translation, these
-	// slices will be filled out with the actual column IDs those
-	// indices pertain to so that they can be cleared.
-	//
-	// TODO: This is actually a problem — a nil value doesn't mean
-	// "clear this value", it should mean "don't touch this value", so
-	// there is no way currently to update a record with int values
-	// without knowing all the int values, clearing them, or setting
-	// them to something else in the process.
-	clearValues map[string][]uint64
+	// nullIndices holds a slice of indices into b.ids for each
+	// integer field which has nil values.
+	nullIndices map[string][]uint64
 
-	// TODO, support timestamps, set fields with more than one value per record, mutex, and bool.
+	// TODO support mutex and bool fields.
 
 	// for each field, keep a map of key to which record indexes that key mapped to
 	toTranslate map[string]map[string][]int
@@ -147,7 +140,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
 		ids:           make([]uint64, 0, size),
 		rowIDs:        rowIDs,
 		values:        values,
-		clearValues:   make(map[string][]uint64),
+		nullIndices:   make(map[string][]uint64),
 		toTranslate:   tt,
 		toTranslateID: make(map[string][]int),
 		transCache:    NewMapTranslator(),
@@ -164,11 +157,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
 	return b, nil
 }
 
-// Row represents a single record which can be added to a RecordBatch.
-//
-// Note: it is not named "Record" because there is a conflict with
-// another type in this package. This may be rectified by deprecating
-// something or splitting packages in the future.
+// Row represents a single record which can be added to a Batch.
 type Row struct {
 	ID     interface{}
 	Values []interface{}
@@ -316,12 +305,12 @@ func (b *Batch) Add(rec Row) error {
 		case nil:
 			if field.Opts().Type() == pilosa.FieldTypeInt {
 				b.values[field.Name()] = append(b.values[field.Name()], 0)
-				clearIndexes, ok := b.clearValues[field.Name()]
+				nullIndices, ok := b.nullIndices[field.Name()]
 				if !ok {
-					clearIndexes = make([]uint64, 0)
+					nullIndices = make([]uint64, 0)
 				}
-				clearIndexes = append(clearIndexes, uint64(len(b.ids)-1))
-				b.clearValues[field.Name()] = clearIndexes
+				nullIndices = append(nullIndices, uint64(len(b.ids)-1))
+				b.nullIndices[field.Name()] = nullIndices
 
 			} else {
 				b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], nilSentinel)
@@ -425,11 +414,6 @@ func (b *Batch) doTranslation() error {
 		}
 	}
 
-	for _, idIndexes := range b.clearValues {
-		for i, index := range idIndexes {
-			idIndexes[i] = b.ids[index]
-		}
-	}
 	return nil
 }
 
@@ -511,77 +495,53 @@ func (b *Batch) importValueData() error {
 	if shardWidth == 0 {
 		shardWidth = pilosa.DefaultShardWidth
 	}
 
 	eg := errgroup.Group{}
-	curShard := b.ids[0] / shardWidth
-	startIdx := 0
-	for i := 1; i <= len(b.ids); i++ {
-		// when i==len(b.ids) we ensure that the import logic gets run
-		// by making a fake shard once we're past the last ID
-		recordID := (curShard + 2) * shardWidth
-		if i < len(b.ids) {
-			recordID = b.ids[i]
-		}
-		if recordID/shardWidth != curShard {
-			endIdx := i
-			ids := b.ids[startIdx:endIdx]
-			for field, values := range b.values {
-				field := field
-				shard := curShard
-				vslice := values[startIdx:endIdx]
-				eg.Go(func() error {
-					err := b.client.ImportValues(b.index.Name(), field, shard, vslice, ids, false)
-					return errors.Wrapf(err, "importing values for %s", field)
-				})
-			}
-			startIdx = i
-			curShard = recordID / shardWidth
-		}
-	}
-
-	err := eg.Wait()
-	if err != nil {
-		return errors.Wrap(err, "importing value data")
-	}
-
-	// Now we clear any values for which we got a nil.
-	//
-	// TODO we need an endpoint which lets us set and clear
-	// transactionally... this is kind of a hack.
-	maxLen := 0
-	for _, ids := range b.clearValues {
-		if len(ids) > maxLen {
-			maxLen = len(ids)
+
+	ids := make([]uint64, len(b.ids))
+	for field, values := range b.values {
+		// grow our temp ids slice to full length
+		ids = ids[:len(b.ids)]
+		// copy orig ids back in
+		copy(ids, b.ids)
+
+		// trim out null values from ids and values.
+		nullIndices := b.nullIndices[field]
+		for i, nullIndex := range nullIndices {
+			nullIndex -= uint64(i) // offset the index by the number of items removed so far
+			ids = append(ids[:nullIndex], ids[nullIndex+1:]...)
+			values = append(values[:nullIndex], values[nullIndex+1:]...)
 		}
-	}
-	eg = errgroup.Group{}
-	values := make([]int64, 0, maxLen)
-	for field, ids := range b.clearValues {
-		// TODO maybe sort ids here
-		curShard := b.ids[0] / shardWidth
+
+		// now do imports by shard
+		curShard := ids[0] / shardWidth
 		startIdx := 0
 		for i := 1; i <= len(ids); i++ {
-			recordID := (curShard + 2) * shardWidth
+			var recordID uint64
 			if i < len(ids) {
-				recordID = b.ids[i]
+				recordID = ids[i]
+			} else {
+				recordID = (curShard + 2) * shardWidth
 			}
+
 			if recordID/shardWidth != curShard {
 				endIdx := i
-				idSlice := ids[startIdx:endIdx]
-				values := values[:len(idSlice)]
-				field := field
 				shard := curShard
+				field := field
+				path, data, err := b.client.EncodeImportValues(b.index.Name(), field, shard, values[startIdx:endIdx], ids[startIdx:endIdx], false)
+				if err != nil {
+					return errors.Wrap(err, "encoding import values")
+				}
 				eg.Go(func() error {
-					err := b.client.ImportValues(b.index.Name(), field, shard, values, idSlice, true)
-					return errors.Wrap(err, "clearing values")
+					err := b.client.DoImportValues(b.index.Name(), shard, path, data)
+					return errors.Wrapf(err, "importing values for %s", field)
				})
 				startIdx = i
 				curShard = recordID / shardWidth
 			}
 		}
 	}
 
-	return errors.Wrap(eg.Wait(), "importing clear value data")
+	err := eg.Wait()
+	return errors.Wrap(err, "importing value data")
 }

// reset is called at the end of importing to ready the batch for the
@@ -602,8 +562,8 @@ func (b *Batch) reset() {
 	for k := range b.values {
 		delete(b.values, k) // TODO pool these slices
 	}
-	for k := range b.clearValues {
-		delete(b.clearValues, k) // TODO pool these slices
+	for k := range b.nullIndices {
+		delete(b.nullIndices, k) // TODO pool these slices
 	}
 }

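The trimming loop in importValueData rewards a close read: nullIndices is always ascending because indices are appended in record order, and each in-place deletion shifts the remaining targets one position left, which is what `nullIndex -= uint64(i)` corrects for. A standalone sketch with made-up data:

```go
package main

import "fmt"

func main() {
	// Suppose the records at positions 1 and 3 of the batch were added
	// with nil integer values (made-up data for illustration).
	ids := []uint64{10, 11, 12, 13, 14}
	values := []int64{1, 0, 3, 0, 5}
	nullIndices := []uint64{1, 3}

	for i, nullIndex := range nullIndices {
		nullIndex -= uint64(i) // i earlier removals shifted this target left by i
		ids = append(ids[:nullIndex], ids[nullIndex+1:]...)
		values = append(values[:nullIndex], values[nullIndex+1:]...)
	}

	// Only real values get imported; nil positions are skipped entirely
	// instead of clearing previously stored values.
	fmt.Println(ids, values) // [10 12 14] [1 3 5]
}
```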
64 changes: 62 additions & 2 deletions gpexp/importbatch_test.go
@@ -12,6 +12,66 @@ import (
 
 // TODO test against cluster
 
+func TestImportBatchInts(t *testing.T) {
+	client := pilosa.DefaultClient()
+	schema := pilosa.NewSchema()
+	idx := schema.Index("gopilosatest-blah")
+	field := idx.Field("anint", pilosa.OptFieldTypeInt())
+	err := client.SyncSchema(schema)
+	if err != nil {
+		t.Fatalf("syncing schema: %v", err)
+	}
+
+	b, err := NewBatch(client, 3, idx, []*pilosa.Field{field})
+	if err != nil {
+		t.Fatalf("getting batch: %v", err)
+	}
+
+	r := Row{Values: make([]interface{}, 1)}
+
+	for i := uint64(0); i < 3; i++ {
+		r.ID = i
+		r.Values[0] = int64(i)
+		err := b.Add(r)
+		if err != nil && err != ErrBatchNowFull {
+			t.Fatalf("adding to batch: %v", err)
+		}
+	}
+	err = b.Import()
+	if err != nil {
+		t.Fatalf("importing: %v", err)
+	}
+
+	r.ID = uint64(0)
+	r.Values[0] = nil
+	err = b.Add(r)
+	if err != nil {
+		t.Fatalf("adding after import: %v", err)
+	}
+	r.ID = uint64(1)
+	r.Values[0] = int64(7)
+	err = b.Add(r)
+	if err != nil {
+		t.Fatalf("adding second after import: %v", err)
+	}
+
+	err = b.Import()
+	if err != nil {
+		t.Fatalf("second import: %v", err)
+	}
+
+	resp, err := client.Query(idx.BatchQuery(field.Equals(0), field.Equals(7), field.Equals(2)))
+	if err != nil {
+		t.Fatalf("querying: %v", err)
+	}
+
+	for i, result := range resp.Results() {
+		if !reflect.DeepEqual(result.Row().Columns, []uint64{uint64(i)}) {
+			t.Errorf("expected %v for %d, but got %v", []uint64{uint64(i)}, i, result.Row().Columns)
+		}
+	}
+}
+
 func TestBatches(t *testing.T) {
 	client := pilosa.DefaultClient()
 	schema := pilosa.NewSchema()
@@ -90,8 +150,8 @@ func TestBatches(t *testing.T) {
 	if !reflect.DeepEqual(b.values["three"], []int64{99, -10, 99, -10, 99, -10, 99, -10, 0}) {
 		t.Fatalf("unexpected values: %v", b.values["three"])
 	}
-	if !reflect.DeepEqual(b.clearValues["three"], []uint64{8}) {
-		t.Fatalf("unexpected clearValues: %v", b.clearValues["three"])
+	if !reflect.DeepEqual(b.nullIndices["three"], []uint64{8}) {
+		t.Fatalf("unexpected nullIndices: %v", b.nullIndices["three"])
 	}
 
 	if len(b.toTranslate["one"]) != 2 {
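One caveat for anyone running these tests: both TestImportBatchInts and TestBatches use pilosa.DefaultClient(), which expects a live Pilosa server at the client's default address (normally localhost:10101); nothing in the package starts one, so the tests fail without it. The `// TODO test against cluster` note above suggests multi-node coverage was still pending.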
