Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
documentation for batch importer
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 802f882 commit 9634954
Showing 1 changed file with 57 additions and 5 deletions.
62 changes: 57 additions & 5 deletions importbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,39 @@ import (
// but perhaps we could have a special type indicating that a bit or
// value should explicitly be cleared?

// RecordBatch is a Pilosa ingest interface designed to allow for
// maximum throughput on common workloads. Users should call Add()
// with a Row object until it returns ErrBatchNowFull, at which time
// they should call Import(), and then repeat.
//
// Add will not modify or otherwise retain the Row once it returns, so
// it is recommended that callers reuse the same Row with repeated
// calls to Add, just modifying its values appropriately in between
// calls. This avoids allocating a new slice of Values for each
// inserted Row.
//
// The supported types of the values in Row.Values are implementation
// defined. Similarly, the supported types for Row.ID are
// implementation defined.
type RecordBatch interface {
	// Add queues a single record for the next Import. It returns
	// ErrBatchNowFull when it is time to call Import, and it does not
	// modify or retain row once it has returned, so callers may reuse
	// the same Row across calls.
	Add(row Row) error

	// Import ingests the records accumulated through Add. Callers
	// invoke it after Add reports ErrBatchNowFull and then resume
	// adding records.
	Import() error
}

// Batch implements RecordBatch.
//
// It supports Values of type string, uint64, int64, or nil. The
// following table describes what Pilosa field each type of value must
// map to. Fields are set up when calling "NewBatch".
//
// | type | pilosa field type | options |
// |--------+-------------------+-----------|
// | string | set | keys=true |
// | uint64 | set | any |
// | int64 | int | any |
// | nil | any | |
//
// nil values are ignored.
type Batch struct {
client *Client
index *Index
Expand All @@ -31,10 +64,10 @@ type Batch struct {
// values holds the values for each record of an int field
values map[string][]int64

// clearValues holds a slice of indexes into b.ids for each
// clearValues holds a slice of indices into b.ids for each
// integer field which has nil values. After translation, these
// slices will be filled out with the actual column IDs those
// indexes pertain to so that they can be cleared.
// indices pertain to so that they can be cleared.
clearValues map[string][]uint64

// TODO, support timestamps, set fields with more than one value per record, mutex, and bool.
Expand All @@ -52,6 +85,11 @@ type Batch struct {
toTranslateID map[string][]int
}

// NewBatch initializes a new Batch object which uses the given Pilosa
// client, index, and set of fields, and which accepts "size" records
// before Add returns ErrBatchNowFull. The positions of the Fields in
// "fields" correspond to the positions of the values in the Values of
// each Row passed to Batch.Add(). NewBatch panics if "fields" is empty
// or "size" is 0.
func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch {
if len(fields) == 0 || size == 0 {
panic("can't batch with no fields or batch size")
Expand Down Expand Up @@ -87,6 +125,11 @@ func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch {
}
}

// Row represents a single record which can be added to a RecordBatch.
//
// Note: it is not named "Record" because there is a conflict with
// another type in this package. This may be rectified by deprecating
// something or splitting packages in the future.
type Row struct {
ID interface{}
Values []interface{}
Expand Down Expand Up @@ -167,17 +210,27 @@ func (b *Batch) Add(rec Row) error {
return nil
}

// Sentinel errors reported while adding records to a batch.
var (
	// ErrBatchNowFull is a marker error in the spirit of io.EOF: it
	// is not a failure, it tells the user of a batch that the record
	// just added was accepted and that it is now time to call Import.
	ErrBatchNowFull = errors.New("batch is now full - you cannot add any more records (though the one you just added was accepted)")

	// ErrBatchAlreadyFull is a genuine error: Batch.Add did not
	// complete because the batch was already full.
	ErrBatchAlreadyFull = errors.New("batch was already full, record was rejected")
)

// Import does all necessary key translation and then imports the
// batch data into Pilosa. It readies itself for the next set of
// records by clearing internal structures without releasing the
// associated memory.
func (b *Batch) Import() error {
// first we need to translate the toTranslate, then fill out the missing row IDs
err := b.doTranslation()
if err != nil {
return errors.Wrap(err, "doing Translation")
}

// create bitmaps out of each field in b.rowIDs and import
// create bitmaps out of each field in b.rowIDs and import. Also
// import int data.
err = b.doImport()
if err != nil {
return errors.Wrap(err, "doing import")
Expand Down Expand Up @@ -242,11 +295,10 @@ func (b *Batch) doTranslation() error {
}
}

for field, idIndexes := range b.clearValues {
for _, idIndexes := range b.clearValues {
for i, index := range idIndexes {
idIndexes[i] = b.ids[index]
}
b.clearValues[field] = idIndexes // TODO this line should be unnecessary?? If it is necessary, is it OK to modify b.clearValues while iterating over it?
}
return nil
}
Expand Down

0 comments on commit 9634954

Please sign in to comment.