From 4d7a11e6efc741710934649b4538b60aee433514 Mon Sep 17 00:00:00 2001 From: Matt Jaffee Date: Tue, 27 Aug 2019 21:05:47 -0500 Subject: [PATCH] move Translator out of client to ImportBatch --- client.go | 58 ++++++++++++++++++++++---------------------------- importbatch.go | 19 ++++++++--------- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/client.go b/client.go index 90bc75d..d0cc24d 100644 --- a/client.go +++ b/client.go @@ -90,44 +90,37 @@ type Client struct { importLogEncoder encoder logLock sync.Mutex - // TODO replace this with something like BoltDB. Need better - // concurrent performance, less lock contention. Persistence might - // be a nice bonus too. - tlock sync.RWMutex - translator *Translator - // TODO shardNodes needs to be invalidated/updated when cluster topology changes. shardNodes shardNodes tick *time.Ticker } -func (c *Client) translateCol(index, key string) (uint64, bool) { - c.tlock.RLock() - v, b := c.translator.GetCol(index, key) - c.tlock.RUnlock() - return v, b -} - -func (c *Client) translateRow(index, field, key string) (uint64, bool) { - c.tlock.RLock() - v, b := c.translator.GetRow(index, field, key) - c.tlock.RUnlock() - return v, b -} - -func (c *Client) addTranslateCol(index, key string, value uint64) { - c.tlock.Lock() - c.translator.AddCol(index, key, value) - c.tlock.Unlock() -} - -func (c *Client) addTranslateRow(index, field, key string, value uint64) { - c.tlock.Lock() - c.translator.AddRow(index, field, key, value) - c.tlock.Unlock() -} +// func (c *Client) translateCol(index, key string) (uint64, bool) { +// c.tlock.RLock() +// v, b := c.translator.GetCol(index, key) +// c.tlock.RUnlock() +// return v, b +// } + +// func (c *Client) translateRow(index, field, key string) (uint64, bool) { +// c.tlock.RLock() +// v, b := c.translator.GetRow(index, field, key) +// c.tlock.RUnlock() +// return v, b +// } + +// func (c *Client) addTranslateCol(index, key string, value uint64) { +// c.tlock.Lock() +// c.translator.AddCol(index, key, value) +// c.tlock.Unlock() +// } + +// func (c *Client) addTranslateRow(index, field, key string, value uint64) { +// c.tlock.Lock() +// c.translator.AddRow(index, field, key, value) +// c.tlock.Unlock() +// } -// TODO unexport this, consider unexporting ImportValues, look for other candidates, put a note on translator about it being only used by batch, do something about shardNodes. func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) { uris, ok := c.shardNodes.Get(index, shard) if ok { @@ -238,7 +231,6 @@ func newClientWithOptions(options *ClientOptions) *Client { logger: log.New(os.Stderr, "go-pilosa ", log.Flags()), coordinatorLock: &sync.RWMutex{}, - translator: NewTranslator(), shardNodes: newShardNodes(), } if options.importLogWriter != nil { diff --git a/importbatch.go b/importbatch.go index 1e14e06..cb11ca4 100644 --- a/importbatch.go +++ b/importbatch.go @@ -83,6 +83,8 @@ type Batch struct { // each record has a different string ID. In that case, a simple // slice of strings would probably work better. toTranslateID map[string][]int + + transCache *Translator } // NewBatch initializes a new Batch object which will use the given @@ -122,14 +124,15 @@ func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch { clearValues: make(map[string][]uint64), toTranslate: tt, toTranslateID: make(map[string][]int), + transCache: NewTranslator(), } } // Row represents a single record which can be added to a RecordBatch. // // Note: it is not named "Record" because there is a conflict with -//another type in this package. This may be rectified by deprecating -//something or splitting packages in the future. +// another type in this package. This may be rectified by deprecating +// something or splitting packages in the future. type Row struct { ID interface{} Values []interface{} @@ -151,7 +154,7 @@ func (b *Batch) Add(rec Row) error { case uint64: b.ids = append(b.ids, rid) case string: - if colID, ok := b.client.translateCol(b.index.Name(), rid); ok { + if colID, ok := b.transCache.GetCol(b.index.Name(), rid); ok { b.ids = append(b.ids, colID) } else { ints, ok := b.toTranslateID[rid] @@ -172,7 +175,7 @@ func (b *Batch) Add(rec Row) error { case string: rowIDs := b.rowIDs[field.Name()] // translate val and append to b.rowIDs[i] - if rowID, ok := b.client.translateRow(b.index.Name(), field.Name(), val); ok { + if rowID, ok := b.transCache.GetRow(b.index.Name(), field.Name(), val); ok { b.rowIDs[field.Name()] = append(rowIDs, rowID) } else { ints, ok := b.toTranslate[field.Name()][val] @@ -254,15 +257,13 @@ func (b *Batch) doTranslation() error { if err != nil { return errors.Wrap(err, "translating col keys") } - b.client.tlock.Lock() for j, key := range keys { id := ids[j] for _, recordIdx := range b.toTranslateID[key] { b.ids[recordIdx] = id } - b.client.translator.AddCol(b.index.Name(), key, id) + b.transCache.AddCol(b.index.Name(), key, id) } - b.client.tlock.Unlock() } else { keys = make([]string, 0) } @@ -288,15 +289,13 @@ func (b *Batch) doTranslation() error { // fill out missing IDs in local batch records with translated IDs rows := b.rowIDs[fieldName] - b.client.tlock.Lock() for j, key := range keys { id := ids[j] for _, recordIdx := range tt[key] { rows[recordIdx] = id } - b.client.translator.AddRow(b.index.Name(), fieldName, key, id) + b.transCache.AddRow(b.index.Name(), fieldName, key, id) } - b.client.tlock.Unlock() } for _, idIndexes := range b.clearValues {