Skip to content

Commit

Permalink
Merge pull request #127 from jnewmano/master
Browse files Browse the repository at this point in the history
when in backoff, make sure that the chosen connection isn't closing
  • Loading branch information
mreiferson committed Mar 24, 2015
2 parents edb5658 + 08a850b commit 33e8317
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
15 changes: 12 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,9 @@ func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
}

func (r *Consumer) backoff() {
atomic.StoreInt64(&r.backoffDuration, 0)

if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

Expand All @@ -713,7 +713,16 @@ func (r *Consumer) backoff() {
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
Expand Down Expand Up @@ -865,7 +874,7 @@ exit:

func (r *Consumer) updateRDY(c *Conn, count int64) error {
if c.IsClosing() {
return nil
return ErrClosing
}

// never exceed the nsqd's configured max RDY count
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ var ErrNotConnected = errors.New("not connected")
// made against a Producer that has been stopped
var ErrStopped = errors.New("stopped")

// ErrClosing is returned when a connection is closing
var ErrClosing = errors.New("closing")

// ErrAlreadyConnected is returned from ConnectToNSQD when already connected
var ErrAlreadyConnected = errors.New("already connected")

Expand Down

0 comments on commit 33e8317

Please sign in to comment.