Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
use shardnode cluster change detector w/ cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent a930004 commit 8e2f334
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 32 deletions.
48 changes: 17 additions & 31 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,37 +90,11 @@ type Client struct {
importLogEncoder encoder
logLock sync.Mutex

// TODO shardNodes needs to be invalidated/updated when cluster topology changes.
shardNodes shardNodes
tick *time.Ticker
done chan struct{}
}

// func (c *Client) translateCol(index, key string) (uint64, bool) {
// c.tlock.RLock()
// v, b := c.translator.GetCol(index, key)
// c.tlock.RUnlock()
// return v, b
// }

// func (c *Client) translateRow(index, field, key string) (uint64, bool) {
// c.tlock.RLock()
// v, b := c.translator.GetRow(index, field, key)
// c.tlock.RUnlock()
// return v, b
// }

// func (c *Client) addTranslateCol(index, key string, value uint64) {
// c.tlock.Lock()
// c.translator.AddCol(index, key, value)
// c.tlock.Unlock()
// }

// func (c *Client) addTranslateRow(index, field, key string, value uint64) {
// c.tlock.Lock()
// c.translator.AddRow(index, field, key, value)
// c.tlock.Unlock()
// }

func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
uris, ok := c.shardNodes.Get(index, shard)
if ok {
Expand All @@ -139,13 +113,22 @@ func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
}

func (c *Client) runChangeDetection() {
c.tick = time.NewTicker(time.Minute)

for range c.tick.C {
c.detectClusterChanges()
for {
select {
case <-c.tick.C:
c.detectClusterChanges()
case <-c.done:
return
}
}
}

func (c *Client) Close() error {
c.tick.Stop()
close(c.done)
return nil
}

// detectClusterChanges chooses a random index and shard from the
// shardNodes cache and deletes it. It then looks it up from Pilosa to
// see if it still matches, and if not it drops the whole cache.
Expand Down Expand Up @@ -232,6 +215,8 @@ func newClientWithOptions(options *ClientOptions) *Client {
coordinatorLock: &sync.RWMutex{},

shardNodes: newShardNodes(),
tick: time.NewTicker(time.Minute),
done: make(chan struct{}, 0),
}
if options.importLogWriter != nil {
c.importLogEncoder = newImportLogEncoder(options.importLogWriter)
Expand All @@ -245,6 +230,7 @@ func newClientWithOptions(options *ClientOptions) *Client {
c.minRetrySleepTime = 1 * time.Second
c.maxRetrySleepTime = 2 * time.Minute
c.importManager = newRecordImportManager(c)
go c.runChangeDetection()
return c

}
Expand Down
2 changes: 1 addition & 1 deletion client_internal_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestImportWithReplayErrors(t *testing.T) {

func TestDetectClusterChanges(t *testing.T) {
c := getClient()

defer c.Close()
c.shardNodes.data["blah"] = make(map[uint64][]*URI)
c.shardNodes.data["blah"][1] = []*URI{{scheme: "zzz"}}

Expand Down
Loading

0 comments on commit 8e2f334

Please sign in to comment.