From 324136f4fab011955ba91141d5fe90a95b1837d2 Mon Sep 17 00:00:00 2001 From: Ryan Slade Date: Mon, 2 Feb 2015 14:20:38 +0000 Subject: [PATCH] consumer: Deal with case where there are no connections during backoff. This fixes the following scenario: 1. Consumer is connected to one NSQD host using ConnectToNSQD. 2. Consumer receives a message and the handler returns an error. 3. Consumer backs off. 4. DisconnectFromNSQD is called before the backoff closure wakes up. 5. Backoff closure wakes up, sees no connections, returns. 6. Consumer connects to an NSQD host using ConnectToNSQD. 7. backoffCounter is still > 0, so the consumer never sends an RDY count (in maybeUpdateRDY). 8. Consumer is now stuck and never sends RDY > 0. --- consumer.go | 53 +++++++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/consumer.go b/consumer.go index 7a4203cb..0a26ceed 100644 --- a/consumer.go +++ b/consumer.go @@ -660,32 +660,7 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { // start or continue backoff backoffDuration := r.backoffDurationForCount(r.backoffCounter) atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) - time.AfterFunc(backoffDuration, func() { - var choice *Conn - - atomic.StoreInt64(&r.backoffDuration, 0) - - // pick a random connection to test the waters - var i int - conns := r.conns() - if len(conns) == 0 { - return - } - idx := r.rng.Intn(len(conns)) - for _, c := range conns { - if i == idx { - choice = c - break - } - i++ - } - - r.log(LogLevelWarning, - "(%s) backoff timeout expired, sending RDY 1", - choice.String()) - // while in backoff only ever let 1 message at a time through - r.updateRDY(choice, 1) - }) + time.AfterFunc(backoffDuration, r.backoff) r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", backoffDuration.Seconds(), r.backoffCounter) @@ -697,6 +672,32 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { } } +func (r 
*Consumer) backoff() { + atomic.StoreInt64(&r.backoffDuration, 0) + + if atomic.LoadInt32(&r.stopFlag) == 1 { + return + } + + // pick a random connection to test the waters + conns := r.conns() + if len(conns) == 0 { + // backoff again + backoffDuration := 1 * time.Second + atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) + time.AfterFunc(backoffDuration, r.backoff) + return + } + idx := r.rng.Intn(len(conns)) + choice := conns[idx] + + r.log(LogLevelWarning, + "(%s) backoff timeout expired, sending RDY 1", + choice.String()) + // while in backoff only ever let 1 message at a time through + r.updateRDY(choice, 1) +} + func (r *Consumer) onConnResponse(c *Conn, data []byte) { switch { case bytes.Equal(data, []byte("CLOSE_WAIT")):