diff --git a/conn.go b/conn.go index 315148ee..c1b31859 100644 --- a/conn.go +++ b/conn.go @@ -52,7 +52,6 @@ type Conn struct { messagesInFlight int64 maxRdyCount int64 rdyCount int64 - lastRdyCount int64 lastRdyTimestamp int64 lastMsgTimestamp int64 @@ -207,13 +206,12 @@ func (c *Conn) RDY() int64 { // LastRDY returns the previously set RDY count func (c *Conn) LastRDY() int64 { - return atomic.LoadInt64(&c.lastRdyCount) + return atomic.LoadInt64(&c.rdyCount) } // SetRDY stores the specified RDY count func (c *Conn) SetRDY(rdy int64) { atomic.StoreInt64(&c.rdyCount, rdy) - atomic.StoreInt64(&c.lastRdyCount, rdy) if rdy > 0 { atomic.StoreInt64(&c.lastRdyTimestamp, time.Now().UnixNano()) } @@ -225,6 +223,8 @@ func (c *Conn) MaxRDY() int64 { return c.maxRdyCount } +// LastRdyTime returns the time of the last non-zero RDY +// update for this connection func (c *Conn) LastRdyTime() time.Time { return time.Unix(0, atomic.LoadInt64(&c.lastRdyTimestamp)) } @@ -523,7 +523,6 @@ func (c *Conn) readLoop() { msg.Delegate = delegate msg.NSQDAddress = c.String() - atomic.AddInt64(&c.rdyCount, -1) atomic.AddInt64(&c.messagesInFlight, 1) atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano()) diff --git a/consumer.go b/consumer.go index 3bc784c6..e2603447 100644 --- a/consumer.go +++ b/consumer.go @@ -264,7 +264,7 @@ func (r *Consumer) perConnMaxInFlight() int64 { // before being able to receive more messages (ie. RDY count of 0 and not exiting) func (r *Consumer) IsStarved() bool { for _, conn := range r.conns() { - threshold := int64(float64(atomic.LoadInt64(&conn.lastRdyCount)) * 0.85) + threshold := int64(float64(conn.RDY()) * 0.85) inFlight := atomic.LoadInt64(&conn.messagesInFlight) if inFlight >= threshold && inFlight > 0 && !conn.IsClosing() { return true @@ -642,7 +642,6 @@ func (r *Consumer) DisconnectFromNSQLookupd(addr string) error { } func (r *Consumer) onConnMessage(c *Conn, msg *Message) { - atomic.AddInt64(&r.totalRdyCount, -1) atomic.AddUint64(&r.messagesReceived, 1) r.incomingMessages <- msg } @@ -877,19 +876,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) { return } - remain := conn.RDY() - lastRdyCount := conn.LastRDY() count := r.perConnMaxInFlight() - - // refill when at 1, or at 25%, or if connections have changed and we're imbalanced - if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { - r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)", - conn, count, remain, lastRdyCount) - r.updateRDY(conn, count) - } else { - r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)", - conn, count, remain, lastRdyCount) - } + r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count) + r.updateRDY(conn, count) } func (r *Consumer) rdyLoop() { @@ -959,7 +948,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error { return nil } - atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count) + atomic.AddInt64(&r.totalRdyCount, count-c.RDY()) c.SetRDY(count) err := c.WriteCommand(Ready(int(count))) if err != nil {