Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: send RDY before FIN/REQ #83

Merged
merged 1 commit into from
Sep 13, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,13 +537,6 @@ func (c *Conn) writeLoop() {
// Decrement this here so it is correct even if we can't respond to nsqd
msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)

err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}

if resp.success {
c.log(LogLevelDebug, "FIN %s", resp.msg.ID)
c.delegate.OnMessageFinished(c, resp.msg)
Expand All @@ -558,6 +551,13 @@ func (c *Conn) writeLoop() {
}
}

err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}

if msgsInFlight == 0 &&
atomic.LoadInt32(&c.closeFlag) == 1 {
c.close()
Expand Down
242 changes: 116 additions & 126 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ type Consumer struct {
channel string
config Config

backoffChan chan bool
rdyChan chan *Conn
rng *rand.Rand

needRDYRedistributed int32
backoffCounter int32

backoffMtx sync.RWMutex
backoffCounter int32

incomingMessages chan *Message

Expand Down Expand Up @@ -145,8 +147,8 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error
connections: make(map[string]*Conn),

lookupdRecheckChan: make(chan int, 1),
backoffChan: make(chan bool),
rdyChan: make(chan *Conn, 1),

rng: rand.New(rand.NewSource(time.Now().UnixNano())),

StopChan: make(chan int),
exitChan: make(chan int),
Expand Down Expand Up @@ -219,7 +221,7 @@ func (r *Consumer) ChangeMaxInFlight(maxInFlight int) {
atomic.StoreInt32(&r.maxInFlight, int32(maxInFlight))

for _, c := range r.conns() {
r.rdyChan <- c
r.maybeUpdateRDY(c)
}
}

Expand Down Expand Up @@ -296,10 +298,9 @@ func validatedLookupAddr(addr string) error {

// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
// add some jitter so that multiple consumers discovering the same topic,
// when restarted at the same time, dont all connect at once.
jitter := time.Duration(int64(rng.Float64() *
jitter := time.Duration(int64(r.rng.Float64() *
r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
ticker := time.NewTicker(r.config.LookupdPollInterval)

Expand Down Expand Up @@ -478,7 +479,7 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.rdyChan <- c
r.maybeUpdateRDY(c)
}

return nil
Expand All @@ -488,7 +489,7 @@ func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
r.rdyChan <- c
r.maybeUpdateRDY(c)
}

func (r *Consumer) onConnMessageFinished(c *Conn, msg *Message) {
Expand All @@ -500,11 +501,85 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) {
}

func (r *Consumer) onConnBackoff(c *Conn) {
r.backoffChan <- false
r.startStopContinueBackoff(c, false)
}

func (r *Consumer) onConnResume(c *Conn) {
r.backoffChan <- true
r.startStopContinueBackoff(c, true)
}

// startStopContinueBackoff adjusts the consumer's backoff counter in response
// to a signal from a connection and then applies the matching RDY policy to
// every connection.
//
// success == true steps the counter down by one (never below zero);
// success == false steps it up by one, capped at
// max(1, ceil(log2(MaxBackoffDuration in seconds))).
//
//   - If the counter was changed and is now zero, backoff is over and every
//     connection is returned to the per-connection max-in-flight RDY count.
//   - If the counter is above zero, a backoff block is started (or continued):
//     every connection is set to RDY 0 and a timer is armed which, on expiry,
//     sends RDY 1 to a single randomly chosen connection.
//   - Otherwise (success while not in backoff) nothing happens.
//
// Signals that arrive while a backoff block is already in progress are
// ignored. conn is the connection that produced the signal; it is not
// consulted by this function.
func (r *Consumer) startStopContinueBackoff(conn *Conn, success bool) {
	// prevent many async failures/successes from immediately resulting in
	// max backoff/normal rate (by ensuring that we don't continually incr/decr
	// the counter during a backoff period)
	if r.inBackoffBlock() {
		return
	}

	// update backoff state
	r.backoffMtx.Lock()
	backoffUpdated := false
	if success {
		if r.backoffCounter > 0 {
			r.backoffCounter--
			backoffUpdated = true
		}
	} else {
		maxBackoffCount := int32(math.Max(1, math.Ceil(
			math.Log2(r.config.MaxBackoffDuration.Seconds()))))
		if r.backoffCounter < maxBackoffCount {
			r.backoffCounter++
			backoffUpdated = true
		}
	}
	// Snapshot the counter while the lock is still held. Everything below
	// works from this local copy so that it neither races with concurrent
	// callers nor acts on a value another goroutine has changed since.
	backoffCounter := r.backoffCounter
	r.backoffMtx.Unlock()

	switch {
	case backoffCounter == 0 && backoffUpdated:
		// exit backoff
		count := r.perConnMaxInFlight()
		r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
		for _, c := range r.conns() {
			r.updateRDY(c, count)
		}

	case backoffCounter > 0:
		// start or continue backoff
		backoffDuration := r.backoffDurationForCount(backoffCounter)
		// a non-zero backoffDuration is what inBackoffBlock() reports on
		atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
		time.AfterFunc(backoffDuration, func() {
			var choice *Conn

			// the backoff block is over; new signals are accepted again
			atomic.StoreInt64(&r.backoffDuration, 0)

			// pick a random connection to test the waters
			var i int
			conns := r.conns()
			if len(conns) == 0 {
				return
			}
			// NOTE(review): r.rng is also used by other methods of Consumer
			// and *rand.Rand is not safe for concurrent use; this callback
			// runs on its own goroutine - confirm access is serialized.
			idx := r.rng.Intn(len(conns))
			for _, c := range conns {
				if i == idx {
					choice = c
					break
				}
				i++
			}

			r.log(LogLevelWarning,
				"(%s) backoff timeout expired, sending RDY 1",
				choice.String())
			// while in backoff only ever let 1 message at a time through
			r.updateRDY(choice, 1)
		})

		r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
			backoffDuration.Seconds(), backoffCounter)

		// send RDY 0 immediately (to *all* connections)
		for _, c := range r.conns() {
			r.updateRDY(c, 0)
		}
	}
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
Expand All @@ -518,11 +593,9 @@ func (r *Consumer) onConnResponse(c *Conn, data []byte) {
}
}

func (r *Consumer) onConnError(c *Conn, data []byte) {
}
func (r *Consumer) onConnError(c *Conn, data []byte) {}

func (r *Consumer) onConnHeartbeat(c *Conn) {
}
func (r *Consumer) onConnHeartbeat(c *Conn) {}

func (r *Consumer) onConnIOError(c *Conn, err error) {
c.Close()
Expand Down Expand Up @@ -606,133 +679,50 @@ func (r *Consumer) backoffDurationForCount(count int32) time.Duration {
}

func (r *Consumer) inBackoff() bool {
return atomic.LoadInt32(&r.backoffCounter) > 0
r.backoffMtx.RLock()
backoffCounter := r.backoffCounter
r.backoffMtx.RUnlock()
return backoffCounter > 0
}

// inBackoffBlock reports whether a backoff block is currently in progress,
// i.e. whether the atomically loaded backoffDuration is greater than zero.
func (r *Consumer) inBackoffBlock() bool {
	remaining := atomic.LoadInt64(&r.backoffDuration)
	if remaining > 0 {
		return true
	}
	return false
}

func (r *Consumer) rdyLoop() {
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
var backoffTimer *time.Timer
var backoffTimerChan <-chan time.Time
var backoffCounter int32
// maybeUpdateRDY sends a fresh RDY count to conn when its remaining RDY has
// run low or no longer matches the per-connection share. It does nothing
// while the consumer is in backoff or inside a backoff block.
func (r *Consumer) maybeUpdateRDY(conn *Conn) {
	// RDY counts are managed by the backoff logic while backing off
	if r.inBackoff() {
		return
	}
	if r.inBackoffBlock() {
		return
	}

	remain := conn.RDY()
	lastRdyCount := conn.LastRDY()
	count := r.perConnMaxInFlight()

	// refill when at 1, or at 25%, or if connections have changed and we're imbalanced
	nearlyDrained := remain <= 1
	belowQuarter := remain < (lastRdyCount / 4)
	overAllotted := count > 0 && count < remain

	if !(nearlyDrained || belowQuarter || overAllotted) {
		r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
			conn, count, remain, lastRdyCount)
		return
	}

	r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
		conn, count, remain, lastRdyCount)
	r.updateRDY(conn, count)
}

func (r *Consumer) rdyLoop() {
redistributeTicker := time.NewTicker(5 * time.Second)

for {
select {
case <-backoffTimerChan:
var choice *Conn

backoffTimer = nil
backoffTimerChan = nil
atomic.StoreInt64(&r.backoffDuration, 0)

// pick a random connection to test the waters
var i int
conns := r.conns()
if len(conns) == 0 {
continue
}
idx := rng.Intn(len(conns))
for _, c := range conns {
if i == idx {
choice = c
break
}
i++
}

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())
// while in backoff only ever let 1 message at a time through
r.updateRDY(choice, 1)
case c := <-r.rdyChan:
if backoffTimer != nil || backoffCounter > 0 {
continue
}

// send ready immediately
remain := c.RDY()
lastRdyCount := c.LastRDY()
count := r.perConnMaxInFlight()
// refill when at 1, or at 25%, or if connections have changed and we have too many RDY
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
c.String(), count, remain, lastRdyCount)
r.updateRDY(c, count)
} else {
r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)",
c.String(), count, remain, lastRdyCount)
}
case success := <-r.backoffChan:
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
if backoffTimer != nil {
continue
}

// update backoff state
backoffUpdated := false
if success {
if backoffCounter > 0 {
backoffCounter--
backoffUpdated = true
}
} else {
maxBackoffCount := int32(math.Max(1, math.Ceil(
math.Log2(r.config.MaxBackoffDuration.Seconds()))))
if backoffCounter < maxBackoffCount {
backoffCounter++
backoffUpdated = true
}
}

if backoffUpdated {
atomic.StoreInt32(&r.backoffCounter, backoffCounter)
}

// exit backoff
if backoffCounter == 0 && backoffUpdated {
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
continue
}

// start or continue backoff
if backoffCounter > 0 {
backoffDuration := r.backoffDurationForCount(backoffCounter)
atomic.StoreInt64(&r.backoffDuration, backoffDuration.Nanoseconds())
backoffTimer = time.NewTimer(backoffDuration)
backoffTimerChan = backoffTimer.C

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
case <-redistributeTicker.C:
r.redistributeRDY(rng)
r.redistributeRDY()
case <-r.exitChan:
goto exit
}
}

exit:
redistributeTicker.Stop()
if backoffTimer != nil {
backoffTimer.Stop()
}
r.log(LogLevelInfo, "rdyLoop exiting")
r.wg.Done()
}
Expand Down Expand Up @@ -796,7 +786,7 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
return nil
}

func (r *Consumer) redistributeRDY(rng *rand.Rand) {
func (r *Consumer) redistributeRDY() {
if r.inBackoffBlock() {
return
}
Expand Down Expand Up @@ -839,7 +829,7 @@ func (r *Consumer) redistributeRDY(rng *rand.Rand) {

for len(possibleConns) > 0 && availableMaxInFlight > 0 {
availableMaxInFlight--
i := rng.Int() % len(possibleConns)
i := r.rng.Int() % len(possibleConns)
c := possibleConns[i]
// delete
possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
Expand Down
8 changes: 4 additions & 4 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,17 @@ func TestConsumerBackoff(t *testing.T) {
fmt.Sprintf("FIN %s", msgIDGood),
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 5",
fmt.Sprintf("REQ %s 0", msgIDBad),
"RDY 0",
"RDY 1",
fmt.Sprintf("REQ %s 0", msgIDBad),
"RDY 0",
"RDY 1",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDBad),
"RDY 1",
"RDY 0",
fmt.Sprintf("FIN %s", msgIDGood),
"RDY 1",
"RDY 5",
fmt.Sprintf("FIN %s", msgIDGood),
}
if len(n.got) != len(expected) {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
Expand Down