diff --git a/consumer.go b/consumer.go index 7a4203cb..0a26ceed 100644 --- a/consumer.go +++ b/consumer.go @@ -660,32 +660,7 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { // start or continue backoff backoffDuration := r.backoffDurationForCount(r.backoffCounter) atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) - time.AfterFunc(backoffDuration, func() { - var choice *Conn - - atomic.StoreInt64(&r.backoffDuration, 0) - - // pick a random connection to test the waters - var i int - conns := r.conns() - if len(conns) == 0 { - return - } - idx := r.rng.Intn(len(conns)) - for _, c := range conns { - if i == idx { - choice = c - break - } - i++ - } - - r.log(LogLevelWarning, - "(%s) backoff timeout expired, sending RDY 1", - choice.String()) - // while in backoff only ever let 1 message at a time through - r.updateRDY(choice, 1) - }) + time.AfterFunc(backoffDuration, r.backoff) r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", backoffDuration.Seconds(), r.backoffCounter) @@ -697,6 +672,32 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { } } +func (r *Consumer) backoff() { + atomic.StoreInt64(&r.backoffDuration, 0) + + if atomic.LoadInt32(&r.stopFlag) == 1 { + return + } + + // pick a random connection to test the waters + conns := r.conns() + if len(conns) == 0 { + // backoff again + backoffDuration := 1 * time.Second + atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) + time.AfterFunc(backoffDuration, r.backoff) + return + } + idx := r.rng.Intn(len(conns)) + choice := conns[idx] + + r.log(LogLevelWarning, + "(%s) backoff timeout expired, sending RDY 1", + choice.String()) + // while in backoff only ever let 1 message at a time through + r.updateRDY(choice, 1) +} + func (r *Consumer) onConnResponse(c *Conn, data []byte) { switch { case bytes.Equal(data, []byte("CLOSE_WAIT")):