Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make public interfaces implementable #184

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
}

// Pick a connection from this connection pool for the given query.
func (pool *hostConnPool) Pick(token token, keyspace string, table string) *Conn {
func (pool *hostConnPool) Pick(token Token, keyspace string, table string) *Conn {
pool.mu.RLock()
defer pool.mu.RUnlock()

Expand Down
6 changes: 3 additions & 3 deletions connpicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type ConnPicker interface {
Pick(token, string, string) *Conn
Pick(Token, string, string) *Conn
Put(*Conn)
Remove(conn *Conn)
InFlight() int
Expand Down Expand Up @@ -71,7 +71,7 @@ func (p *defaultConnPicker) Size() (int, int) {
return size, p.size - size
}

func (p *defaultConnPicker) Pick(token, string, string) *Conn {
func (p *defaultConnPicker) Pick(Token, string, string) *Conn {
pos := int(atomic.AddUint32(&p.pos, 1) - 1)
size := len(p.conns)

Expand Down Expand Up @@ -110,7 +110,7 @@ func (*defaultConnPicker) NextShard() (shardID, nrShards int) {
// to the point where we have first connection.
type nopConnPicker struct{}

func (nopConnPicker) Pick(token, string, string) *Conn {
func (nopConnPicker) Pick(Token, string, string) *Conn {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func addTabletToTabletsList(tablets []*TabletInfo, tablet *TabletInfo) []*Tablet
}

// Search for place in tablets table for token starting from index l to index r
func findTabletForToken(tablets []*TabletInfo, token token, l int, r int) *TabletInfo {
func findTabletForToken(tablets []*TabletInfo, token Token, l int, r int) *TabletInfo {
for l < r {
var m int
if r*l > 0 {
Expand Down
8 changes: 4 additions & 4 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,20 +323,20 @@ type HostSelectionPolicy interface {
// selection policy.
type SelectedHost interface {
Info() *HostInfo
Token() token
Token() Token
Mark(error)
}

type selectedHost struct {
info *HostInfo
token token
token Token
}

func (host selectedHost) Info() *HostInfo {
return host.info
}

func (host selectedHost) Token() token {
func (host selectedHost) Token() Token {
return host.token
}

Expand Down Expand Up @@ -928,7 +928,7 @@ func (host selectedHostPoolHost) Info() *HostInfo {
return host.info
}

func (host selectedHostPoolHost) Token() token {
func (host selectedHostPoolHost) Token() Token {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ExecutableQuery interface {
Table() string
IsIdempotent() bool
IsLWT() bool
GetCustomPartitioner() partitioner
GetCustomPartitioner() Partitioner

withContext(context.Context) ExecutableQuery

Expand Down
4 changes: 2 additions & 2 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func newScyllaConnPicker(conn *Conn) *scyllaConnPicker {
}
}

func (p *scyllaConnPicker) Pick(t token, keyspace string, table string) *Conn {
func (p *scyllaConnPicker) Pick(t Token, keyspace string, table string) *Conn {
if len(p.conns) == 0 {
return nil
}
Expand Down Expand Up @@ -860,7 +860,7 @@ func ScyllaGetSourcePort(ctx context.Context) uint16 {

// Returns a partitioner specific to the table, or "nil"
// if the cluster-global partitioner should be used
func scyllaGetTablePartitioner(session *Session, keyspaceName, tableName string) (partitioner, error) {
func scyllaGetTablePartitioner(session *Session, keyspaceName, tableName string) (Partitioner, error) {
isCdc, err := scyllaIsCdcTable(session, keyspaceName, tableName)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions scylla_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ const (

type scyllaCDCPartitioner struct{}

var _ partitioner = scyllaCDCPartitioner{}
var _ Partitioner = scyllaCDCPartitioner{}

func (p scyllaCDCPartitioner) Name() string {
return scyllaCDCPartitionerName
}

func (p scyllaCDCPartitioner) Hash(partitionKey []byte) token {
func (p scyllaCDCPartitioner) Hash(partitionKey []byte) Token {
if len(partitionKey) < 8 {
// The key is too short to extract any sensible token,
// so return the min token instead
Expand Down Expand Up @@ -68,7 +68,7 @@ func (p scyllaCDCPartitioner) Hash(partitionKey []byte) token {
return int64Token(upperQword)
}

func (p scyllaCDCPartitioner) ParseString(str string) token {
func (p scyllaCDCPartitioner) ParseString(str string) Token {
return parseInt64Token(str)
}

Expand Down
16 changes: 8 additions & 8 deletions scylla_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestScyllaConnPickerPickNilToken(t *testing.T) {
s.conns = []*Conn{{
streams: streams.New(protoVersion4),
}}
if s.Pick(token(nil), "", "") != s.conns[0] {
if s.Pick(Token(nil), "", "") != s.conns[0] {
t.Fatal("expected connection")
}
})
Expand All @@ -33,7 +33,7 @@ func TestScyllaConnPickerPickNilToken(t *testing.T) {
s.conns = []*Conn{{
streams: streams.New(protoVersion4),
}}
if s.Pick(token(nil), "", "") != s.conns[0] {
if s.Pick(Token(nil), "", "") != s.conns[0] {
t.Fatal("expected connection")
}
})
Expand All @@ -42,20 +42,20 @@ func TestScyllaConnPickerPickNilToken(t *testing.T) {
s.conns = []*Conn{nil, {
streams: streams.New(protoVersion4),
}}
if s.Pick(token(nil), "", "") != s.conns[1] {
if s.Pick(Token(nil), "", "") != s.conns[1] {
t.Fatal("expected connection")
}
if s.Pick(token(nil), "", "") != s.conns[1] {
if s.Pick(Token(nil), "", "") != s.conns[1] {
t.Fatal("expected connection")
}
})

t.Run("multiple shards no conns", func(t *testing.T) {
s.conns = []*Conn{nil, nil}
if s.Pick(token(nil), "", "") != nil {
if s.Pick(Token(nil), "", "") != nil {
t.Fatal("expected nil")
}
if s.Pick(token(nil), "", "") != nil {
if s.Pick(Token(nil), "", "") != nil {
t.Fatal("expected nil")
}
})
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestScyllaRandomConnPIcker(t *testing.T) {
conns: []*Conn{nil, mockConn(1)},
}

if s.Pick(token(nil), "", "") == nil {
if s.Pick(Token(nil), "", "") == nil {
t.Fatal("expected connection")
}
})
Expand All @@ -187,7 +187,7 @@ func TestScyllaRandomConnPIcker(t *testing.T) {
defer wg.Done()
for i := 0; i < 3; i++ {
select {
case connCh <- s.Pick(token(nil), "", ""):
case connCh <- s.Pick(Token(nil), "", ""):
case <-ctx.Done():
}
}
Expand Down
10 changes: 5 additions & 5 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ type queryRoutingInfo struct {
lwt bool

// If not nil, represents a custom partitioner for the table.
partitioner partitioner
partitioner Partitioner

keyspace string

Expand All @@ -999,7 +999,7 @@ func (qri *queryRoutingInfo) isLWT() bool {
return qri.lwt
}

func (qri *queryRoutingInfo) getPartitioner() partitioner {
func (qri *queryRoutingInfo) getPartitioner() Partitioner {
qri.mu.RLock()
defer qri.mu.RUnlock()
return qri.partitioner
Expand Down Expand Up @@ -1310,7 +1310,7 @@ func (q *Query) IsLWT() bool {
return q.routingInfo.isLWT()
}

func (q *Query) GetCustomPartitioner() partitioner {
func (q *Query) GetCustomPartitioner() Partitioner {
return q.routingInfo.getPartitioner()
}

Expand Down Expand Up @@ -1933,7 +1933,7 @@ func (b *Batch) IsLWT() bool {
return b.routingInfo.isLWT()
}

func (b *Batch) GetCustomPartitioner() partitioner {
func (b *Batch) GetCustomPartitioner() Partitioner {
return b.routingInfo.getPartitioner()
}

Expand Down Expand Up @@ -2176,7 +2176,7 @@ type routingKeyInfo struct {
keyspace string
table string
lwt bool
partitioner partitioner
partitioner Partitioner
}

func (r *routingKeyInfo) String() string {
Expand Down
38 changes: 19 additions & 19 deletions token.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ import (
)

// a token partitioner
type partitioner interface {
type Partitioner interface {
Name() string
Hash([]byte) token
ParseString(string) token
Hash([]byte) Token
ParseString(string) Token
}

// a token
type token interface {
// a Token
type Token interface {
fmt.Stringer
Less(token) bool
Less(Token) bool
}

// murmur3 partitioner
Expand All @@ -36,13 +36,13 @@ func (p murmur3Partitioner) Name() string {
return "Murmur3Partitioner"
}

func (p murmur3Partitioner) Hash(partitionKey []byte) token {
func (p murmur3Partitioner) Hash(partitionKey []byte) Token {
h1 := murmur.Murmur3H1(partitionKey)
return int64Token(h1)
}

// murmur3 little-endian, 128-bit hash, but returns only h1
func (p murmur3Partitioner) ParseString(str string) token {
func (p murmur3Partitioner) ParseString(str string) Token {
return parseInt64Token(str)
}

Expand All @@ -58,7 +58,7 @@ func (m int64Token) String() string {
return strconv.FormatInt(int64(m), 10)
}

func (m int64Token) Less(token token) bool {
func (m int64Token) Less(token Token) bool {
return m < token.(int64Token)
}

Expand All @@ -70,20 +70,20 @@ func (p orderedPartitioner) Name() string {
return "OrderedPartitioner"
}

func (p orderedPartitioner) Hash(partitionKey []byte) token {
func (p orderedPartitioner) Hash(partitionKey []byte) Token {
// the partition key is the token
return orderedToken(partitionKey)
}

func (p orderedPartitioner) ParseString(str string) token {
func (p orderedPartitioner) ParseString(str string) Token {
return orderedToken(str)
}

func (o orderedToken) String() string {
return string(o)
}

func (o orderedToken) Less(token token) bool {
func (o orderedToken) Less(token Token) bool {
return o < token.(orderedToken)
}

Expand All @@ -98,7 +98,7 @@ func (r randomPartitioner) Name() string {
// 2 ** 128
var maxHashInt, _ = new(big.Int).SetString("340282366920938463463374607431768211456", 10)

func (p randomPartitioner) Hash(partitionKey []byte) token {
func (p randomPartitioner) Hash(partitionKey []byte) Token {
sum := md5.Sum(partitionKey)
val := new(big.Int)
val.SetBytes(sum[:])
Expand All @@ -110,7 +110,7 @@ func (p randomPartitioner) Hash(partitionKey []byte) token {
return (*randomToken)(val)
}

func (p randomPartitioner) ParseString(str string) token {
func (p randomPartitioner) ParseString(str string) Token {
val := new(big.Int)
val.SetString(str, 10)
return (*randomToken)(val)
Expand All @@ -120,12 +120,12 @@ func (r *randomToken) String() string {
return (*big.Int)(r).String()
}

func (r *randomToken) Less(token token) bool {
func (r *randomToken) Less(token Token) bool {
return -1 == (*big.Int)(r).Cmp((*big.Int)(token.(*randomToken)))
}

type hostToken struct {
token token
token Token
host *HostInfo
}

Expand All @@ -135,7 +135,7 @@ func (ht hostToken) String() string {

// a data structure for organizing the relationship between tokens and hosts
type tokenRing struct {
partitioner partitioner
partitioner Partitioner

// tokens map token range to primary replica.
// The elements in tokens are sorted by token ascending.
Expand Down Expand Up @@ -212,7 +212,7 @@ func (t *tokenRing) String() string {
//
// It returns two tokens. First is token that exactly corresponds to the partition key (and could be used to
// determine shard, for example), second token is the endToken that corresponds to the host.
func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) (host *HostInfo, token token, endToken token) {
func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) (host *HostInfo, token Token, endToken Token) {
if t == nil {
return nil, nil, nil
}
Expand All @@ -222,7 +222,7 @@ func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) (host *HostInfo,
return host, token, endToken
}

func (t *tokenRing) GetHostForToken(token token) (host *HostInfo, endToken token) {
func (t *tokenRing) GetHostForToken(token Token) (host *HostInfo, endToken Token) {
if t == nil || len(t.tokens) == 0 {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestRandomToken(t *testing.T) {
type intToken int

func (i intToken) String() string { return strconv.Itoa(int(i)) }
func (i intToken) Less(token token) bool { return i < token.(intToken) }
func (i intToken) Less(token Token) bool { return i < token.(intToken) }

// Test of the token ring implementation based on example at the start of this
// page of documentation:
Expand Down
4 changes: 2 additions & 2 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type hostTokens struct {
// token is end (inclusive) of token range these hosts belong to
token token
token Token
hosts []*HostInfo
}

Expand All @@ -24,7 +24,7 @@ func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].tok
func (h tokenRingReplicas) Len() int { return len(h) }
func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
func (h tokenRingReplicas) replicasFor(t Token) *hostTokens {
if len(h) == 0 {
return nil
}
Expand Down
Loading