Skip to content

Commit

Permalink
consumer: don't decrement RDY or refill
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Mar 7, 2019
1 parent 1ae5925 commit 3722c37
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 19 deletions.
7 changes: 3 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type Conn struct {
messagesInFlight int64
maxRdyCount int64
rdyCount int64
lastRdyCount int64
lastRdyTimestamp int64
lastMsgTimestamp int64

Expand Down Expand Up @@ -207,13 +206,12 @@ func (c *Conn) RDY() int64 {

// LastRDY returns the previously set RDY count
func (c *Conn) LastRDY() int64 {
return atomic.LoadInt64(&c.lastRdyCount)
return atomic.LoadInt64(&c.rdyCount)
}

// SetRDY stores the specified RDY count
func (c *Conn) SetRDY(rdy int64) {
atomic.StoreInt64(&c.rdyCount, rdy)
atomic.StoreInt64(&c.lastRdyCount, rdy)
if rdy > 0 {
atomic.StoreInt64(&c.lastRdyTimestamp, time.Now().UnixNano())
}
Expand All @@ -225,6 +223,8 @@ func (c *Conn) MaxRDY() int64 {
return c.maxRdyCount
}

// LastRdyTime returns the time of the last non-zero RDY
// update for this connection
func (c *Conn) LastRdyTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.lastRdyTimestamp))
}
Expand Down Expand Up @@ -523,7 +523,6 @@ func (c *Conn) readLoop() {
msg.Delegate = delegate
msg.NSQDAddress = c.String()

atomic.AddInt64(&c.rdyCount, -1)
atomic.AddInt64(&c.messagesInFlight, 1)
atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())

Expand Down
19 changes: 4 additions & 15 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (r *Consumer) perConnMaxInFlight() int64 {
// before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (r *Consumer) IsStarved() bool {
for _, conn := range r.conns() {
threshold := int64(float64(atomic.LoadInt64(&conn.lastRdyCount)) * 0.85)
threshold := int64(float64(conn.RDY()) * 0.85)
inFlight := atomic.LoadInt64(&conn.messagesInFlight)
if inFlight >= threshold && inFlight > 0 && !conn.IsClosing() {
return true
Expand Down Expand Up @@ -642,7 +642,6 @@ func (r *Consumer) DisconnectFromNSQLookupd(addr string) error {
}

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
}
Expand Down Expand Up @@ -877,19 +876,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {
return
}

remain := conn.RDY()
lastRdyCount := conn.LastRDY()
count := r.perConnMaxInFlight()

// refill when at 1, or at 25%, or if connections have changed and we're imbalanced
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
conn, count, remain, lastRdyCount)
r.updateRDY(conn, count)
} else {
r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
conn, count, remain, lastRdyCount)
}
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
}

func (r *Consumer) rdyLoop() {
Expand Down Expand Up @@ -959,7 +948,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
return nil
}

atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count)
atomic.AddInt64(&r.totalRdyCount, count-c.RDY())
c.SetRDY(count)
err := c.WriteCommand(Ready(int(count)))
if err != nil {
Expand Down

0 comments on commit 3722c37

Please sign in to comment.