Skip to content

Commit

Permalink
consumer: send RDY before FIN/REQ
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Sep 12, 2014
1 parent 00dbee1 commit aa59e83
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 137 deletions.
14 changes: 7 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,13 +537,6 @@ func (c *Conn) writeLoop() {
// Decrement this here so it is correct even if we can't respond to nsqd
msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}

if resp.success {
c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
c.delegate.OnMessageFinished(c, resp.msg)
Expand All @@ -558,6 +551,13 @@ func (c *Conn) writeLoop() {
}
}

err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}

if msgsInFlight == 0 &&
atomic.LoadInt32(&c.closeFlag) == 1 {
c.close()
Expand Down
242 changes: 116 additions & 126 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ type Consumer struct {
channel string
config Config

backoffChan chan bool
rdyChan chan *Conn
rng *rand.Rand

needRDYRedistributed int32
backoffCounter int32

backoffMtx sync.RWMutex
backoffCounter int32

incomingMessages chan *Message

Expand Down Expand Up @@ -145,8 +147,8 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error
connections: make(map[string]*Conn),

lookupdRecheckChan: make(chan int, 1),
backoffChan: make(chan bool),
rdyChan: make(chan *Conn, 1),

rng: rand.New(rand.NewSource(time.Now().UnixNano())),

StopChan: make(chan int),
exitChan: make(chan int),
Expand Down Expand Up @@ -219,7 +221,7 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
atomic.StoreInt32(&r.maxInFlight, int32(maxInFlight))

for _, c := range r.conns() {
r.rdyChan <- c
r.maybeUpdateRDY(c)
}
}

Expand Down Expand Up @@ -296,10 +298,9 @@ func validatedLookupAddr(addr string) error {

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
// add some jitter so that multiple consumers discovering the same topic,
// when restarted at the same time, dont all connect at once.
jitter := time.Duration(int64(rng.Float64() *
jitter := time.Duration(int64(r.rng.Float64() *
r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
ticker := time.NewTicker(r.config.LookupdPollInterval)

Expand Down Expand Up @@ -478,7 +479,7 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.rdyChan <- c
r.maybeUpdateRDY(c)
}

return nil
Expand All @@ -488,7 +489,7 @@ func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
r.rdyChan <- c
r.maybeUpdateRDY(c)
}

func (r *Consumer) onConnMessageFinished(c *Conn, msg *Message) {
Expand All @@ -500,11 +501,85 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) {
}

func (r *Consumer) onConnBackoff(c *Conn) {
r.backoffChan <- false
r.startStopContinueBackoff(c, false)
}

func (r *Consumer) onConnResume(c *Conn) {
r.backoffChan <- true
r.startStopContinueBackoff(c, true)
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
if r.inBackoffBlock() {
return
}

// update backoff state
r.backoffMtx.Lock()
backoffUpdated := false
if success {
if r.backoffCounter > 0 {
r.backoffCounter--
backoffUpdated = true
}
} else {
maxBackoffCount := int32(math.Max(1, math.Ceil(
math.Log2(r.config.MaxBackoffDuration.Seconds()))))
if r.backoffCounter < maxBackoffCount {
r.backoffCounter++
backoffUpdated = true
}
}
r.backoffMtx.Unlock()

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.backoffDurationForCount(r.backoffCounter)
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
time.AfterFunc(backoffDuration, func() {
var choice *Conn

atomic.StoreInt64(&r.backoffDuration, 0)

// pick a random connection to test the waters
var i int
conns := r.conns()
if len(conns) == 0 {
return
}
idx := r.rng.Intn(len(conns))
for _, c := range conns {
if i == idx {
choice = c
break
}
i++
}

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
})

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), r.backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
Expand All @@ -518,11 +593,9 @@ func (r *Consumer) onConnResponse(c *Conn, data []byte) {
}
}

func (r *Consumer) onConnError(c *Conn, data []byte) {
}
func (r *Consumer) onConnError(c *Conn, data []byte) {}

func (r *Consumer) onConnHeartbeat(c *Conn) {
}
func (r *Consumer) onConnHeartbeat(c *Conn) {}

func (r *Consumer) onConnIOError(c *Conn, err error) {
c.Close()
Expand Down Expand Up @@ -606,133 +679,50 @@ func (r *Consumer) backoffDurationForCount(count int32) time.Duration {
}

func (r *Consumer) inBackoff() bool {
return atomic.LoadInt32(&r.backoffCounter) > 0
r.backoffMtx.RLock()
backoffCounter := r.backoffCounter
r.backoffMtx.RUnlock()
return backoffCounter > 0
}

func (r *Consumer) inBackoffBlock() bool {
return atomic.LoadInt64(&r.backoffDuration) > 0
}

func (r *Consumer) rdyLoop() {
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
var backoffTimer *time.Timer
var backoffTimerChan <-chan time.Time
var backoffCounter int32
func (r *Consumer) maybeUpdateRDY(conn *Conn) {
if r.inBackoff() || r.inBackoffBlock() {
return
}

remain := conn.RDY()
lastRdyCount := conn.LastRDY()
count := r.perConnMaxInFlight()

// refill when at 1, or at 25%, or if connections have changed and we're imbalanced
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
conn, count, remain, lastRdyCount)
r.updateRDY(conn, count)
} else {
r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
conn, count, remain, lastRdyCount)
}
}

func (r *Consumer) rdyLoop() {
redistributeTicker := time.NewTicker(5 * time.Second)

for {
select {
case <-backoffTimerChan:
var choice *Conn

backoffTimer = nil
backoffTimerChan = nil
atomic.StoreInt64(&r.backoffDuration, 0)

// pick a random connection to test the waters
var i int
conns := r.conns()
if len(conns) == 0 {
continue
}
idx := rng.Intn(len(conns))
for _, c := range conns {
if i == idx {
choice = c
break
}
i++
}

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
case c := <-r.rdyChan:
if backoffTimer != nil || backoffCounter > 0 {
continue
}

// send ready immediately
remain := c.RDY()
lastRdyCount := c.LastRDY()
count := r.perConnMaxInFlight()
// refill when at 1, or at 25%, or if connections have changed and we have too many RDY
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
c.String(), count, remain, lastRdyCount)
r.updateRDY(c, count)
} else {
r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
c.String(), count, remain, lastRdyCount)
}
case success := <-r.backoffChan:
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
if backoffTimer != nil {
continue
}

// update backoff state
backoffUpdated := false
if success {
if backoffCounter > 0 {
backoffCounter--
backoffUpdated = true
}
} else {
maxBackoffCount := int32(math.Max(1, math.Ceil(
math.Log2(r.config.MaxBackoffDuration.Seconds()))))
if backoffCounter < maxBackoffCount {
backoffCounter++
backoffUpdated = true
}
}

if backoffUpdated {
atomic.StoreInt32(&r.backoffCounter, backoffCounter)
}

// exit backoff
if backoffCounter == 0 && backoffUpdated {
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
continue
}

// start or continue backoff
if backoffCounter > 0 {
backoffDuration := r.backoffDurationForCount(backoffCounter)
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
backoffTimer = time.NewTimer(backoffDuration)
backoffTimerChan = backoffTimer.C

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
case <-redistributeTicker.C:
r.redistributeRDY(rng)
r.redistributeRDY()
case <-r.exitChan:
goto exit
}
}

exit:
redistributeTicker.Stop()
if backoffTimer != nil {
backoffTimer.Stop()
}
r.log(LogLevelInfo, "rdyLoop exiting")
r.wg.Done()
}
Expand Down Expand Up @@ -796,7 +786,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
return nil
}

func (r *Consumer) redistributeRDY(rng *rand.Rand) {
func (r *Consumer) redistributeRDY() {
if r.inBackoffBlock() {
return
}
Expand Down Expand Up @@ -839,7 +829,7 @@ func (r *Consumer) redistributeRDY(rng *rand.Rand) {

for len(possibleConns) > 0 && availableMaxInFlight > 0 {
availableMaxInFlight--
i := rng.Int() % len(possibleConns)
i := r.rng.Int() % len(possibleConns)
c := possibleConns[i]
// delete
possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
Expand Down
8 changes: 4 additions & 4 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,17 @@ func TestConsumerBackoff(t *testing.T) {
fmt.Sprintf("FIN %s", msgIDGood),
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 5",
fmt.Sprintf("REQ %s 0", msgIDBad),
"RDY 0",
"RDY 1",
fmt.Sprintf("REQ %s 0", msgIDBad),
"RDY 0",
"RDY 1",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDBad),
"RDY 1",
"RDY 0",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 1",
"RDY 5",
fmt.Sprintf("FIN %s", msgIDGood),
}
if len(n.got) != len(expected) {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
Expand Down

0 comments on commit aa59e83

Please sign in to comment.