Skip to content

Commit

Permalink
Merge pull request #168 from sylwiaszunejko/avoid_slow_replicas
Browse files Browse the repository at this point in the history
Add LOAD_BALANCING_POLICY_SLOW_AVOIDANCE funtionality
  • Loading branch information
avelanarius authored May 21, 2024
2 parents 7f9ad1c + 26c61ef commit 7f7905d
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 0 deletions.
20 changes: 20 additions & 0 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
}
}

func (p *policyConnPool) InFlight() int {
p.mu.RLock()
count := 0
for _, pool := range p.hostConnPools {
count += pool.InFlight()
}
p.mu.RUnlock()

return count
}

func (p *policyConnPool) Size() int {
p.mu.RLock()
count := 0
Expand Down Expand Up @@ -348,6 +359,15 @@ func (pool *hostConnPool) Size() int {
return size
}

// Size returns the number of connections currently active in the pool
func (pool *hostConnPool) InFlight() int {
pool.mu.RLock()
defer pool.mu.RUnlock()

size := pool.connPicker.InFlight()
return size
}

// Close the connection pool
func (pool *hostConnPool) Close() {
pool.mu.Lock()
Expand Down
10 changes: 10 additions & 0 deletions connpicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type ConnPicker interface {
Pick(token, string, string) *Conn
Put(*Conn)
Remove(conn *Conn)
InFlight() int
Size() (int, int)
Close()

Expand Down Expand Up @@ -60,6 +61,11 @@ func (p *defaultConnPicker) Close() {
}
}

func (p *defaultConnPicker) InFlight() int {
size := len(p.conns)
return size
}

func (p *defaultConnPicker) Size() (int, int) {
size := len(p.conns)
return size, p.size - size
Expand Down Expand Up @@ -114,6 +120,10 @@ func (nopConnPicker) Put(*Conn) {
func (nopConnPicker) Remove(conn *Conn) {
}

func (nopConnPicker) InFlight() int {
return 0
}

func (nopConnPicker) Size() (int, int) {
// Return 1 to make hostConnPool to try to establish a connection.
// When first connection is established hostConnPool replaces nopConnPicker
Expand Down
5 changes: 5 additions & 0 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool {
return h != nil && h.State() == NodeUp
}

func (h *HostInfo) IsBusy(s *Session) bool {
pool, ok := s.pool.getPool(h)
return ok && h != nil && pool.InFlight() >= MAX_IN_FLIGHT_THRESHOLD
}

func (h *HostInfo) HostnameAndPort() string {
h.mu.Lock()
defer h.mu.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,7 @@ func (s *IDGenerator) Clear(stream int) (inuse bool) {
func (s *IDGenerator) Available() int {
return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1
}

func (s *IDGenerator) InUse() int {
return int(atomic.LoadInt32(&s.inuseStreams))
}
30 changes: 30 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,17 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
}
}

// AvoidSlowReplicas enabled avoiding slow replicas
//
// TokenAwareHostPolicy normally does not check how busy replica is, with avoidSlowReplicas enabled it avoids replicas
// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD requests in flight
func AvoidSlowReplicas(max_in_flight_threshold int) func(policy *tokenAwareHostPolicy) {
return func(t *tokenAwareHostPolicy) {
t.avoidSlowReplicas = true
MAX_IN_FLIGHT_THRESHOLD = max_in_flight_threshold
}
}

// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
//
// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then
Expand Down Expand Up @@ -424,6 +435,8 @@ type clusterMeta struct {
tokenRing *tokenRing
}

var MAX_IN_FLIGHT_THRESHOLD int = 10

type tokenAwareHostPolicy struct {
fallback HostSelectionPolicy
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
Expand All @@ -443,6 +456,8 @@ type tokenAwareHostPolicy struct {

// Experimental, this interface and use may change
tablets cowTabletList

avoidSlowReplicas bool
}

func (t *tokenAwareHostPolicy) Init(s *Session) {
Expand Down Expand Up @@ -687,6 +702,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
}
}

if s := qry.GetSession(); s != nil && t.avoidSlowReplicas {
healthyReplicas := make([]*HostInfo, 0, len(replicas))
unhealthyReplicas := make([]*HostInfo, 0, len(replicas))

for _, h := range replicas {
if h.IsBusy(s) {
unhealthyReplicas = append(unhealthyReplicas, h)
} else {
healthyReplicas = append(healthyReplicas, h)
}
}

replicas = append(healthyReplicas, unhealthyReplicas...)
}

var (
fallbackIter NextHost
i, j, k int
Expand Down
10 changes: 10 additions & 0 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,16 @@ func (p *scyllaConnPicker) Remove(conn *Conn) {
}
}

func (p *scyllaConnPicker) InFlight() int {
result := 0
for _, conn := range p.conns {
if conn != nil {
result = result + (conn.streams.InUse())
}
}
return result
}

func (p *scyllaConnPicker) Size() (int, int) {
return p.nrConns, p.nrShards - p.nrConns
}
Expand Down

0 comments on commit 7f7905d

Please sign in to comment.