From 994cf36c6e2379624c1a091e594810166b95d156 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Tue, 27 Aug 2019 22:28:09 -0500 Subject: [PATCH] move batch stuff to subpackage --- cmd/picsv/main.go | 7 +++-- importbatch.go => gpexp/importbatch.go | 29 ++++++++++--------- .../importbatch_test.go | 27 ++++++++--------- translator.go => gpexp/translator.go | 2 +- orm.go | 4 +++ 5 files changed, 38 insertions(+), 31 deletions(-) rename importbatch.go => gpexp/importbatch.go (95%) rename importbatch_test.go => gpexp/importbatch_test.go (95%) rename translator.go => gpexp/translator.go (99%) diff --git a/cmd/picsv/main.go b/cmd/picsv/main.go index 7fbc662..ba8b928 100644 --- a/cmd/picsv/main.go +++ b/cmd/picsv/main.go @@ -12,6 +12,7 @@ import ( "github.com/jaffee/commandeer" "github.com/pilosa/go-pilosa" + "github.com/pilosa/go-pilosa/gpexp" "github.com/pkg/errors" ) @@ -87,11 +88,11 @@ func (m *Main) Run() error { // this has a non-obvious dependence on the previous line... the fields are set up in the index which comes from the schema client.SyncSchema(schema) - batch, err := pilosa.NewBatch(client, m.BatchSize, index, fields) + batch, err := gpexp.NewBatch(client, m.BatchSize, index, fields) if err != nil { return errors.Wrap(err, "getting new batch") } - record := pilosa.Row{ + record := gpexp.Row{ Values: make([]interface{}, len(header)), } @@ -107,7 +108,7 @@ func (m *Main) Run() error { } } err := batch.Add(record) - if err == pilosa.ErrBatchNowFull { + if err == gpexp.ErrBatchNowFull { err := batch.Import() if err != nil { return errors.Wrap(err, "importing") diff --git a/importbatch.go b/gpexp/importbatch.go similarity index 95% rename from importbatch.go rename to gpexp/importbatch.go index 8b53710..d4a1150 100644 --- a/importbatch.go +++ b/gpexp/importbatch.go @@ -1,6 +1,7 @@ -package pilosa +package gpexp import ( + "github.com/pilosa/go-pilosa" "github.com/pilosa/pilosa/roaring" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -50,10 +51,10 @@ type RecordBatch interface { // // nil values are ignored. type Batch struct { - client *Client - index *Index - header []*Field - headerMap map[string]*Field + client *pilosa.Client + index *pilosa.Index + header []*pilosa.Field + headerMap map[string]*pilosa.Field // ids is a slice of length batchSize of record IDs ids []uint64 @@ -101,11 +102,11 @@ func OptTranslator(t Translator) BatchOption { // before returning ErrBatchNowFull. The positions of the Fields in // 'fields' correspond to the positions of values in the Row's Values // passed to Batch.Add(). -func NewBatch(client *Client, size int, index *Index, fields []*Field, opts ...BatchOption) (*Batch, error) { +func NewBatch(client *pilosa.Client, size int, index *pilosa.Index, fields []*pilosa.Field, opts ...BatchOption) (*Batch, error) { if len(fields) == 0 || size == 0 { return nil, errors.New("can't batch with no fields or batch size") } - headerMap := make(map[string]*Field, len(fields)) + headerMap := make(map[string]*pilosa.Field, len(fields)) rowIDs := make(map[string][]uint64) values := make(map[string][]int64) tt := make(map[string]map[string][]int) @@ -113,12 +114,12 @@ func NewBatch(client *Client, size int, index *Index, fields []*Field, opts ...B headerMap[field.Name()] = field opts := field.Opts() switch opts.Type() { - case FieldTypeDefault, FieldTypeSet: + case pilosa.FieldTypeDefault, pilosa.FieldTypeSet: if opts.Keys() { tt[field.Name()] = make(map[string][]int) } rowIDs[field.Name()] = make([]uint64, 0, size) - case FieldTypeInt: + case pilosa.FieldTypeInt: values[field.Name()] = make([]int64, 0, size) } } @@ -211,7 +212,7 @@ func (b *Batch) Add(rec Row) error { case int64: b.values[field.Name()] = append(b.values[field.Name()], val) case nil: - if field.Opts().Type() == FieldTypeInt { + if field.Opts().Type() == pilosa.FieldTypeInt { b.values[field.Name()] = append(b.values[field.Name()], 0) clearIndexes, ok := b.clearValues[field.Name()] if !ok { @@ -357,9 +358,9 @@ func (b *Batch) doImport() error { var nilSentinel = ^uint64(0) func (b *Batch) makeFragments() fragments { - shardWidth := b.index.shardWidth + shardWidth := b.index.ShardWidth() if shardWidth == 0 { - shardWidth = DefaultShardWidth + shardWidth = pilosa.DefaultShardWidth } frags := make(fragments) for fname, rowIDs := range b.rowIDs { @@ -381,9 +382,9 @@ func (b *Batch) makeFragments() fragments { } func (b *Batch) importValueData() error { - shardWidth := b.index.shardWidth + shardWidth := b.index.ShardWidth() if shardWidth == 0 { - shardWidth = DefaultShardWidth + shardWidth = pilosa.DefaultShardWidth } eg := errgroup.Group{} diff --git a/importbatch_test.go b/gpexp/importbatch_test.go similarity index 95% rename from importbatch_test.go rename to gpexp/importbatch_test.go index 929ab49..f9ac887 100644 --- a/importbatch_test.go +++ b/gpexp/importbatch_test.go @@ -1,24 +1,25 @@ -package pilosa +package gpexp import ( "reflect" "strconv" "testing" + "github.com/pilosa/go-pilosa" "github.com/pkg/errors" ) // TODO test against cluster func TestBatches(t *testing.T) { - client := DefaultClient() - schema := NewSchema() + client := pilosa.DefaultClient() + schema := pilosa.NewSchema() idx := schema.Index("gopilosatest-blah") - fields := make([]*Field, 4) - fields[0] = idx.Field("zero", OptFieldKeys(true)) - fields[1] = idx.Field("one", OptFieldKeys(true)) - fields[2] = idx.Field("two", OptFieldKeys(true)) - fields[3] = idx.Field("three", OptFieldTypeInt()) + fields := make([]*pilosa.Field, 4) + fields[0] = idx.Field("zero", pilosa.OptFieldKeys(true)) + fields[1] = idx.Field("one", pilosa.OptFieldKeys(true)) + fields[2] = idx.Field("two", pilosa.OptFieldKeys(true)) + fields[3] = idx.Field("three", pilosa.OptFieldTypeInt()) err := client.SyncSchema(schema) if err != nil { t.Fatalf("syncing schema: %v", err) @@ -317,11 +318,11 @@ func TestBatches(t *testing.T) { } func TestBatchesStringIDs(t *testing.T) { - client := DefaultClient() - schema := NewSchema() - idx := schema.Index("gopilosatest-blah", OptIndexKeys(true)) - fields := make([]*Field, 1) - fields[0] = idx.Field("zero", OptFieldKeys(true)) + client := pilosa.DefaultClient() + schema := pilosa.NewSchema() + idx := schema.Index("gopilosatest-blah", pilosa.OptIndexKeys(true)) + fields := make([]*pilosa.Field, 1) + fields[0] = idx.Field("zero", pilosa.OptFieldKeys(true)) err := client.SyncSchema(schema) if err != nil { t.Fatalf("syncing schema: %v", err) diff --git a/translator.go b/gpexp/translator.go similarity index 99% rename from translator.go rename to gpexp/translator.go index aef126c..74785e5 100644 --- a/translator.go +++ b/gpexp/translator.go @@ -1,4 +1,4 @@ -package pilosa +package gpexp type Translator interface { GetCol(index, key string) (uint64, bool, error) diff --git a/orm.go b/orm.go index 465f74e..7dc3480 100644 --- a/orm.go +++ b/orm.go @@ -412,6 +412,10 @@ func NewIndex(name string) *Index { } } +func (idx *Index) ShardWidth() uint64 { + return idx.shardWidth +} + // Fields return a copy of the fields in this index func (idx *Index) Fields() map[string]*Field { result := make(map[string]*Field)