From 68bc37811b17d5855e34d47c374b6f60418915dc Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Tue, 24 Mar 2015 16:53:31 -0400 Subject: [PATCH] better handle RequeueWithoutBackoff after in backoff * expand locking to cover updating backoffDuration * use atomic ops for backoffCounter * set RDY 0 before setting backoff timer (for cases when timer is 0) * clarify/consolidated backoff/resume functions --- conn.go | 10 +-- consumer.go | 201 +++++++++++++++++++++++++++------------------------ delegates.go | 5 ++ mock_test.go | 108 +++++++++++++++++++++++---- test.sh | 2 +- 5 files changed, 213 insertions(+), 113 deletions(-) diff --git a/conn.go b/conn.go index 74b6774a..3d7ea7f1 100644 --- a/conn.go +++ b/conn.go @@ -563,14 +563,14 @@ func (c *Conn) writeLoop() { if resp.success { c.log(LogLevelDebug, "FIN %s", resp.msg.ID) c.delegate.OnMessageFinished(c, resp.msg) - if resp.backoff { - c.delegate.OnResume(c) - } + c.delegate.OnResume(c) } else { c.log(LogLevelDebug, "REQ %s", resp.msg.ID) c.delegate.OnMessageRequeued(c, resp.msg) if resp.backoff { c.delegate.OnBackoff(c) + } else { + c.delegate.OnContinue(c) } } @@ -683,7 +683,7 @@ func (c *Conn) waitForCleanup() { } func (c *Conn) onMessageFinish(m *Message) { - c.msgResponseChan <- &msgResponse{m, Finish(m.ID), true, true} + c.msgResponseChan <- &msgResponse{msg: m, cmd: Finish(m.ID), success: true} } func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) { @@ -695,7 +695,7 @@ func (c *Conn) onMessageRequeue(m *Message, delay time.Duration, backoff bool) { delay = c.config.MaxRequeueDelay } } - c.msgResponseChan <- &msgResponse{m, Requeue(m.ID, delay), false, backoff} + c.msgResponseChan <- &msgResponse{msg: m, cmd: Requeue(m.ID, delay), success: false, backoff: backoff} } func (c *Conn) onMessageTouch(m *Message) { diff --git a/consumer.go b/consumer.go index 9e750aed..4b0f3432 100644 --- a/consumer.go +++ b/consumer.go @@ -66,6 +66,14 @@ type ConsumerStats struct { var instCount int64 +type backoffSignal int + +const ( + backoffFlag backoffSignal = iota + continueFlag + resumeFlag +) + // Consumer is a high-level type to consume from NSQ. // // A Consumer instance is supplied a Handler that will be executed @@ -82,6 +90,7 @@ type Consumer struct { messagesRequeued uint64 totalRdyCount int64 backoffDuration int64 + backoffCounter int32 maxInFlight int32 mtx sync.RWMutex @@ -101,8 +110,7 @@ type Consumer struct { needRDYRedistributed int32 - backoffMtx sync.RWMutex - backoffCounter int32 + backoffMtx sync.RWMutex incomingMessages chan *Message @@ -634,95 +642,15 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) { } func (r *Consumer) onConnBackoff(c *Conn) { - r.startStopContinueBackoff(c, false) + r.startStopContinueBackoff(c, backoffFlag) } -func (r *Consumer) onConnResume(c *Conn) { - r.startStopContinueBackoff(c, true) -} - -func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { - // prevent many async failures/successes from immediately resulting in - // max backoff/normal rate (by ensuring that we dont continually incr/decr - // the counter during a backoff period) - if r.inBackoffBlock() { - return - } - - // update backoff state - r.backoffMtx.Lock() - backoffUpdated := false - if success { - if r.backoffCounter > 0 { - r.backoffCounter-- - backoffUpdated = true - } - } else { - maxBackoffCount := int32(math.Max(1, math.Ceil( - math.Log2(r.config.MaxBackoffDuration.Seconds())))) - if r.backoffCounter < maxBackoffCount { - r.backoffCounter++ - backoffUpdated = true - } - } - r.backoffMtx.Unlock() - - if r.backoffCounter == 0 && backoffUpdated { - // exit backoff - count := r.perConnMaxInFlight() - r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count) - for _, c := range r.conns() { - r.updateRDY(c, count) - } - } else if r.backoffCounter > 0 { - // start or continue backoff - backoffDuration := r.config.BackoffStrategy.Calculate(int(r.backoffCounter)) - atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) - time.AfterFunc(backoffDuration, r.backoff) - - r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", - backoffDuration.Seconds(), r.backoffCounter) - - // send RDY 0 immediately (to *all* connections) - for _, c := range r.conns() { - r.updateRDY(c, 0) - } - } +func (r *Consumer) onConnContinue(c *Conn) { + r.startStopContinueBackoff(c, continueFlag) } -func (r *Consumer) backoff() { - - if atomic.LoadInt32(&r.stopFlag) == 1 { - atomic.StoreInt64(&r.backoffDuration, 0) - return - } - - // pick a random connection to test the waters - conns := r.conns() - if len(conns) == 0 { - // backoff again - backoffDuration := 1 * time.Second - atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) - time.AfterFunc(backoffDuration, r.backoff) - return - } - idx := r.rng.Intn(len(conns)) - choice := conns[idx] - - r.log(LogLevelWarning, - "(%s) backoff timeout expired, sending RDY 1", - choice.String()) - // while in backoff only ever let 1 message at a time through - err := r.updateRDY(choice, 1) - if err != nil { - r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err) - backoffDuration := 1 * time.Second - atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) - time.AfterFunc(backoffDuration, r.backoff) - return - } - - atomic.StoreInt64(&r.backoffDuration, 0) +func (r *Consumer) onConnResume(c *Conn) { + r.startStopContinueBackoff(c, resumeFlag) } func (r *Consumer) onConnResponse(c *Conn, data []byte) { @@ -823,19 +751,104 @@ func (r *Consumer) onConnClose(c *Conn) { } } +func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) { + // prevent many async failures/successes from immediately resulting in + // max backoff/normal rate (by ensuring that we dont continually incr/decr + // the counter during a backoff period) + r.backoffMtx.Lock() + if r.inBackoffTimeout() { + r.backoffMtx.Unlock() + return + } + defer r.backoffMtx.Unlock() + + // update backoff state + backoffUpdated := false + backoffCounter := atomic.LoadInt32(&r.backoffCounter) + switch signal { + case resumeFlag: + if backoffCounter > 0 { + backoffCounter-- + backoffUpdated = true + } + case backoffFlag: + nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1) + if nextBackoff <= r.config.MaxBackoffDuration { + backoffCounter++ + backoffUpdated = true + } + } + atomic.StoreInt32(&r.backoffCounter, backoffCounter) + + if r.backoffCounter == 0 && backoffUpdated { + // exit backoff + count := r.perConnMaxInFlight() + r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count) + for _, c := range r.conns() { + r.updateRDY(c, count) + } + } else if r.backoffCounter > 0 { + // start or continue backoff + backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter)) + + r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", + backoffDuration.Seconds(), backoffCounter) + + // send RDY 0 immediately (to *all* connections) + for _, c := range r.conns() { + r.updateRDY(c, 0) + } + + r.backoff(backoffDuration) + } +} + +func (r *Consumer) backoff(d time.Duration) { + atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds()) + time.AfterFunc(d, r.resume) +} + +func (r *Consumer) resume() { + if atomic.LoadInt32(&r.stopFlag) == 1 { + atomic.StoreInt64(&r.backoffDuration, 0) + return + } + + // pick a random connection to test the waters + conns := r.conns() + if len(conns) == 0 { + // backoff again + r.backoff(time.Second) + return + } + idx := r.rng.Intn(len(conns)) + choice := conns[idx] + + r.log(LogLevelWarning, + "(%s) backoff timeout expired, sending RDY 1", + choice.String()) + + // while in backoff only ever let 1 message at a time through + err := r.updateRDY(choice, 1) + if err != nil { + r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err) + r.backoff(time.Second) + return + } + + atomic.StoreInt64(&r.backoffDuration, 0) +} + func (r *Consumer) inBackoff() bool { - r.backoffMtx.RLock() - backoffCounter := r.backoffCounter - r.backoffMtx.RUnlock() - return backoffCounter > 0 + return atomic.LoadInt32(&r.backoffCounter) > 0 } -func (r *Consumer) inBackoffBlock() bool { +func (r *Consumer) inBackoffTimeout() bool { return atomic.LoadInt64(&r.backoffDuration) > 0 } func (r *Consumer) maybeUpdateRDY(conn *Conn) { - if r.inBackoff() || r.inBackoffBlock() { + if r.inBackoff() || r.inBackoffTimeout() { return } @@ -932,7 +945,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error { } func (r *Consumer) redistributeRDY() { - if r.inBackoffBlock() { + if r.inBackoffTimeout() { return } diff --git a/delegates.go b/delegates.go index 6bfd94f0..11225781 100644 --- a/delegates.go +++ b/delegates.go @@ -81,6 +81,9 @@ type ConnDelegate interface { // OnBackoff is called when the connection triggers a backoff state OnBackoff(*Conn) + // OnContinue is called when the connection finishes a message without adjusting backoff state + OnContinue(*Conn) + // OnResume is called when the connection triggers a resume state OnResume(*Conn) @@ -109,6 +112,7 @@ func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message) { d.r.onCo func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) } func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) } func (d *consumerConnDelegate) OnBackoff(c *Conn) { d.r.onConnBackoff(c) } +func (d *consumerConnDelegate) OnContinue(c *Conn) { d.r.onConnContinue(c) } func (d *consumerConnDelegate) OnResume(c *Conn) { d.r.onConnResume(c) } func (d *consumerConnDelegate) OnIOError(c *Conn, err error) { d.r.onConnIOError(c, err) } func (d *consumerConnDelegate) OnHeartbeat(c *Conn) { d.r.onConnHeartbeat(c) } @@ -126,6 +130,7 @@ func (d *producerConnDelegate) OnMessage(c *Conn, m *Message) {} func (d *producerConnDelegate) OnMessageFinished(c *Conn, m *Message) {} func (d *producerConnDelegate) OnMessageRequeued(c *Conn, m *Message) {} func (d *producerConnDelegate) OnBackoff(c *Conn) {} +func (d *producerConnDelegate) OnContinue(c *Conn) {} func (d *producerConnDelegate) OnResume(c *Conn) {} func (d *producerConnDelegate) OnIOError(c *Conn, err error) { d.w.onConnIOError(c, err) } func (d *producerConnDelegate) OnHeartbeat(c *Conn) { d.w.onConnHeartbeat(c) } diff --git a/mock_test.go b/mock_test.go index 63bf9bec..57814b69 100644 --- a/mock_test.go +++ b/mock_test.go @@ -174,39 +174,50 @@ func framedResponse(frameType int32, data []byte) []byte { type testHandler struct{} func (h *testHandler) HandleMessage(message *Message) error { + if bytes.Equal(message.Body, []byte("requeue")) { + message.Requeue(-1) + return nil + } + if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) { + if message.Attempts > 1 { + return nil + } + message.RequeueWithoutBackoff(-1) + return nil + } if bytes.Equal(message.Body, []byte("bad")) { return errors.New("bad") } return nil } +func frameMessage(m *Message) []byte { + var b bytes.Buffer + m.WriteTo(&b) + return b.Bytes() +} + func TestConsumerBackoff(t *testing.T) { logger := log.New(ioutil.Discard, "", log.LstdFlags) - var mgood bytes.Buffer msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgGood := NewMessage(msgIDGood, []byte("good")) - msgGood.WriteTo(&mgood) - msgBytesGood := mgood.Bytes() - var mbad bytes.Buffer msgIDBad := MessageID{'z', 'x', 'c', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} msgBad := NewMessage(msgIDBad, []byte("bad")) - msgBad.WriteTo(&mbad) - msgBytesBad := mbad.Bytes() script := []instruction{ // SUB instruction{0, FrameTypeResponse, []byte("OK")}, // IDENTIFY instruction{0, FrameTypeResponse, []byte("OK")}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesGood}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesGood}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesGood}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesBad}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesBad}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesGood}, - instruction{20 * time.Millisecond, FrameTypeMessage, msgBytesGood}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgBad)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgBad)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, // needed to exit test instruction{200 * time.Millisecond, -1, []byte("exit")}, } @@ -259,3 +270,74 @@ func TestConsumerBackoff(t *testing.T) { } } } + +func TestConsumerRequeueNoBackoff(t *testing.T) { + // logger := log.New(ioutil.Discard, "", log.LstdFlags) + + msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'} + + msgGood := NewMessage(msgIDGood, []byte("good")) + msgRequeue := NewMessage(msgIDRequeue, []byte("requeue")) + msgRequeueNoBackoff := NewMessage(msgIDRequeueNoBackoff, []byte("requeue_no_backoff_1")) + + script := []instruction{ + // SUB + instruction{0, FrameTypeResponse, []byte("OK")}, + // IDENTIFY + instruction{0, FrameTypeResponse, []byte("OK")}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeue)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgRequeueNoBackoff)}, + instruction{20 * time.Millisecond, FrameTypeMessage, frameMessage(msgGood)}, + // needed to exit test + instruction{100 * time.Millisecond, -1, []byte("exit")}, + } + n := newMockNSQD(script) + + topicName := "test_requeue" + strconv.Itoa(int(time.Now().Unix())) + config := NewConfig() + config.MaxInFlight = 1 + config.BackoffMultiplier = 10 * time.Millisecond + q, _ := NewConsumer(topicName, "ch", config) + // q.SetLogger(logger, LogLevelDebug) + q.AddHandler(&testHandler{}) + err := q.ConnectToNSQD(n.tcpAddr.String()) + if err != nil { + t.Fatalf(err.Error()) + } + + select { + case <-n.exitChan: + log.Printf("clean exit") + case <-time.After(500 * time.Millisecond): + log.Printf("timeout") + } + + for i, r := range n.got { + log.Printf("%d: %s", i, r) + } + + expected := []string{ + "IDENTIFY", + "SUB " + topicName + " ch", + "RDY 1", + "RDY 1", + "RDY 0", + fmt.Sprintf("REQ %s 0", msgIDRequeue), + "RDY 1", + "RDY 0", + fmt.Sprintf("REQ %s 0", msgIDRequeueNoBackoff), + "RDY 1", + "RDY 1", + fmt.Sprintf("FIN %s", msgIDGood), + } + if len(n.got) != len(expected) { + t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected)) + } + for i, r := range n.got { + if string(r) != expected[i] { + t.Fatalf("cmd %d bad %s != %s", i, r, expected[i]) + } + } +} diff --git a/test.sh b/test.sh index da161380..351135a9 100755 --- a/test.sh +++ b/test.sh @@ -40,4 +40,4 @@ cleanup() { } trap cleanup INT TERM EXIT -go test -v -timeout 15s +go test -v -timeout 60s