Skip to content

Commit

Permalink
Merge pull request #128 from jehiah/backoff_requeue_no_backoff_128
Browse files Browse the repository at this point in the history
Consumer stall after RequeueWithoutBackoff
  • Loading branch information
mreiferson committed Mar 26, 2015
2 parents 33e8317 + 8923105 commit da20c0d
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 122 deletions.
19 changes: 10 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
language: go
go:
- 1.2.2
- 1.3.1
- 1.4.2
env:
- NSQ_DOWNLOAD=nsq-0.2.24.linux-amd64.go1.2 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.24.linux-amd64.go1.2 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.27.linux-amd64.go1.2 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.27.linux-amd64.go1.2 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.28.linux-amd64.go1.2.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.28.linux-amd64.go1.2.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.30.linux-amd64.go1.3 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.30.linux-amd64.go1.3 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.31.linux-amd64.go1.3.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.31.linux-amd64.go1.3.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.0.linux-amd64.go1.3.3 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.0.linux-amd64.go1.3.3 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.1.linux-amd64.go1.4.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.1.linux-amd64.go1.4.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.2.linux-amd64.go1.4.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.2.linux-amd64.go1.4.1 GOARCH=386
install:
- go get github.com/bitly/go-simplejson
- go get github.com/mreiferson/go-snappystream
script:
- wget http://bitly-downloads.s3.amazonaws.com/nsq/$NSQ_DOWNLOAD.tar.gz
- tar zxvf $NSQ_DOWNLOAD.tar.gz
- sudo cp $NSQ_DOWNLOAD/bin/nsqd $NSQ_DOWNLOAD/bin/nsqlookupd /usr/local/bin
- export PATH=$NSQ_DOWNLOAD/bin:$PATH
- pushd $TRAVIS_BUILD_DIR
- ./test.sh
- popd
notifications:
email: false

sudo: false
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,14 +563,14 @@ func (c *Conn) writeLoop() {
if resp.success {
c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
c.delegate.OnMessageFinished(c, resp.msg)
if resp.backoff {
c.delegate.OnResume(c)
}
c.delegate.OnResume(c)
} else {
c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
c.delegate.OnMessageRequeued(c, resp.msg)
if resp.backoff {
c.delegate.OnBackoff(c)
} else {
c.delegate.OnContinue(c)
}
}

Expand Down Expand Up @@ -683,7 +683,7 @@ func (c *Conn) waitForCleanup() {
}

func (c *Conn) onMessageFinish(m *Message) {
c.msgResponseChan <- &msgResponse{m, Finish(m.ID), true, true}
c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
}

func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
Expand All @@ -695,7 +695,7 @@ func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
delay = c.config.MaxRequeueDelay
}
}
c.msgResponseChan <- &msgResponse{m, Requeue(m.ID, delay), false, backoff}
c.msgResponseChan <- &msgResponse{msg: m, cmd: Requeue(m.ID, delay), success: false, backoff: backoff}
}

