Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
move Translator out of client to ImportBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 4cc60a1 commit 4d7a11e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
58 changes: 25 additions & 33 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,44 +90,37 @@ type Client struct {
importLogEncoder encoder
logLock sync.Mutex

// TODO replace this with something like BoltDB. Need better
// concurrent performance, less lock contention. Persistence might
// be a nice bonus too.
tlock sync.RWMutex
translator *Translator

// TODO shardNodes needs to be invalidated/updated when cluster topology changes.
shardNodes shardNodes
tick *time.Ticker
}

// translateCol asks the client's translator for the column ID stored under
// key in the given index. The lookup is performed while holding the
// translator read lock. Both results come straight from GetCol; callers use
// the boolean to decide whether the key has already been translated.
func (c *Client) translateCol(index, key string) (uint64, bool) {
	c.tlock.RLock()
	colID, found := c.translator.GetCol(index, key)
	c.tlock.RUnlock()
	return colID, found
}

// translateRow asks the client's translator for the row ID stored under key
// for the given index and field. The lookup is performed while holding the
// translator read lock. Both results come straight from GetRow; callers use
// the boolean to decide whether the key has already been translated.
func (c *Client) translateRow(index, field, key string) (uint64, bool) {
	c.tlock.RLock()
	rowID, found := c.translator.GetRow(index, field, key)
	c.tlock.RUnlock()
	return rowID, found
}

// addTranslateCol passes the column key and its value for the given index to
// the client's translator via AddCol. The translator write lock is held for
// the duration of the call.
func (c *Client) addTranslateCol(index, key string, value uint64) {
	mu := &c.tlock
	mu.Lock()
	c.translator.AddCol(index, key, value)
	mu.Unlock()
}

// addTranslateRow passes the row key and its value for the given index and
// field to the client's translator via AddRow. The translator write lock is
// held for the duration of the call.
func (c *Client) addTranslateRow(index, field, key string, value uint64) {
	mu := &c.tlock
	mu.Lock()
	c.translator.AddRow(index, field, key, value)
	mu.Unlock()
}
// func (c *Client) translateCol(index, key string) (uint64, bool) {
// c.tlock.RLock()
// v, b := c.translator.GetCol(index, key)
// c.tlock.RUnlock()
// return v, b
// }

// func (c *Client) translateRow(index, field, key string) (uint64, bool) {
// c.tlock.RLock()
// v, b := c.translator.GetRow(index, field, key)
// c.tlock.RUnlock()
// return v, b
// }

// func (c *Client) addTranslateCol(index, key string, value uint64) {
// c.tlock.Lock()
// c.translator.AddCol(index, key, value)
// c.tlock.Unlock()
// }

// func (c *Client) addTranslateRow(index, field, key string, value uint64) {
// c.tlock.Lock()
// c.translator.AddRow(index, field, key, value)
// c.tlock.Unlock()
// }

// TODO: unexport this; consider unexporting ImportValues and look for other
// candidates; put a note on Translator about it being used only by Batch;
// do something about shardNodes.
func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
uris, ok := c.shardNodes.Get(index, shard)
if ok {
Expand Down Expand Up @@ -238,7 +231,6 @@ func newClientWithOptions(options *ClientOptions) *Client {
logger: log.New(os.Stderr, "go-pilosa ", log.Flags()),
coordinatorLock: &sync.RWMutex{},

translator: NewTranslator(),
shardNodes: newShardNodes(),
}
if options.importLogWriter != nil {
Expand Down
19 changes: 9 additions & 10 deletions importbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Batch struct {
// each record has a different string ID. In that case, a simple
// slice of strings would probably work better.
toTranslateID map[string][]int

transCache *Translator
}

// NewBatch initializes a new Batch object which will use the given
Expand Down Expand Up @@ -122,14 +124,15 @@ func NewBatch(client *Client, size int, index *Index, fields []*Field) *Batch {
clearValues: make(map[string][]uint64),
toTranslate: tt,
toTranslateID: make(map[string][]int),
transCache: NewTranslator(),
}
}

// Row represents a single record which can be added to a RecordBatch.
//
// Note: it is not named "Record" because there is a conflict with
//another type in this package. This may be rectified by deprecating
//something or splitting packages in the future.
// another type in this package. This may be rectified by deprecating
// something or splitting packages in the future.
type Row struct {
ID interface{}
Values []interface{}
Expand All @@ -151,7 +154,7 @@ func (b *Batch) Add(rec Row) error {
case uint64:
b.ids = append(b.ids, rid)
case string:
if colID, ok := b.client.translateCol(b.index.Name(), rid); ok {
if colID, ok := b.transCache.GetCol(b.index.Name(), rid); ok {
b.ids = append(b.ids, colID)
} else {
ints, ok := b.toTranslateID[rid]
Expand All @@ -172,7 +175,7 @@ func (b *Batch) Add(rec Row) error {
case string:
rowIDs := b.rowIDs[field.Name()]
// translate val and append to b.rowIDs[i]
if rowID, ok := b.client.translateRow(b.index.Name(), field.Name(), val); ok {
if rowID, ok := b.transCache.GetRow(b.index.Name(), field.Name(), val); ok {
b.rowIDs[field.Name()] = append(rowIDs, rowID)
} else {
ints, ok := b.toTranslate[field.Name()][val]
Expand Down Expand Up @@ -254,15 +257,13 @@ func (b *Batch) doTranslation() error {
if err != nil {
return errors.Wrap(err, "translating col keys")
}
b.client.tlock.Lock()
for j, key := range keys {
id := ids[j]
for _, recordIdx := range b.toTranslateID[key] {
b.ids[recordIdx] = id
}
b.client.translator.AddCol(b.index.Name(), key, id)
b.transCache.AddCol(b.index.Name(), key, id)
}
b.client.tlock.Unlock()
} else {
keys = make([]string, 0)
}
Expand All @@ -288,15 +289,13 @@ func (b *Batch) doTranslation() error {

// fill out missing IDs in local batch records with translated IDs
rows := b.rowIDs[fieldName]
b.client.tlock.Lock()
for j, key := range keys {
id := ids[j]
for _, recordIdx := range tt[key] {
rows[recordIdx] = id
}
b.client.translator.AddRow(b.index.Name(), fieldName, key, id)
b.transCache.AddRow(b.index.Name(), fieldName, key, id)
}
b.client.tlock.Unlock()
}

for _, idIndexes := range b.clearValues {
Expand Down

0 comments on commit 4d7a11e

Please sign in to comment.