Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
move batch stuff to subpackage
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 6d88b17 commit 994cf36
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 31 deletions.
7 changes: 4 additions & 3 deletions cmd/picsv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/jaffee/commandeer"
"github.com/pilosa/go-pilosa"
"github.com/pilosa/go-pilosa/gpexp"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -87,11 +88,11 @@ func (m *Main) Run() error {

// this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema
client.SyncSchema(schema)
batch, err := pilosa.NewBatch(client, m.BatchSize, index, fields)
batch, err := gpexp.NewBatch(client, m.BatchSize, index, fields)
if err != nil {
return errors.Wrap(err, "getting new batch")
}
record := pilosa.Row{
record := gpexp.Row{
Values: make([]interface{}, len(header)),
}

Expand All @@ -107,7 +108,7 @@ func (m *Main) Run() error {
}
}
err := batch.Add(record)
if err == pilosa.ErrBatchNowFull {
if err == gpexp.ErrBatchNowFull {
err := batch.Import()
if err != nil {
return errors.Wrap(err, "importing")
Expand Down
29 changes: 15 additions & 14 deletions importbatch.go → gpexp/importbatch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pilosa
package gpexp

import (
"github.com/pilosa/go-pilosa"
"github.com/pilosa/pilosa/roaring"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -50,10 +51,10 @@ type RecordBatch interface {
//
// nil values are ignored.
type Batch struct {
client *Client
index *Index
header []*Field
headerMap map[string]*Field
client *pilosa.Client
index *pilosa.Index
header []*pilosa.Field
headerMap map[string]*pilosa.Field

// ids is a slice of length batchSize of record IDs
ids []uint64
Expand Down Expand Up @@ -101,24 +102,24 @@ func OptTranslator(t Translator) BatchOption {
// before returning ErrBatchNowFull. The positions of the Fields in
// 'fields' correspond to the positions of values in the Row's Values
// passed to Batch.Add().
func NewBatch(client *Client, size int, index *Index, fields []*Field, opts ...BatchOption) (*Batch, error) {
func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pilosa.Field, opts ...BatchOption) (*Batch, error) {
if len(fields) == 0 || size == 0 {
return nil, errors.New("can't batch with no fields or batch size")
}
headerMap := make(map[string]*Field, len(fields))
headerMap := make(map[string]*pilosa.Field, len(fields))
rowIDs := make(map[string][]uint64)
values := make(map[string][]int64)
tt := make(map[string]map[string][]int)
for _, field := range fields {
headerMap[field.Name()] = field
opts := field.Opts()
switch opts.Type() {
case FieldTypeDefault, FieldTypeSet:
case pilosa.FieldTypeDefault, pilosa.FieldTypeSet:
if opts.Keys() {
tt[field.Name()] = make(map[string][]int)
}
rowIDs[field.Name()] = make([]uint64, 0, size)
case FieldTypeInt:
case pilosa.FieldTypeInt:
values[field.Name()] = make([]int64, 0, size)
}
}
Expand Down Expand Up @@ -211,7 +212,7 @@ func (b *Batch) Add(rec Row) error {
case int64:
b.values[field.Name()] = append(b.values[field.Name()], val)
case nil:
if field.Opts().Type() == FieldTypeInt {
if field.Opts().Type() == pilosa.FieldTypeInt {
b.values[field.Name()] = append(b.values[field.Name()], 0)
clearIndexes, ok := b.clearValues[field.Name()]
if !ok {
Expand Down Expand Up @@ -357,9 +358,9 @@ func (b *Batch) doImport() error {
var nilSentinel = ^uint64(0)

func (b *Batch) makeFragments() fragments {
shardWidth := b.index.shardWidth
shardWidth := b.index.ShardWidth()
if shardWidth == 0 {
shardWidth = DefaultShardWidth
shardWidth = pilosa.DefaultShardWidth
}
frags := make(fragments)
for fname, rowIDs := range b.rowIDs {
Expand All @@ -381,9 +382,9 @@ func (b *Batch) makeFragments() fragments {
}

func (b *Batch) importValueData() error {
shardWidth := b.index.shardWidth
shardWidth := b.index.ShardWidth()
if shardWidth == 0 {
shardWidth = DefaultShardWidth
shardWidth = pilosa.DefaultShardWidth
}

eg := errgroup.Group{}
Expand Down
27 changes: 14 additions & 13 deletions importbatch_test.go → gpexp/importbatch_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
package pilosa
package gpexp

import (
"reflect"
"strconv"
"testing"

"github.com/pilosa/go-pilosa"
"github.com/pkg/errors"
)

// TODO test against cluster

func TestBatches(t *testing.T) {
client := DefaultClient()
schema := NewSchema()
client := pilosa.DefaultClient()
schema := pilosa.NewSchema()
idx := schema.Index("gopilosatest-blah")
fields := make([]*Field, 4)
fields[0] = idx.Field("zero", OptFieldKeys(true))
fields[1] = idx.Field("one", OptFieldKeys(true))
fields[2] = idx.Field("two", OptFieldKeys(true))
fields[3] = idx.Field("three", OptFieldTypeInt())
fields := make([]*pilosa.Field, 4)
fields[0] = idx.Field("zero", pilosa.OptFieldKeys(true))
fields[1] = idx.Field("one", pilosa.OptFieldKeys(true))
fields[2] = idx.Field("two", pilosa.OptFieldKeys(true))
fields[3] = idx.Field("three", pilosa.OptFieldTypeInt())
err := client.SyncSchema(schema)
if err != nil {
t.Fatalf("syncing schema: %v", err)
Expand Down Expand Up @@ -317,11 +318,11 @@ func TestBatches(t *testing.T) {
}

func TestBatchesStringIDs(t *testing.T) {
client := DefaultClient()
schema := NewSchema()
idx := schema.Index("gopilosatest-blah", OptIndexKeys(true))
fields := make([]*Field, 1)
fields[0] = idx.Field("zero", OptFieldKeys(true))
client := pilosa.DefaultClient()
schema := pilosa.NewSchema()
idx := schema.Index("gopilosatest-blah", pilosa.OptIndexKeys(true))
fields := make([]*pilosa.Field, 1)
fields[0] = idx.Field("zero", pilosa.OptFieldKeys(true))
err := client.SyncSchema(schema)
if err != nil {
t.Fatalf("syncing schema: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion translator.go → gpexp/translator.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pilosa
package gpexp

type Translator interface {
GetCol(index, key string) (uint64, bool, error)
Expand Down
4 changes: 4 additions & 0 deletions orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ func NewIndex(name string) *Index {
}
}

// ShardWidth returns the value of the index's unexported shardWidth field,
// making it readable from other packages. The value is returned unmodified;
// callers such as gpexp.Batch treat 0 as unset and fall back to
// DefaultShardWidth.
func (idx *Index) ShardWidth() uint64 {
return idx.shardWidth
}

// Fields returns a copy of the fields in this index.
func (idx *Index) Fields() map[string]*Field {
result := make(map[string]*Field)
Expand Down

0 comments on commit 994cf36

Please sign in to comment.