Consumer stall after RequeueWithoutBackoff #128

Merged 2 commits on Mar 26, 2015
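For context, the stall being fixed arises while a consumer is backed off: it sends RDY 1 to a single connection to test the waters, and if that probe message is answered with `RequeueWithoutBackoff`, the pre-fix write loop signals neither `OnBackoff` nor `OnResume`, so no new backoff timer is armed and the consumer stays at RDY 0. The sketch below is a hypothetical reproduction, not part of this PR; the topic, channel, nsqd address, and import path are placeholders, and it assumes the public go-nsq consumer API. With this change applied, the same handler keeps the backoff loop cycling via the new `OnContinue` signal instead of stalling.

```go
package main

import (
	"errors"
	"log"
	"sync/atomic"
	"time"

	nsq "github.com/nsqio/go-nsq" // import path is an assumption; at the time of this PR it was github.com/bitly/go-nsq
)

// flakyHandler fails its first message so the consumer enters backoff, then
// answers every subsequent delivery (including the RDY 1 backoff probes)
// with a requeue that does not touch backoff state.
type flakyHandler struct {
	attempts int32
}

func (h *flakyHandler) HandleMessage(m *nsq.Message) error {
	if atomic.AddInt32(&h.attempts, 1) == 1 {
		return errors.New("simulated failure so the consumer backs off")
	}
	m.DisableAutoResponse()              // respond to nsqd ourselves
	m.RequeueWithoutBackoff(time.Second) // REQ, but leave backoff state alone
	return nil
}

func main() {
	cfg := nsq.NewConfig()
	cfg.MaxInFlight = 1 // a tiny RDY pool makes the behavior easy to observe

	// topic, channel, and nsqd address are placeholders
	consumer, err := nsq.NewConsumer("stall_demo", "ch", cfg)
	if err != nil {
		log.Fatal(err)
	}
	consumer.AddHandler(&flakyHandler{})
	if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}
	<-consumer.StopChan
}
```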
19 changes: 10 additions & 9 deletions .travis.yml
@@ -1,27 +1,28 @@
language: go
go:
- 1.2.2
- 1.3.1
- 1.4.2
env:
- NSQ_DOWNLOAD=nsq-0.2.24.linux-amd64.go1.2 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.24.linux-amd64.go1.2 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.27.linux-amd64.go1.2 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.27.linux-amd64.go1.2 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.28.linux-amd64.go1.2.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.28.linux-amd64.go1.2.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.30.linux-amd64.go1.3 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.30.linux-amd64.go1.3 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.2.31.linux-amd64.go1.3.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.2.31.linux-amd64.go1.3.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.0.linux-amd64.go1.3.3 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.0.linux-amd64.go1.3.3 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.1.linux-amd64.go1.4.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.1.linux-amd64.go1.4.1 GOARCH=386
- NSQ_DOWNLOAD=nsq-0.3.2.linux-amd64.go1.4.1 GOARCH=amd64
- NSQ_DOWNLOAD=nsq-0.3.2.linux-amd64.go1.4.1 GOARCH=386
install:
- go get github.com/bitly/go-simplejson
- go get github.com/mreiferson/go-snappystream
script:
- wget http://bitly-downloads.s3.amazonaws.com/nsq/$NSQ_DOWNLOAD.tar.gz
- tar zxvf $NSQ_DOWNLOAD.tar.gz
- sudo cp $NSQ_DOWNLOAD/bin/nsqd $NSQ_DOWNLOAD/bin/nsqlookupd /usr/local/bin
- export PATH=$NSQ_DOWNLOAD/bin:$PATH
- pushd $TRAVIS_BUILD_DIR
- ./test.sh
- popd
notifications:
email: false

sudo: false
10 changes: 5 additions & 5 deletions conn.go
@@ -563,14 +563,14 @@ func (c *Conn) writeLoop() {
if resp.success {
c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
c.delegate.OnMessageFinished(c, resp.msg)
if resp.backoff {
c.delegate.OnResume(c)
}
c.delegate.OnResume(c)
} else {
c.log(LogLevelDebug, "REQ %s", resp.msg.ID)
c.delegate.OnMessageRequeued(c, resp.msg)
if resp.backoff {
c.delegate.OnBackoff(c)
} else {
c.delegate.OnContinue(c)
}
}

@@ -683,7 +683,7 @@ func (c *Conn) waitForCleanup() {
}

func (c *Conn) onMessageFinish(m *Message) {
c.msgResponseChan <- &msgResponse{m, Finish(m.ID), true, true}
c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true}
}

func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
@@ -695,7 +695,7 @@ func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) {
delay = c.config.MaxRequeueDelay
}
}
c.msgResponseChan <- &msgResponse{m, Requeue(m.ID, delay), false, backoff}
c.msgResponseChan <- &msgResponse{msg: m, cmd: Requeue(m.ID, delay), success: false, backoff: backoff}
}

func (c *Conn) onMessageTouch(m *Message) {
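In short, conn.go now signals the consumer on every message response: a FIN always triggers `OnResume`, a backoff requeue triggers `OnBackoff`, and a requeue issued without backoff triggers the new `OnContinue`. The sketch below is a reading aid for that dispatch, using stand-in types rather than the library's actual definitions; the `msgResponse` field names are taken from the literals above, the types are assumptions.

```go
package sketch

// Stand-ins for the go-nsq types referenced in the hunks above; this is a
// reading aid, not library source.
type Message struct{ ID [16]byte }
type Command struct{}

// msgResponse mirrors the named-field literals in onMessageFinish and
// onMessageRequeue: success distinguishes FIN from REQ, and backoff is only
// meaningful for REQ.
type msgResponse struct {
	msg     *Message
	cmd     *Command
	success bool
	backoff bool
}

// connDelegate covers just the callbacks this PR touches.
type connDelegate interface {
	OnResume()
	OnBackoff()
	OnContinue()
}

// dispatch mirrors the post-fix writeLoop: every FIN resumes, a backoff
// requeue backs off, and a RequeueWithoutBackoff now continues (previously
// it signaled nothing, which is how the consumer could stall).
func dispatch(resp *msgResponse, d connDelegate) {
	switch {
	case resp.success:
		d.OnResume()
	case resp.backoff:
		d.OnBackoff()
	default:
		d.OnContinue()
	}
}
```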
201 changes: 107 additions & 94 deletions consumer.go
@@ -66,6 +66,14 @@ type ConsumerStats struct {

var instCount int64

type backoffSignal int

const (
backoffFlag backoffSignal = iota
continueFlag
resumeFlag
)

// Consumer is a high-level type to consume from NSQ.
//
// A Consumer instance is supplied a Handler that will be executed
@@ -82,6 +90,7 @@ type Consumer struct {
messagesRequeued uint64
totalRdyCount int64
backoffDuration int64
backoffCounter int32
maxInFlight int32

mtx sync.RWMutex
@@ -101,8 +110,7 @@ type Consumer struct {

needRDYRedistributed int32

backoffMtx sync.RWMutex
backoffCounter int32
backoffMtx sync.RWMutex

incomingMessages chan *Message

@@ -634,95 +642,15 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) {
}

func (r *Consumer) onConnBackoff(c *Conn) {
r.startStopContinueBackoff(c, false)
r.startStopContinueBackoff(c, backoffFlag)
}

func (r *Consumer) onConnResume(c *Conn) {
r.startStopContinueBackoff(c, true)
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
if r.inBackoffBlock() {
return
}

// update backoff state
r.backoffMtx.Lock()
backoffUpdated := false
if success {
if r.backoffCounter > 0 {
r.backoffCounter--
backoffUpdated = true
}
} else {
maxBackoffCount := int32(math.Max(1, math.Ceil(
math.Log2(r.config.MaxBackoffDuration.Seconds()))))
if r.backoffCounter < maxBackoffCount {
r.backoffCounter++
backoffUpdated = true
}
}
r.backoffMtx.Unlock()

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.config.BackoffStrategy.Calculate(int(r.backoffCounter))
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), r.backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
func (r *Consumer) onConnContinue(c *Conn) {
r.startStopContinueBackoff(c, continueFlag)
}

func (r *Consumer) backoff() {

if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
backoffDuration := 1 * time.Second
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, r.backoff)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
func (r *Consumer) onConnResume(c *Conn) {
r.startStopContinueBackoff(c, resumeFlag)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
@@ -823,19 +751,104 @@ func (r *Consumer) onConnClose(c *Conn) {
}
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
r.backoffMtx.Lock()
if r.inBackoffTimeout() {
r.backoffMtx.Unlock()
return
}
defer r.backoffMtx.Unlock()

// update backoff state
backoffUpdated := false
backoffCounter := atomic.LoadInt32(&r.backoffCounter)
switch signal {
case resumeFlag:
if backoffCounter > 0 {
backoffCounter--
backoffUpdated = true
}
case backoffFlag:
nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1)
if nextBackoff <= r.config.MaxBackoffDuration {
backoffCounter++
backoffUpdated = true
}
}
atomic.StoreInt32(&r.backoffCounter, backoffCounter)

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter))

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}

r.backoff(backoffDuration)
}
}

func (r *Consumer) backoff(d time.Duration) {
atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds())
time.AfterFunc(d, r.resume)
}

func (r *Consumer) resume() {
if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
r.backoff(time.Second)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())

// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
r.backoff(time.Second)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
}

func (r *Consumer) inBackoff() bool {
r.backoffMtx.RLock()
backoffCounter := r.backoffCounter
r.backoffMtx.RUnlock()
return backoffCounter > 0
return atomic.LoadInt32(&r.backoffCounter) > 0
}

func (r *Consumer) inBackoffBlock() bool {
func (r *Consumer) inBackoffTimeout() bool {
return atomic.LoadInt64(&r.backoffDuration) > 0
}

func (r *Consumer) maybeUpdateRDY(conn *Conn) {
if r.inBackoff() || r.inBackoffBlock() {
if r.inBackoff() || r.inBackoffTimeout() {
return
}

@@ -932,7 +945,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
}

func (r *Consumer) redistributeRDY() {
if r.inBackoffBlock() {
if r.inBackoffTimeout() {
return
}

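The consumer side can be summarized as a small state machine over `backoffCounter`: a resume signal decrements it, a backoff signal increments it as long as the strategy's next duration still fits under `MaxBackoffDuration` (replacing the old log2-based cap), and a continue signal leaves it untouched so an in-progress backoff keeps rescheduling probes instead of stalling. A simplified sketch, not the library code, with `calculate` and `max` standing in for `config.BackoffStrategy.Calculate` and `config.MaxBackoffDuration`:

```go
package sketch

import "time"

type backoffSignal int

const (
	backoffFlag backoffSignal = iota
	continueFlag
	resumeFlag
)

// backoffState condenses the counter handling in the new
// startStopContinueBackoff; it ignores the per-connection RDY bookkeeping
// and the "ignore signals while a backoff timer is pending" guard.
type backoffState struct {
	counter   int
	calculate func(level int) time.Duration // stand-in for BackoffStrategy.Calculate
	max       time.Duration                 // stand-in for MaxBackoffDuration
}

// apply returns 0 when the consumer should run at its normal RDY level, or
// the duration to wait (at RDY 0) before sending the next RDY 1 probe.
func (s *backoffState) apply(sig backoffSignal) time.Duration {
	switch sig {
	case resumeFlag:
		if s.counter > 0 {
			s.counter--
		}
	case backoffFlag:
		// deepen backoff only while the next duration stays under the cap
		if s.calculate(s.counter+1) <= s.max {
			s.counter++
		}
	case continueFlag:
		// leave the counter alone; if already backed off this still falls
		// through to reschedule a probe, which is the fix for the stall
	}
	if s.counter == 0 {
		return 0
	}
	return s.calculate(s.counter)
}
```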
5 changes: 5 additions & 0 deletions delegates.go
@@ -81,6 +81,9 @@ type ConnDelegate interface {
// OnBackoff is called when the connection triggers a backoff state
OnBackoff(*Conn)

// OnContinue is called when the connection finishes a message without adjusting backoff state
OnContinue(*Conn)

// OnResume is called when the connection triggers a resume state
OnResume(*Conn)

@@ -109,6 +112,7 @@ func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) {
func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) }
func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) }
func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) }
func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) }
func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) }
@@ -126,6 +130,7 @@ func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageFinished(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnMessageRequeued(c *Conn, m *Message) {}
func (d *producerConnDelegate) OnBackoff(c *Conn) {}
func (d *producerConnDelegate) OnContinue(c *Conn) {}
func (d *producerConnDelegate) OnResume(c *Conn) {}
func (d *producerConnDelegate) OnIOError(c *Conn, err error) { d.w.onConnIOError(c, err) }
func (d *producerConnDelegate) OnHeartbeat(c *Conn) { d.w.onConnHeartbeat(c) }