func (c *Conn) onMessageTouch(m *Message) {
Expand Down
201 changes: 107 additions & 94 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ type ConsumerStats struct {

var instCount int64

type backoffSignal int

const (
backoffFlag backoffSignal = iota
continueFlag
resumeFlag
)

// Consumer is a high-level type to consume from NSQ.
//
// A Consumer instance is supplied a Handler that will be executed
Expand All @@ -82,6 +90,7 @@ type Consumer struct {
messagesRequeued uint64
totalRdyCount int64
backoffDuration int64
backoffCounter int32
maxInFlight int32

mtx sync.RWMutex
Expand All @@ -101,8 +110,7 @@ type Consumer struct {

needRDYRedistributed int32

backoffMtx sync.RWMutex
backoffCounter int32
backoffMtx sync.RWMutex

incomingMessages chan *Message

Expand Down Expand Up @@ -634,95 +642,15 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) {
}

func (r *Consumer) onConnBackoff(c *Conn) {
r.startStopContinueBackoff(c, false)
r.startStopContinueBackoff(c, backoffFlag)
}

func (r *Consumer) onConnResume(c *Conn) {
r.startStopContinueBackoff(c, true)
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
if r.inBackoffBlock() {
return
}

// update backoff state
r.backoffMtx.Lock()
backoffUpdated := false
if success {
if r.backoffCounter > 0 {
r.backoffCounter--
backoffUpdated = true
}
} else {
maxBackoffCount := int32(math.Max(1, math.Ceil(
math.Log2(r.config.MaxBackoffDuration.Seconds()))))
if r.backoffCounter < maxBackoffCount {
r.backoffCounter++
backoffUpdated = true
}
}
r.backoffMtx.Unlock()

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.config.BackoffStrategy.Calculate(int(r.backoffCounter))
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), r.backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
func (r *Consumer) onConnContinue(c *Conn) {
r.startStopContinueBackoff(c, continueFlag)
}

func (r *Consumer) backoff() {

if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
func (r *Consumer) onConnResume(c *Conn) {
r.startStopContinueBackoff(c, resumeFlag)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
Expand Down Expand Up @@ -823,19 +751,104 @@ func (r *Consumer) onConnClose(c *Conn) {
}
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
r.backoffMtx.Lock()
if r.inBackoffTimeout() {
r.backoffMtx.Unlock()
return
}
defer r.backoffMtx.Unlock()

// update backoff state
backoffUpdated := false
backoffCounter := atomic.LoadInt32(&r.backoffCounter)
switch signal {
case resumeFlag:
if backoffCounter > 0 {
backoffCounter--
backoffUpdated = true
}
case backoffFlag:
nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1)
if nextBackoff <= r.config.MaxBackoffDuration {
backoffCounter++
backoffUpdated = true
}
}
atomic.StoreInt32(&r.backoffCounter, backoffCounter)

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter))

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}

r.backoff(backoffDuration)
}
}

func (r *Consumer) backoff(d time.Duration) {
atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds())
time.AfterFunc(d, r.resume)
}

func (r *Consumer) resume() {
if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
r.backoff(time.Second)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())

// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
r.backoff(time.Second)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
}

func (r *Consumer) inBackoff() bool {
r.backoffMtx.RLock()
backoffCounter := r.backoffCounter
r.backoffMtx.RUnlock()
return backoffCounter > 0
return atomic.LoadInt32(&r.backoffCounter) > 0
}

func (r *Consumer) inBackoffBlock() bool {
func (r *Consumer) inBackoffTimeout() bool {
return atomic.LoadInt64(&r.backoffDuration) > 0
}

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
if r.inBackoff() || r.inBackoffBlock() {
if r.inBackoff() || r.inBackoffTimeout() {
return
}

Expand Down Expand Up @@ -932,7 +945,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
}

func (r *Consumer) redistributeRDY() {
if r.inBackoffBlock() {
if r.inBackoffTimeout() {
return
}

Expand Down
5 changes: 5 additions & 0 deletions delegates.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type ConnDelegate interface {
// OnBackoff is called when the connection triggers a backoff state
OnBackoff(*Conn)

// OnContinue is called when the connection finishes a message without adjusting backoff state
OnContinue(*Conn)

// OnResume is called when the connection triggers a resume state
OnResume(*Conn)

Expand Down Expand Up @@ -109,6 +112,7 @@ func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) { d.r.onCo
func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) }
func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) }
func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) }
func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) }
func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) }
Expand All @@ -126,6 +130,7 @@ func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageFinished(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageRequeued(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnBackoff(c *Conn) {}
func (d *producerConnDelegate) OnContinue(c *Conn) {}
func (d *producerConnDelegate) OnResume(c *Conn) {}
func (d *producerConnDelegate) OnIOError(c *Conn, err error) { d.w.onConnIOError(c, err) }
func (d *producerConnDelegate) OnHeartbeat(c *Conn) { d.w.onConnHeartbeat(c) }
Expand Down
Loading

0 comments on commit da20c0d

Please sign in to comment.