Skip to content

Commit

Permalink
consumer: Deal with case where no connections during backoff.
Browse files Browse the repository at this point in the history
This fixes the following scenario:
1. Consumer is connected to one NSQD host using ConnectToNSQD.
2. Consumer receives message and handler returns an error.
3. Consumer backs off.
4. DisconnectFromNSQD called before backoff closure wakes up.
5. Backoff closure wakes up, sees no connections, returns.
6. Consumer connects to NSQD host using ConnectToNSQD.
7. backoffCounter still > 0 so never sends RDY count (in
maybeUpdateRDY).
8. Consumer is now stuck and never sends RDY > 0.
  • Loading branch information
Ryan Slade committed Feb 3, 2015
1 parent c1b1bb2 commit 324136f
Showing 1 changed file with 27 additions and 26 deletions.
53 changes: 27 additions & 26 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,32 +660,7 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
// start or continue backoff
backoffDuration := r.backoffDurationForCount(r.backoffCounter)
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, func() {
var choice *Conn

atomic.StoreInt64(&r.backoffDuration, 0)

// pick a random connection to test the waters
var i int
conns := r.conns()
if len(conns) == 0 {
return
}
idx := r.rng.Intn(len(conns))
for _, c := range conns {
if i == idx {
choice = c
break
}
i++
}

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
})
time.AfterFunc(backoffDuration, r.backoff)

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), r.backoffCounter)
Expand All @@ -697,6 +672,32 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
}
}

func (r *Consumer) backoff() {
atomic.StoreInt64(&r.backoffDuration, 0)

if atomic.LoadInt32(&r.stopFlag) == 1 {
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
switch {
case bytes.Equal(data, []byte("CLOSE_WAIT")):
Expand Down

0 comments on commit 324136f

Please sign in to comment.