diff --git a/consumer.go b/consumer.go index 89053d5a..e661e119 100644 --- a/consumer.go +++ b/consumer.go @@ -66,6 +66,14 @@ type ConsumerStats struct { var instCount int64 +type backoffSignal int + +const ( + backoffFlag backoffSignal = iota + continueFlag + resumeFlag +) + // Consumer is a high-level type to consume from NSQ. // // A Consumer instance is supplied a Handler that will be executed @@ -633,13 +641,6 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) { atomic.AddUint64(&r.messagesRequeued, 1) } -type backoffSignal int -const ( - backoffFlag backoffSignal = iota - continueFlag - resumeFlag -) - func (r *Consumer) onConnBackoff(c *Conn) { r.startStopContinueBackoff(c, backoffFlag) } @@ -652,93 +653,6 @@ func (r *Consumer) onConnResume(c *Conn) { r.startStopContinueBackoff(c, resumeFlag) } -func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) { - // prevent many async failures/successes from immediately resulting in - // max backoff/normal rate (by ensuring that we dont continually incr/decr - // the counter during a backoff period) - r.backoffMtx.Lock() - defer r.backoffMtx.Unlock() - if r.inBackoffTimeout() { - - return - } - - // update backoff state - backoffUpdated := false - backoffCounter := atomic.LoadInt32(&r.backoffCounter) - switch signal { - case resumeFlag: - if backoffCounter > 0 { - backoffCounter-- - backoffUpdated = true - } - case backoffFlag: - nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1) - if nextBackoff <= r.config.MaxBackoffDuration { - backoffCounter++ - backoffUpdated = true - } - } - atomic.StoreInt32(&r.backoffCounter, backoffCounter) - - if r.backoffCounter == 0 && backoffUpdated { - // exit backoff - count := r.perConnMaxInFlight() - r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count) - for _, c := range r.conns() { - r.updateRDY(c, count) - } - } else if r.backoffCounter > 0 { - // start or continue backoff - backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter)) - r.backoff(backoffDuration) - - r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", - backoffDuration.Seconds(), backoffCounter) - - // send RDY 0 immediately (to *all* connections) - for _, c := range r.conns() { - r.updateRDY(c, 0) - } - } -} - -func (r *Consumer) backoff(d time.Duration) { - atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds()) - time.AfterFunc(d, r.resume) -} - -func (r *Consumer) resume() { - if atomic.LoadInt32(&r.stopFlag) == 1 { - atomic.StoreInt64(&r.backoffDuration, 0) - return - } - - // pick a random connection to test the waters - conns := r.conns() - if len(conns) == 0 { - // backoff again - r.backoff(time.Second) - return - } - idx := r.rng.Intn(len(conns)) - choice := conns[idx] - - r.log(LogLevelWarning, - "(%s) backoff timeout expired, sending RDY 1", - choice.String()) - - // while in backoff only ever let 1 message at a time through - err := r.updateRDY(choice, 1) - if err != nil { - r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err) - r.backoff(time.Second) - return - } - - atomic.StoreInt64(&r.backoffDuration, 0) -} - func (r *Consumer) onConnResponse(c *Conn, data []byte) { switch { case bytes.Equal(data, []byte("CLOSE_WAIT")): @@ -837,6 +751,92 @@ func (r *Consumer) onConnClose(c *Conn) { } } +func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) { + // prevent many async failures/successes from immediately resulting in + // max backoff/normal rate (by ensuring that we dont continually incr/decr + // the counter during a backoff period) + r.backoffMtx.Lock() + defer r.backoffMtx.Unlock() + if r.inBackoffTimeout() { + return + } + + // update backoff state + backoffUpdated := false + backoffCounter := atomic.LoadInt32(&r.backoffCounter) + switch signal { + case resumeFlag: + if backoffCounter > 0 { + backoffCounter-- + backoffUpdated = true + } + case backoffFlag: + nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1) + if nextBackoff <= r.config.MaxBackoffDuration { + backoffCounter++ + backoffUpdated = true + } + } + atomic.StoreInt32(&r.backoffCounter, backoffCounter) + + if r.backoffCounter == 0 && backoffUpdated { + // exit backoff + count := r.perConnMaxInFlight() + r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count) + for _, c := range r.conns() { + r.updateRDY(c, count) + } + } else if r.backoffCounter > 0 { + // start or continue backoff + backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter)) + r.backoff(backoffDuration) + + r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", + backoffDuration.Seconds(), backoffCounter) + + // send RDY 0 immediately (to *all* connections) + for _, c := range r.conns() { + r.updateRDY(c, 0) + } + } +} + +func (r *Consumer) backoff(d time.Duration) { + atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds()) + time.AfterFunc(d, r.resume) +} + +func (r *Consumer) resume() { + if atomic.LoadInt32(&r.stopFlag) == 1 { + atomic.StoreInt64(&r.backoffDuration, 0) + return + } + + // pick a random connection to test the waters + conns := r.conns() + if len(conns) == 0 { + // backoff again + r.backoff(time.Second) + return + } + idx := r.rng.Intn(len(conns)) + choice := conns[idx] + + r.log(LogLevelWarning, + "(%s) backoff timeout expired, sending RDY 1", + choice.String()) + + // while in backoff only ever let 1 message at a time through + err := r.updateRDY(choice, 1) + if err != nil { + r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err) + r.backoff(time.Second) + return + } + + atomic.StoreInt64(&r.backoffDuration, 0) +} + func (r *Consumer) inBackoff() bool { return atomic.LoadInt32(&r.backoffCounter) > 0 } diff --git a/mock_test.go b/mock_test.go index f633f01b..57814b69 100644 --- a/mock_test.go +++ b/mock_test.go @@ -181,7 +181,7 @@ func (h *testHandler) HandleMessage(message *Message) error { if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) { if message.Attempts > 1 { return nil - } + } message.RequeueWithoutBackoff(-1) return nil } @@ -271,11 +271,9 @@ func TestConsumerBackoff(t *testing.T) { } } - func TestConsumerRequeueNoBackoff(t *testing.T) { // logger := log.New(ioutil.Discard, "", log.LstdFlags) - msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} @@ -310,9 +308,9 @@ func TestConsumerRequeueNoBackoff(t *testing.T) { } select { - case <-n.exitChan: + case <-n.exitChan: log.Printf("clean exit") - case <- time.After(500 * time.Millisecond): + case <-time.After(500 * time.Millisecond): log.Printf("timeout") }