From aa59e83883feb828e7c964a0342bf182cbc1f4ec Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Fri, 12 Sep 2014 15:49:25 -0400 Subject: [PATCH] consumer: send RDY before FIN/REQ --- conn.go | 14 +-- consumer.go | 242 ++++++++++++++++++++++++--------------------------- mock_test.go | 8 +- 3 files changed, 127 insertions(+), 137 deletions(-) diff --git a/conn.go b/conn.go index c443272e..13528119 100644 --- a/conn.go +++ b/conn.go @@ -537,13 +537,6 @@ func (c *Conn) writeLoop() { // Decrement this here so it is correct even if we can't respond to nsqd msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1) - err := c.WriteCommand(resp.cmd) - if err != nil { - c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err) - c.close() - continue - } - if resp.success { c.log(LogLevelDebug, "FIN %s", resp.msg.ID) c.delegate.OnMessageFinished(c, resp.msg) @@ -558,6 +551,13 @@ func (c *Conn) writeLoop() { } } + err := c.WriteCommand(resp.cmd) + if err != nil { + c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err) + c.close() + continue + } + if msgsInFlight == 0 && atomic.LoadInt32(&c.closeFlag) == 1 { c.close() diff --git a/consumer.go b/consumer.go index 11f0d309..735714f8 100644 --- a/consumer.go +++ b/consumer.go @@ -79,10 +79,12 @@ type Consumer struct { channel string config Config - backoffChan chan bool - rdyChan chan *Conn + rng *rand.Rand + needRDYRedistributed int32 - backoffCounter int32 + + backoffMtx sync.RWMutex + backoffCounter int32 incomingMessages chan *Message @@ -145,8 +147,8 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error connections: make(map[string]*Conn), lookupdRecheckChan: make(chan int, 1), - backoffChan: make(chan bool), - rdyChan: make(chan *Conn, 1), + + rng: rand.New(rand.NewSource(time.Now().UnixNano())), StopChan: make(chan int), exitChan: make(chan int), @@ -219,7 +221,7 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) { atomic.StoreInt32(&r.maxInFlight, int32(maxInFlight)) for _, c := range r.conns() { - r.rdyChan <- c + r.maybeUpdateRDY(c) } } @@ -296,10 +298,9 @@ func validatedLookupAddr(addr string) error { // poll all known lookup servers every LookupdPollInterval func (r *Consumer) lookupdLoop() { - var rng = rand.New(rand.NewSource(time.Now().UnixNano())) // add some jitter so that multiple consumers discovering the same topic, // when restarted at the same time, dont all connect at once. - jitter := time.Duration(int64(rng.Float64() * + jitter := time.Duration(int64(r.rng.Float64() * r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval))) ticker := time.NewTicker(r.config.LookupdPollInterval) @@ -478,7 +479,7 @@ func (r *Consumer) ConnectToNSQD(addr string) error { // pre-emptive signal to existing connections to lower their RDY count for _, c := range r.conns() { - r.rdyChan <- c + r.maybeUpdateRDY(c) } return nil @@ -488,7 +489,7 @@ func (r *Consumer) onConnMessage(c *Conn, msg *Message) { atomic.AddInt64(&r.totalRdyCount, -1) atomic.AddUint64(&r.messagesReceived, 1) r.incomingMessages <- msg - r.rdyChan <- c + r.maybeUpdateRDY(c) } func (r *Consumer) onConnMessageFinished(c *Conn, msg *Message) { @@ -500,11 +501,85 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) { } func (r *Consumer) onConnBackoff(c *Conn) { - r.backoffChan <- false + r.startStopContinueBackoff(c, false) } func (r *Consumer) onConnResume(c *Conn) { - r.backoffChan <- true + r.startStopContinueBackoff(c, true) +} + +func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) { + // prevent many async failures/successes from immediately resulting in + // max backoff/normal rate (by ensuring that we dont continually incr/decr + // the counter during a backoff period) + if r.inBackoffBlock() { + return + } + + // update backoff state + r.backoffMtx.Lock() + backoffUpdated := false + if success { + if r.backoffCounter > 0 { + r.backoffCounter-- + backoffUpdated = true + } + } else { + maxBackoffCount := int32(math.Max(1, math.Ceil( + math.Log2(r.config.MaxBackoffDuration.Seconds())))) + if r.backoffCounter < maxBackoffCount { + r.backoffCounter++ + backoffUpdated = true + } + } + r.backoffMtx.Unlock() + + if r.backoffCounter == 0 && backoffUpdated { + // exit backoff + count := r.perConnMaxInFlight() + r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count) + for _, c := range r.conns() { + r.updateRDY(c, count) + } + } else if r.backoffCounter > 0 { + // start or continue backoff + backoffDuration := r.backoffDurationForCount(r.backoffCounter) + atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) + time.AfterFunc(backoffDuration, func() { + var choice *Conn + + atomic.StoreInt64(&r.backoffDuration, 0) + + // pick a random connection to test the waters + var i int + conns := r.conns() + if len(conns) == 0 { + return + } + idx := r.rng.Intn(len(conns)) + for _, c := range conns { + if i == idx { + choice = c + break + } + i++ + } + + r.log(LogLevelWarning, + "(%s) backoff timeout expired, sending RDY 1", + choice.String()) + // while in backoff only ever let 1 message at a time through + r.updateRDY(choice, 1) + }) + + r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", + backoffDuration.Seconds(), r.backoffCounter) + + // send RDY 0 immediately (to *all* connections) + for _, c := range r.conns() { + r.updateRDY(c, 0) + } + } } func (r *Consumer) onConnResponse(c *Conn, data []byte) { @@ -518,11 +593,9 @@ func (r *Consumer) onConnResponse(c *Conn, data []byte) { } } -func (r *Consumer) onConnError(c *Conn, data []byte) { -} +func (r *Consumer) onConnError(c *Conn, data []byte) {} -func (r *Consumer) onConnHeartbeat(c *Conn) { -} +func (r *Consumer) onConnHeartbeat(c *Conn) {} func (r *Consumer) onConnIOError(c *Conn, err error) { c.Close() @@ -606,123 +679,43 @@ func (r *Consumer) backoffDurationForCount(count int32) time.Duration { } func (r *Consumer) inBackoff() bool { - return atomic.LoadInt32(&r.backoffCounter) > 0 + r.backoffMtx.RLock() + backoffCounter := r.backoffCounter + r.backoffMtx.RUnlock() + return backoffCounter > 0 } func (r *Consumer) inBackoffBlock() bool { return atomic.LoadInt64(&r.backoffDuration) > 0 } -func (r *Consumer) rdyLoop() { - var rng = rand.New(rand.NewSource(time.Now().UnixNano())) - var backoffTimer *time.Timer - var backoffTimerChan <-chan time.Time - var backoffCounter int32 +func (r *Consumer) maybeUpdateRDY(conn *Conn) { + if r.inBackoff() || r.inBackoffBlock() { + return + } + + remain := conn.RDY() + lastRdyCount := conn.LastRDY() + count := r.perConnMaxInFlight() + // refill when at 1, or at 25%, or if connections have changed and we're imbalanced + if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { + r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)", + conn, count, remain, lastRdyCount) + r.updateRDY(conn, count) + } else { + r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)", + conn, count, remain, lastRdyCount) + } +} + +func (r *Consumer) rdyLoop() { redistributeTicker := time.NewTicker(5 * time.Second) for { select { - case <-backoffTimerChan: - var choice *Conn - - backoffTimer = nil - backoffTimerChan = nil - atomic.StoreInt64(&r.backoffDuration, 0) - - // pick a random connection to test the waters - var i int - conns := r.conns() - if len(conns) == 0 { - continue - } - idx := rng.Intn(len(conns)) - for _, c := range conns { - if i == idx { - choice = c - break - } - i++ - } - - r.log(LogLevelWarning, - "(%s) backoff timeout expired, sending RDY 1", - choice.String()) - // while in backoff only ever let 1 message at a time through - r.updateRDY(choice, 1) - case c := <-r.rdyChan: - if backoffTimer != nil || backoffCounter > 0 { - continue - } - - // send ready immediately - remain := c.RDY() - lastRdyCount := c.LastRDY() - count := r.perConnMaxInFlight() - // refill when at 1, or at 25%, or if connections have changed and we have too many RDY - if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { - r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)", - c.String(), count, remain, lastRdyCount) - r.updateRDY(c, count) - } else { - r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)", - c.String(), count, remain, lastRdyCount) - } - case success := <-r.backoffChan: - // prevent many async failures/successes from immediately resulting in - // max backoff/normal rate (by ensuring that we dont continually incr/decr - // the counter during a backoff period) - if backoffTimer != nil { - continue - } - - // update backoff state - backoffUpdated := false - if success { - if backoffCounter > 0 { - backoffCounter-- - backoffUpdated = true - } - } else { - maxBackoffCount := int32(math.Max(1, math.Ceil( - math.Log2(r.config.MaxBackoffDuration.Seconds())))) - if backoffCounter < maxBackoffCount { - backoffCounter++ - backoffUpdated = true - } - } - - if backoffUpdated { - atomic.StoreInt32(&r.backoffCounter, backoffCounter) - } - - // exit backoff - if backoffCounter == 0 && backoffUpdated { - count := r.perConnMaxInFlight() - r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count) - for _, c := range r.conns() { - r.updateRDY(c, count) - } - continue - } - - // start or continue backoff - if backoffCounter > 0 { - backoffDuration := r.backoffDurationForCount(backoffCounter) - atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds()) - backoffTimer = time.NewTimer(backoffDuration) - backoffTimerChan = backoffTimer.C - - r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0", - backoffDuration.Seconds(), backoffCounter) - - // send RDY 0 immediately (to *all* connections) - for _, c := range r.conns() { - r.updateRDY(c, 0) - } - } case <-redistributeTicker.C: - r.redistributeRDY(rng) + r.redistributeRDY() case <-r.exitChan: goto exit } @@ -730,9 +723,6 @@ func (r *Consumer) rdyLoop() { exit: redistributeTicker.Stop() - if backoffTimer != nil { - backoffTimer.Stop() - } r.log(LogLevelInfo, "rdyLoop exiting") r.wg.Done() } @@ -796,7 +786,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error { return nil } -func (r *Consumer) redistributeRDY(rng *rand.Rand) { +func (r *Consumer) redistributeRDY() { if r.inBackoffBlock() { return } @@ -839,7 +829,7 @@ func (r *Consumer) redistributeRDY(rng *rand.Rand) { for len(possibleConns) > 0 && availableMaxInFlight > 0 { availableMaxInFlight-- - i := rng.Int() % len(possibleConns) + i := r.rng.Int() % len(possibleConns) c := possibleConns[i] // delete possibleConns = append(possibleConns[:i], possibleConns[i+1:]...) diff --git a/mock_test.go b/mock_test.go index 8262bf55..9f62a005 100644 --- a/mock_test.go +++ b/mock_test.go @@ -242,17 +242,17 @@ func TestConsumerBackoff(t *testing.T) { fmt.Sprintf("FIN %s", msgIDGood), fmt.Sprintf("FIN %s", msgIDGood), "RDY 5", - fmt.Sprintf("REQ %s 0", msgIDBad), "RDY 0", - "RDY 1", fmt.Sprintf("REQ %s 0", msgIDBad), - "RDY 0", "RDY 1", - fmt.Sprintf("FIN %s", msgIDGood), "RDY 0", + fmt.Sprintf("REQ %s 0", msgIDBad), "RDY 1", + "RDY 0", fmt.Sprintf("FIN %s", msgIDGood), + "RDY 1", "RDY 5", + fmt.Sprintf("FIN %s", msgIDGood), } if len(n.got) != len(expected) { t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))