Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redistribute RDY in high throughput, idle producer situations #277

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type Config struct {
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

// RDYTrading enables moving RDY from idle nsqd connections to busy ones, so that a high
// max-in-flight can still be used when only some of the connected nsqds are producing
RDYTrading bool `opt:"rdy_trading"`

// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Expand Down
89 changes: 89 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Consumer struct {
backoffCounter int32
maxInFlight int32

lastRDYTradeTimestamp int64

mtx sync.RWMutex

logger logger
Expand Down Expand Up @@ -958,6 +960,92 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
return nil
}

// LastRDYTradeTime returns the time stored in lastRDYTradeTimestamp, which
// redistributeUnusedRDY records when it begins an RDY trade.
//
// The field is read atomically and interpreted as nanoseconds since the Unix
// epoch, so a value of zero yields the epoch itself.
func (r *Consumer) LastRDYTradeTime() time.Time {
	nanos := atomic.LoadInt64(&r.lastRDYTradeTimestamp)
	return time.Unix(0, nanos)
}

// redistributeUnusedRDY moves RDY away from idle connections and hands it to
// busy ones, so that max-in-flight can actually be used when only some of the
// nsqd sources are producing at the moment.
//
// It does nothing unless Config.RDYTrading is enabled, at least
// rdyTradeInterval has passed since the last recorded trade, there are at
// least two connections, and the consumer is not in backoff.
//
// A connection counts as idle when its last RDY update is more recent than
// its last message (no message arrived since RDY was last set); otherwise it
// counts as busy. If either set is empty no trade is possible and every
// connection whose RDY differs from perConnMaxInFlight is handed to
// maybeUpdateRDY instead, undoing an earlier trade. Otherwise each idle
// connection is reduced to RDY 1 and the reclaimed RDY is split across the
// busy connections.
func (r *Consumer) redistributeUnusedRDY() {
	// minimum time between two trades
	const rdyTradeInterval = 60 * time.Second

	if !r.config.RDYTrading || time.Since(r.LastRDYTradeTime()) < rdyTradeInterval {
		return
	}

	// we arrive here if there's no normal RDY redistribution (low max-in-flight) going on
	conns := r.conns()
	if len(conns) < 2 {
		return
	}

	// this should already be ensured by the location we're called from, but check again anyways
	inBackoff := r.inBackoff()
	inBackoffTimeout := r.inBackoffTimeout()
	if inBackoff || inBackoffTimeout {
		return
	}

	r.log(LogLevelDebug, "begin RDY trading...")

	idleConns := make([]*Conn, 0, len(conns))
	busyConns := make([]*Conn, 0, len(conns))

	// sample the clock once so both durations of a conn are measured against
	// the same instant
	now := time.Now()
	for _, c := range conns {
		sinceMsg := now.Sub(c.LastMessageTime())
		sinceRDY := now.Sub(c.LastRdyTime())

		// if we did not get a message since last time we set RDY
		if sinceRDY < sinceMsg {
			idleConns = append(idleConns, c)
		} else {
			busyConns = append(busyConns, c)
		}
	}

	// we need to have at least one in both sets so we can trade RDY
	if len(idleConns) == 0 || len(busyConns) == 0 {
		r.log(LogLevelDebug, " - no RDY trading possible (empty set)")

		// we might have done trading before but then other nsqds became active
		// so we should equalize again; busy conns go first to get back any
		// potentially traded RDY, after that it should work to reset the idle ones
		for _, set := range [][]*Conn{busyConns, idleConns} {
			for _, c := range set {
				if c.RDY() != r.perConnMaxInFlight() {
					r.maybeUpdateRDY(c)
				}
			}
		}
		return
	}

	// begin trading and remember we did
	atomic.StoreInt64(&r.lastRDYTradeTimestamp, time.Now().UnixNano())

	// reduce idle conns to RDY 1 and collect what was taken away
	var reclaimed int64
	for _, c := range idleConns {
		// read RDY once so the logged amount and the reclaimed amount agree
		rdy := c.RDY()
		if rdy <= 1 {
			continue
		}
		r.log(LogLevelDebug, " - removing %d RDY from %s", rdy-1, c.String())
		reclaimed += rdy - 1
		r.updateRDY(c, 1)
	}

	// nothing was taken from the idle conns, so there is nothing to hand out
	if reclaimed == 0 {
		return
	}

	// all busy conns get an equal RDY amount, in theory could be based on
	// throughput but that's even harder to get right; the remainder of the
	// division goes one-by-one to the first conns so no reclaimed RDY is lost
	share := reclaimed / int64(len(busyConns))
	remainder := reclaimed % int64(len(busyConns))
	for i, c := range busyConns {
		extra := share
		if int64(i) < remainder {
			extra++
		}
		if extra == 0 {
			continue
		}
		r.log(LogLevelDebug, " - adding %d RDY to %s", extra, c.String())
		r.updateRDY(c, c.RDY()+extra)
	}
}

func (r *Consumer) redistributeRDY() {
if r.inBackoffTimeout() {
return
Expand All @@ -983,6 +1071,7 @@ func (r *Consumer) redistributeRDY() {
}

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
r.redistributeUnusedRDY()
return
}

Expand Down