Skip to content

Commit

Permalink
Merge pull request #106 from ryanslade/reset-backoff-counter
Browse files Browse the repository at this point in the history
consumer: Reset backoff counter when no connections left.
  • Loading branch information
mreiferson committed Feb 3, 2015
2 parents c1b1bb2 + 324136f commit 59eba58
Showing 1 changed file with 27 additions and 26 deletions.
53 changes: 27 additions & 26 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,32 +660,7 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
// start or continue backoff
backoffDuration := r.backoffDurationForCount(r.backoffCounter)
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, func() {
var choice *Conn

atomic.StoreInt64(&r.backoffDuration, 0)

// pick a random connection to test the waters
var i int
conns := r.conns()
if len(conns) == 0 {
return
}
idx := r.rng.Intn(len(conns))
for _, c := range conns {
if i == idx {
choice = c
break
}
i++
}

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
})
time.AfterFunc(backoffDuration, r.backoff)

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), r.backoffCounter)
Expand All @@ -697,6 +672,32 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
}
}

func (r *Consumer) backoff() {
atomic.StoreInt64(&r.backoffDuration, 0)

if atomic.LoadInt32(&r.stopFlag) == 1 {
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
switch {
case bytes.Equal(data, []byte("CLOSE_WAIT")):
Expand Down

0 comments on commit 59eba58

Please sign in to comment.