Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
interfacify Translator, add errors and batch addition
Browse files Browse the repository at this point in the history
also add the ability to set different implementations on a Batch. This
is basically gearing up for a shared implementation using an embedded
k/v store.
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 4d7a11e commit c33123a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 33 deletions.
5 changes: 4 additions & 1 deletion cmd/picsv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func (m *Main) Run() error {

// this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema
client.SyncSchema(schema)
batch := pilosa.NewBatch(client, m.BatchSize, index, fields)
batch, err := pilosa.NewBatch(client, m.BatchSize, index, fields)
if err != nil {
return errors.Wrap(err, "getting new batch")
}
record := pilosa.Row{
Values: make([]interface{}, len(header)),
}
Expand Down
42 changes: 33 additions & 9 deletions importbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,26 @@ type Batch struct {
// slice of strings would probably work better.
toTranslateID map[string][]int

transCache *Translator
transCache Translator
}

type BatchOption func(b *Batch) error

func OptTranslator(t Translator) BatchOption {
return func(b *Batch) error {
b.transCache = t
return nil
}
}

// NewBatch initializes a new Batch object which will use the given
// Pilosa client, index, set of fields, and will take "size" records
// before returning ErrBatchNowFull. The positions of the Fields in
// 'fields' correspond to the positions of values in the Row's Values
// passed to Batch.Add().
func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch {
func NewBatch(client *Client, size int, index *Index, fields []*Field, opts ...BatchOption) (*Batch, error) {
if len(fields) == 0 || size == 0 {
panic("can't batch with no fields or batch size")
return nil, errors.New("can't batch with no fields or batch size")
}
headerMap := make(map[string]*Field, len(fields))
rowIDs := make(map[string][]uint64)
Expand All @@ -113,7 +122,7 @@ func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch {
values[field.Name()] = make([]int64, 0, size)
}
}
return &Batch{
b := &Batch{
client: client,
header: fields,
headerMap: headerMap,
Expand All @@ -124,8 +133,15 @@ func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch {
clearValues: make(map[string][]uint64),
toTranslate: tt,
toTranslateID: make(map[string][]int),
transCache: NewTranslator(),
transCache: NewMapTranslator(),
}
for _, opt := range opts {
err := opt(b)
if err != nil {
return nil, errors.Wrap(err, "applying options")
}
}
return b, nil
}

// Row represents a single record which can be added to a RecordBatch.
Expand Down Expand Up @@ -154,7 +170,9 @@ func (b *Batch) Add(rec Row) error {
case uint64:
b.ids = append(b.ids, rid)
case string:
if colID, ok := b.transCache.GetCol(b.index.Name(), rid); ok {
if colID, ok, err := b.transCache.GetCol(b.index.Name(), rid); err != nil {
return errors.Wrap(err, "translating column")
} else if ok {
b.ids = append(b.ids, colID)
} else {
ints, ok := b.toTranslateID[rid]
Expand All @@ -175,7 +193,9 @@ func (b *Batch) Add(rec Row) error {
case string:
rowIDs := b.rowIDs[field.Name()]
// translate val and append to b.rowIDs[i]
if rowID, ok := b.transCache.GetRow(b.index.Name(), field.Name(), val); ok {
if rowID, ok, err := b.transCache.GetRow(b.index.Name(), field.Name(), val); err != nil {
return errors.Wrap(err, "translating row")
} else if ok {
b.rowIDs[field.Name()] = append(rowIDs, rowID)
} else {
ints, ok := b.toTranslate[field.Name()][val]
Expand Down Expand Up @@ -257,12 +277,14 @@ func (b *Batch) doTranslation() error {
if err != nil {
return errors.Wrap(err, "translating col keys")
}
if err := b.transCache.AddCols(b.index.Name(), keys, ids); err != nil {
return errors.Wrap(err, "adding cols to cache")
}
for j, key := range keys {
id := ids[j]
for _, recordIdx := range b.toTranslateID[key] {
b.ids[recordIdx] = id
}
b.transCache.AddCol(b.index.Name(), key, id)
}
} else {
keys = make([]string, 0)
Expand All @@ -286,6 +308,9 @@ func (b *Batch) doTranslation() error {
if err != nil {
return errors.Wrap(err, "translating row keys")
}
if err := b.transCache.AddRows(b.index.Name(), fieldName, keys, ids); err != nil {
return errors.Wrap(err, "adding rows to cache")
}

// fill out missing IDs in local batch records with translated IDs
rows := b.rowIDs[fieldName]
Expand All @@ -294,7 +319,6 @@ func (b *Batch) doTranslation() error {
for _, recordIdx := range tt[key] {
rows[recordIdx] = id
}
b.transCache.AddRow(b.index.Name(), fieldName, key, id)
}
}

Expand Down
10 changes: 8 additions & 2 deletions importbatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ func TestBatches(t *testing.T) {
t.Logf("problem cleaning up from test: %v", err)
}
}()
b := NewBatch(client, 10, idx, fields)
b, err := NewBatch(client, 10, idx, fields)
if err != nil {
t.Fatalf("getting new batch: %v", err)
}
r := Row{Values: make([]interface{}, 4)}

for i := 0; i < 9; i++ {
Expand Down Expand Up @@ -330,7 +333,10 @@ func TestBatchesStringIDs(t *testing.T) {
}
}()

b := NewBatch(client, 3, idx, fields)
b, err := NewBatch(client, 3, idx, fields)
if err != nil {
t.Fatalf("getting new batch: %v", err)
}

r := Row{Values: make([]interface{}, 1)}

Expand Down
59 changes: 38 additions & 21 deletions translator.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package pilosa

type Translator struct {
type Translator interface {
GetCol(index, key string) (uint64, bool, error)
GetRow(index, field, key string) (uint64, bool, error)
AddCols(index string, keys []string, values []uint64) error
AddRows(index, field string, keys []string, values []uint64) error
}

// MapTranslator implements Translator using in-memory maps. It is not
// threadsafe.
type MapTranslator struct {
indexes map[string]map[string]uint64
fields map[indexfield]map[string]uint64
}

func NewTranslator() *Translator {
return &Translator{
func NewMapTranslator() *MapTranslator {
return &MapTranslator{
indexes: make(map[string]map[string]uint64),
fields: make(map[indexfield]map[string]uint64),
}
Expand All @@ -17,38 +26,46 @@ type indexfield struct {
field string
}

func (t *Translator) GetCol(index, key string) (uint64, bool) {
func (t *MapTranslator) GetCol(index, key string) (uint64, bool, error) {
if idx, ok := t.indexes[index]; ok {
if val, ok := idx[key]; ok {
return val, true
return val, true, nil
}
}
return 0, false
return 0, false, nil
}

func (t *Translator) AddCol(index, key string, value uint64) {
idx, ok := t.indexes[index]
if !ok {
idx = make(map[string]uint64)
func (t *MapTranslator) AddCols(index string, keys []string, values []uint64) error {
for i := range keys {
key, value := keys[i], values[i]
idxMap, ok := t.indexes[index]
if !ok {
idxMap = make(map[string]uint64)
}
idxMap[key] = value
t.indexes[index] = idxMap
}
idx[key] = value
t.indexes[index] = idx
return nil
}

func (t *Translator) GetRow(index, field, key string) (uint64, bool) {
func (t *MapTranslator) GetRow(index, field, key string) (uint64, bool, error) {
if fld, ok := t.fields[indexfield{index: index, field: field}]; ok {
if val, ok := fld[key]; ok {
return val, true
return val, true, nil
}
}
return 0, false
return 0, false, nil
}

func (t *Translator) AddRow(index, field, key string, value uint64) {
keys, ok := t.fields[indexfield{index: index, field: field}]
if !ok {
keys = make(map[string]uint64)
func (t *MapTranslator) AddRows(index, field string, keys []string, values []uint64) error {
for i := range keys {
key, value := keys[i], values[i]
keyMap, ok := t.fields[indexfield{index: index, field: field}]
if !ok {
keyMap = make(map[string]uint64)
}
keyMap[key] = value
t.fields[indexfield{index: index, field: field}] = keyMap
}
keys[key] = value
t.fields[indexfield{index: index, field: field}] = keys
return nil
}

0 comments on commit c33123a

Please sign in to comment.