Skip to content
This repository has been archived by the owner on Sep 28, 2022. It is now read-only.

Commit

Permalink
add locks on translator cache, shardNodes invalidation
Browse files Browse the repository at this point in the history
  • Loading branch information
jaffee committed Aug 28, 2019
1 parent 2fa57bd commit eed88d3
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 61 deletions.
82 changes: 79 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,45 @@ type Client struct {
importLogEncoder encoder
logLock sync.Mutex

// TODO make this threadsafe using key translation cache on client using embedded K/V store.
// TODO replace this with something like BoltDB. Need better
// concurrent performance, less lock contention. Persistence might
// be a nice bonus too.
tlock sync.RWMutex
translator *Translator

// TODO shardNodes needs to be invalidated/updated when cluster topology changes.
shardNodes shardNodes
tick *time.Ticker
}

func (c *Client) GetURIsForShard(index string, shard uint64) ([]*URI, error) {
func (c *Client) translateCol(index, key string) (uint64, bool) {
c.tlock.RLock()
v, b := c.translator.GetCol(index, key)
c.tlock.RUnlock()
return v, b
}

func (c *Client) translateRow(index, field, key string) (uint64, bool) {
c.tlock.RLock()
v, b := c.translator.GetRow(index, field, key)
c.tlock.RUnlock()
return v, b
}

func (c *Client) addTranslateCol(index, key string, value uint64) {
c.tlock.Lock()
c.translator.AddCol(index, key, value)
c.tlock.Unlock()
}

func (c *Client) addTranslateRow(index, field, key string, value uint64) {
c.tlock.Lock()
c.translator.AddRow(index, field, key, value)
c.tlock.Unlock()
}

// TODO unexport this, consider unexporting ImportValues, look for other candidates, put a note on translator about it being only used by batch, do something about shardNodes.
func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
uris, ok := c.shardNodes.Get(index, shard)
if ok {
return uris, nil
Expand All @@ -114,6 +145,51 @@ func (c *Client) GetURIsForShard(index string, shard uint64) ([]*URI, error) {
return uris, nil
}

func (c *Client) runChangeDetection() {
c.tick = time.NewTicker(time.Minute)

for _ = range c.tick.C {
c.detectClusterChanges()
}
}

// detectClusterChanges chooses a random index and shard from the
// shardNodes cache and deletes it. It then looks it up from Pilosa to
// see if it still matches, and if not it drops the whole cache.
func (c *Client) detectClusterChanges() {
c.shardNodes.mu.Lock()
// we rely on Go's random map iteration order to get a random
// element. If it doesn't end up being random, it shouldn't
// actually matter.
for index, shardMap := range c.shardNodes.data {
for shard, uris := range shardMap {
delete(shardMap, shard)
c.shardNodes.data[index] = shardMap
c.shardNodes.mu.Unlock()
newURIs, err := c.getURIsForShard(index, shard) // refetch URIs from server.
if err != nil {
c.logger.Printf("problem invalidating shard node cache: %v", err)
return
}
if len(uris) != len(newURIs) {
c.logger.Printf("invalidating shard node cache old: %s, new: %s", URIs(uris), URIs(newURIs))
c.shardNodes.Invalidate()
return
}
for i := range uris {
u1, u2 := uris[i], newURIs[i]
if *u1 != *u2 {
c.logger.Printf("invalidating shard node cache, uri mismatch at %d old: %s, new: %s", i, URIs(uris), URIs(newURIs))
c.shardNodes.Invalidate()
return
}
}
break
}
break
}
}

// DefaultClient creates a client with the default address and options.
func DefaultClient() *Client {
return newClientWithCluster(NewClusterWithHost(DefaultURI()), nil)
Expand Down Expand Up @@ -692,7 +768,7 @@ func (c *Client) ImportValues(index, field string, shard uint64, vals []int64, i
path := fmt.Sprintf("/index/%s/field/%s/import?clear=%s&ignoreKeyCheck=true", index, field, strconv.FormatBool(clear))
c.logImport(index, path, shard, false, data)

uris, err := c.GetURIsForShard(index, shard)
uris, err := c.getURIsForShard(index, shard)
if err != nil {
return errors.Wrap(err, "getting uris")
}
Expand Down
9 changes: 9 additions & 0 deletions client_internal_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,12 @@ func TestImportWithReplayErrors(t *testing.T) {
t.Fatal("import replay hanging when no schema created")
}
}

func TestDetectClusterChanges(t *testing.T) {
c := getClient()

c.shardNodes.data["blah"] = make(map[uint64][]*URI)
c.shardNodes.data["blah"][1] = []*URI{&URI{scheme: "zzz"}}

c.detectClusterChanges()
}
61 changes: 6 additions & 55 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,64 +1,15 @@
module github.com/pilosa/go-pilosa

require (
cloud.google.com/go v0.40.0 // indirect
github.com/OneOfOne/xxhash v1.2.5 // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/armon/go-radix v1.0.0 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190618135430-ff7011eec365 // indirect
github.com/golang/mock v1.3.1 // indirect
github.com/golang/protobuf v1.3.1
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/gorilla/handlers v1.4.0 // indirect
github.com/gorilla/mux v1.7.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-retryablehttp v0.5.4 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/golang/protobuf v1.3.2
github.com/jaffee/commandeer v0.1.1-0.20190726022955-4d43b78ebc4e
github.com/kisielk/errcheck v1.2.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/kr/pty v1.1.5 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/miekg/dns v1.1.14 // indirect
github.com/opentracing/opentracing-go v1.1.0
github.com/pelletier/go-toml v1.4.0 // indirect
github.com/pilosa/demo-taxi v0.0.0-20190604185441-6b6ef983bff7 // indirect
github.com/pilosa/pilosa v1.3.1
github.com/pilosa/tools v0.0.0-20190810124639-ee77232ff3aa // indirect
github.com/pkg/errors v0.8.1
github.com/posener/complete v1.2.1 // indirect
github.com/prometheus/common v0.6.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 // indirect
github.com/rogpeppe/fastuuid v1.1.0 // indirect
github.com/russross/blackfriday v2.0.0+incompatible // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.5 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.4.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/ugorji/go v1.1.5-pre // indirect
go.etcd.io/bbolt v1.3.3 // indirect
go.opencensus.io v0.22.0 // indirect
golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56 // indirect
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522 // indirect
golang.org/x/image v0.0.0-20190618124811-92942e4437e2 // indirect
golang.org/x/mobile v0.0.0-20190607214518-6fa95d984e88 // indirect
golang.org/x/mod v0.1.0 // indirect
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20190618155005-516e3c20635f // indirect
golang.org/x/tools v0.0.0-20190618163018-fdf1049a943a // indirect
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/genproto v0.0.0-20190611190212-a7e196e89fd3 // indirect
google.golang.org/grpc v1.21.1 // indirect
honnef.co/go/tools v0.0.0-20190614002413-cb51c254f01b // indirect
)
Loading

0 comments on commit eed88d3

Please sign in to comment.