From 5bdf7f875b19d90da8c649ac0912e4bf6906ba7f Mon Sep 17 00:00:00 2001
From: Matt Jaffee
Date: Tue, 10 Sep 2019 13:00:53 -0500
Subject: [PATCH] test and fix clearValues bug

The test demonstrates how a nil integer value added to a Batch would
clear an existing integer value instead of ignoring it. This fixes it
by tracking which records have nil integer values for each field, and
removing those ids/values before importing.

To execute import requests concurrently while re-using the same ids
and values slices for each import, encoding is now exposed separately
from importing in the Client: we encode serially, re-using the same
slices, and then import the encoded data concurrently.
---
 client.go                 |  30 +++++++--
 gpexp/importbatch.go      | 126 +++++++++++++------------------------
 gpexp/importbatch_test.go |  64 ++++++++++++++++++-
 3 files changed, 131 insertions(+), 89 deletions(-)

diff --git a/client.go b/client.go
index 5d84220..f20c3a4 100644
--- a/client.go
+++ b/client.go
@@ -730,8 +730,23 @@ func (c *Client) importValues(field *Field,
 // index,field,shard on all nodes which should hold that shard. It
 // assumes that the ids have been translated from keys if necessary
 // and so tells Pilosa to ignore checking if the index uses column
-// keys.
+// keys. ImportValues wraps EncodeImportValues and DoImportValues,
+// which are broken out and exported so that performance-conscious
+// users can re-use the same vals and ids slices for local encoding
+// while performing the imports concurrently.
 func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) error {
+	path, data, err := c.EncodeImportValues(index, field, shard, vals, ids, clear)
+	if err != nil {
+		return errors.Wrap(err, "encoding import-values request")
+	}
+	err = c.DoImportValues(index, shard, path, data)
+	return errors.Wrap(err, "doing import values")
+}
+
+// EncodeImportValues computes the HTTP path and payload for an
+// import-values request. It is typically followed by a call to
+// DoImportValues.
+func (c *Client) EncodeImportValues(index, field string, shard uint64, vals []int64, ids []uint64, clear bool) (path string, data []byte, err error) {
 	msg := &pbuf.ImportValueRequest{
 		Index:     index,
 		Field:     field,
@@ -739,11 +754,18 @@ func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, i
 		ColumnIDs: ids,
 		Values:    vals,
 	}
-	data, err := proto.Marshal(msg)
+	data, err = proto.Marshal(msg)
 	if err != nil {
-		return errors.Wrap(err, "marshaling to protobuf")
+		return "", nil, errors.Wrap(err, "marshaling to protobuf")
 	}
-	path := fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
+	path = fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
+	return path, data, nil
+}
+
+// DoImportValues takes a path and data payload (normally from
+// EncodeImportValues), logs the import, finds all nodes which own
+// this shard, and concurrently imports to those nodes.
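+//
+// Together with EncodeImportValues, this enables encode-then-import
+// pipelines. A minimal sketch (index and field names are illustrative,
+// error handling elided):
+//
+//	path, data, err := c.EncodeImportValues("myindex", "myfield", shard, vals, ids, false)
+//	go func() { _ = c.DoImportValues("myindex", shard, path, data) }()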
+func (c *Client) DoImportValues(index string, shard uint64, path string, data []byte) error {
 	c.logImport(index, path, shard, false, data)
 
 	uris, err := c.getURIsForShard(index, shard)

diff --git a/gpexp/importbatch.go b/gpexp/importbatch.go
index 47151c0..9b52967 100644
--- a/gpexp/importbatch.go
+++ b/gpexp/importbatch.go
@@ -61,7 +61,8 @@ type Batch struct {
 	// ids is a slice of length batchSize of record IDs
 	ids []uint64
 
-	// rowIDs is a slice of length len(Batch.header) which contains slices of length batchSize
+	// rowIDs is a map of field names to slices of length batchSize
+	// which contain row IDs.
 	rowIDs map[string][]uint64
 
 	// values holds the values for each record of an int field
@@ -70,17 +71,9 @@ type Batch struct {
 	// times holds a time for each record. (if any of the fields are time fields)
 	times []QuantizedTime
 
-	// clearValues holds a slice of indices into b.ids for each
-	// integer field which has nil values. After translation, these
-	// slices will be filled out with the actual column IDs those
-	// indices pertain to so that they can be cleared.
-	//
-	// TODO: This is actually a problem — a nil value doesn't mean
-	// "clear this value", it should mean "don't touch this value", so
-	// there is no way currently to update a record with int values
-	// without knowing all the int values, clearing them, or setting
-	// them to something else in the process.
-	clearValues map[string][]uint64
+	// nullIndices holds a slice of indices into b.ids for each
+	// integer field which has nil values.
+	nullIndices map[string][]uint64
 
 	// TODO, support timestamps, set fields with more than one value per record, mutex, and bool.
 
@@ -147,7 +140,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
 		ids:           make([]uint64, 0, size),
 		rowIDs:        rowIDs,
 		values:        values,
-		clearValues:   make(map[string][]uint64),
+		nullIndices:   make(map[string][]uint64),
 		toTranslate:   tt,
 		toTranslateID: make(map[string][]int),
 		transCache:    NewMapTranslator(),
@@ -164,11 +157,7 @@ func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pi
 	return b, nil
 }
 
-// Row represents a single record which can be added to a RecordBatch.
-//
-// Note: it is not named "Record" because there is a conflict with
-// another type in this package. This may be rectified by deprecating
-// something or splitting packages in the future.
+// Row represents a single record which can be added to a Batch.
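+//
+// A nil value for an integer field means "leave any existing value for
+// this record untouched"; it is skipped at import time, not cleared.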
 type Row struct {
 	ID     interface{}
 	Values []interface{}
@@ -316,12 +305,12 @@ func (b *Batch) Add(rec Row) error {
 		case nil:
 			if field.Opts().Type() == pilosa.FieldTypeInt {
 				b.values[field.Name()] = append(b.values[field.Name()], 0)
-				clearIndexes, ok := b.clearValues[field.Name()]
+				nullIndices, ok := b.nullIndices[field.Name()]
 				if !ok {
-					clearIndexes = make([]uint64, 0)
+					nullIndices = make([]uint64, 0)
 				}
-				clearIndexes = append(clearIndexes, uint64(len(b.ids)-1))
-				b.clearValues[field.Name()] = clearIndexes
+				nullIndices = append(nullIndices, uint64(len(b.ids)-1))
+				b.nullIndices[field.Name()] = nullIndices
 			} else {
 				b.rowIDs[field.Name()] = append(b.rowIDs[field.Name()], nilSentinel)
 			}
@@ -425,11 +414,6 @@ func (b *Batch) doTranslation() error {
 		}
 	}
 
-	for _, idIndexes := range b.clearValues {
-		for i, index := range idIndexes {
-			idIndexes[i] = b.ids[index]
-		}
-	}
 	return nil
 }
 
@@ -511,77 +495,53 @@ func (b *Batch) importValueData() error {
 	if shardWidth == 0 {
 		shardWidth = pilosa.DefaultShardWidth
 	}
 	eg := errgroup.Group{}
-	curShard := b.ids[0] / shardWidth
-	startIdx := 0
-	for i := 1; i <= len(b.ids); i++ {
-		// when i==len(b.ids) we ensure that the import logic gets run
-		// by making a fake shard once we're past the last ID
-		recordID := (curShard + 2) * shardWidth
-		if i < len(b.ids) {
-			recordID = b.ids[i]
-		}
-		if recordID/shardWidth != curShard {
-			endIdx := i
-			ids := b.ids[startIdx:endIdx]
-			for field, values := range b.values {
-				field := field
-				shard := curShard
-				vslice := values[startIdx:endIdx]
-				eg.Go(func() error {
-					err := b.client.ImportValues(b.index.Name(), field, shard, vslice, ids, false)
-					return errors.Wrapf(err, "importing values for %s", field)
-				})
-			}
-			startIdx = i
-			curShard = recordID / shardWidth
-		}
-	}
-
-	err := eg.Wait()
-	if err != nil {
-		return errors.Wrap(err, "importing value data")
-	}
-	// Now we clear any values for which we got a nil.
-	//
-	// TODO we need an endpoint which lets us set and clear
-	// transactionally... this is kind of a hack.
-	maxLen := 0
-	for _, ids := range b.clearValues {
-		if len(ids) > maxLen {
-			maxLen = len(ids)
+	ids := make([]uint64, len(b.ids))
+	for field, values := range b.values {
+		// grow our temp ids slice to full length
+		ids = ids[:len(b.ids)]
+		// copy orig ids back in
+		copy(ids, b.ids)
+
+		// trim out null values from ids and values.
+		nullIndices := b.nullIndices[field]
+		for i, nullIndex := range nullIndices {
+			nullIndex -= uint64(i) // offset the index by the number of items removed so far
+			ids = append(ids[:nullIndex], ids[nullIndex+1:]...)
+			values = append(values[:nullIndex], values[nullIndex+1:]...)
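+			// nullIndices is ascending (Add appends indices in record
+			// order), so after subtracting the removal count the index
+			// still points at the intended element.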
+		}
-	}
-	eg = errgroup.Group{}
-	values := make([]int64, 0, maxLen)
-	for field, ids := range b.clearValues {
-		// TODO maybe sort ids here
-		curShard := b.ids[0] / shardWidth
+
+		// now do imports by shard
+		curShard := ids[0] / shardWidth
 		startIdx := 0
 		for i := 1; i <= len(ids); i++ {
-			recordID := (curShard + 2) * shardWidth
+			var recordID uint64
 			if i < len(ids) {
-				recordID = b.ids[i]
+				recordID = ids[i]
+			} else {
+				recordID = (curShard + 2) * shardWidth
 			}
+
 			if recordID/shardWidth != curShard {
 				endIdx := i
-				idSlice := ids[startIdx:endIdx]
-				values := values[:len(idSlice)]
-				field := field
 				shard := curShard
+				field := field
+				path, data, err := b.client.EncodeImportValues(b.index.Name(), field, shard, values[startIdx:endIdx], ids[startIdx:endIdx], false)
+				if err != nil {
+					return errors.Wrap(err, "encoding import values")
+				}
 				eg.Go(func() error {
-					err := b.client.ImportValues(b.index.Name(), field, shard, values, idSlice, true)
-					return errors.Wrap(err, "clearing values")
+					err := b.client.DoImportValues(b.index.Name(), shard, path, data)
+					return errors.Wrapf(err, "importing values for %s", field)
 				})
 				startIdx = i
 				curShard = recordID / shardWidth
 			}
 		}
 	}
-
-	return errors.Wrap(eg.Wait(), "importing clear value data")
+	err := eg.Wait()
+	return errors.Wrap(err, "importing value data")
 }
 
 // reset is called at the end of importing to ready the batch for the
@@ -602,8 +562,8 @@ func (b *Batch) reset() {
 	for k := range b.values {
 		delete(b.values, k) // TODO pool these slices
 	}
-	for k := range b.clearValues {
-		delete(b.clearValues, k) // TODO pool these slices
+	for k := range b.nullIndices {
+		delete(b.nullIndices, k) // TODO pool these slices
 	}
 }
 
diff --git a/gpexp/importbatch_test.go b/gpexp/importbatch_test.go
index db94fab..6e3f6ec 100644
--- a/gpexp/importbatch_test.go
+++ b/gpexp/importbatch_test.go
@@ -12,6 +12,66 @@ import (
 
 // TODO test against cluster
 
+func TestImportBatchInts(t *testing.T) {
+	client := pilosa.DefaultClient()
+	schema := pilosa.NewSchema()
+	idx := schema.Index("gopilosatest-blah")
+	field := idx.Field("anint", pilosa.OptFieldTypeInt())
+	err := client.SyncSchema(schema)
+	if err != nil {
+		t.Fatalf("syncing schema: %v", err)
+	}
+
+	b, err := NewBatch(client, 3, idx, []*pilosa.Field{field})
+	if err != nil {
+		t.Fatalf("getting batch: %v", err)
+	}
+
+	r := Row{Values: make([]interface{}, 1)}
+
+	for i := uint64(0); i < 3; i++ {
+		r.ID = i
+		r.Values[0] = int64(i)
+		err := b.Add(r)
+		if err != nil && err != ErrBatchNowFull {
+			t.Fatalf("adding to batch: %v", err)
+		}
+	}
+	err = b.Import()
+	if err != nil {
+		t.Fatalf("importing: %v", err)
+	}
+
+	r.ID = uint64(0)
+	r.Values[0] = nil
+	err = b.Add(r)
+	if err != nil {
+		t.Fatalf("adding after import: %v", err)
+	}
+	r.ID = uint64(1)
+	r.Values[0] = int64(7)
+	err = b.Add(r)
+	if err != nil {
+		t.Fatalf("adding second after import: %v", err)
+	}
+
+	err = b.Import()
+	if err != nil {
+		t.Fatalf("second import: %v", err)
+	}
+
+	resp, err := client.Query(idx.BatchQuery(field.Equals(0), field.Equals(7), field.Equals(2)))
+	if err != nil {
+		t.Fatalf("querying: %v", err)
+	}
+
+	for i, result := range resp.Results() {
+		if !reflect.DeepEqual(result.Row().Columns, []uint64{uint64(i)}) {
+			t.Errorf("expected %v for %d, but got %v", []uint64{uint64(i)}, i, result.Row().Columns)
+		}
+	}
+}
+
 func TestBatches(t *testing.T) {
 	client := pilosa.DefaultClient()
 	schema := pilosa.NewSchema()
@@ -90,8 +150,8 @@ func TestBatches(t *testing.T) {
 	if !reflect.DeepEqual(b.values["three"], []int64{99, -10, 99, -10, 99, -10, 99, -10, 0}) {
 		t.Fatalf("unexpected values: %v", b.values["three"])
 	}
-	if !reflect.DeepEqual(b.clearValues["three"], []uint64{8}) {
-		t.Fatalf("unexpected clearValues: %v", b.clearValues["three"])
+	if !reflect.DeepEqual(b.nullIndices["three"], []uint64{8}) {
+		t.Fatalf("unexpected nullIndices: %v", b.nullIndices["three"])
 	}
 
 	if len(b.toTranslate["one"]) != 2 {
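
Usage sketch: a minimal illustration of the encode-serially /
import-concurrently pattern that EncodeImportValues and DoImportValues
enable. It assumes github.com/pilosa/go-pilosa as the import path; the
index and field names ("myindex", "myfield") are hypothetical and must
already exist with an int field, and the ids/vals contents are made up.

	package main

	import (
		"log"

		pilosa "github.com/pilosa/go-pilosa"
		"golang.org/x/sync/errgroup"
	)

	func main() {
		client := pilosa.DefaultClient()
		eg := errgroup.Group{}

		// These slices are re-used for every shard; EncodeImportValues
		// marshals their contents into a fresh payload each time, so
		// they are free to be overwritten once encoding returns.
		ids := make([]uint64, 0, 3)
		vals := make([]int64, 0, 3)

		for shard := uint64(0); shard < 4; shard++ {
			base := shard * pilosa.DefaultShardWidth
			ids = append(ids[:0], base, base+1, base+2)
			vals = append(vals[:0], 10, 20, 30)

			// Encode serially while ids/vals hold this shard's data...
			path, data, err := client.EncodeImportValues("myindex", "myfield", shard, vals, ids, false)
			if err != nil {
				log.Fatalf("encoding: %v", err)
			}
			shard := shard
			// ...then import concurrently; path and data belong to
			// this goroutine alone.
			eg.Go(func() error {
				return client.DoImportValues("myindex", shard, path, data)
			})
		}
		if err := eg.Wait(); err != nil {
			log.Fatalf("importing: %v", err)
		}
	}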