Skip to content

Commit

Permalink
re-org/go fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Mar 25, 2015
1 parent 4796be8 commit 31cb33f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 99 deletions.
188 changes: 94 additions & 94 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ type ConsumerStats struct {

var instCount int64

type backoffSignal int

const (
backoffFlag backoffSignal = iota
continueFlag
resumeFlag
)

// Consumer is a high-level type to consume from NSQ.
//
// A Consumer instance is supplied a Handler that will be executed
Expand Down Expand Up @@ -633,13 +641,6 @@ func (r *Consumer) onConnMessageRequeued(c *Conn, msg *Message) {
atomic.AddUint64(&r.messagesRequeued, 1)
}

type backoffSignal int
const (
backoffFlag backoffSignal = iota
continueFlag
resumeFlag
)

func (r *Consumer) onConnBackoff(c *Conn) {
r.startStopContinueBackoff(c, backoffFlag)
}
Expand All @@ -652,93 +653,6 @@ func (r *Consumer) onConnResume(c *Conn) {
r.startStopContinueBackoff(c, resumeFlag)
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
r.backoffMtx.Lock()
defer r.backoffMtx.Unlock()
if r.inBackoffTimeout() {

return
}

// update backoff state
backoffUpdated := false
backoffCounter := atomic.LoadInt32(&r.backoffCounter)
switch signal {
case resumeFlag:
if backoffCounter > 0 {
backoffCounter--
backoffUpdated = true
}
case backoffFlag:
nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1)
if nextBackoff <= r.config.MaxBackoffDuration {
backoffCounter++
backoffUpdated = true
}
}
atomic.StoreInt32(&r.backoffCounter, backoffCounter)

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter))
r.backoff(backoffDuration)

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
}

func (r *Consumer) backoff(d time.Duration) {
atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds())
time.AfterFunc(d, r.resume)
}

func (r *Consumer) resume() {
if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
r.backoff(time.Second)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())

// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
r.backoff(time.Second)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
}

func (r *Consumer) onConnResponse(c *Conn, data []byte) {
switch {
case bytes.Equal(data, []byte("CLOSE_WAIT")):
Expand Down Expand Up @@ -837,6 +751,92 @@ func (r *Consumer) onConnClose(c *Conn) {
}
}

func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {
// prevent many async failures/successes from immediately resulting in
// max backoff/normal rate (by ensuring that we dont continually incr/decr
// the counter during a backoff period)
r.backoffMtx.Lock()
defer r.backoffMtx.Unlock()
if r.inBackoffTimeout() {
return
}

// update backoff state
backoffUpdated := false
backoffCounter := atomic.LoadInt32(&r.backoffCounter)
switch signal {
case resumeFlag:
if backoffCounter > 0 {
backoffCounter--
backoffUpdated = true
}
case backoffFlag:
nextBackoff := r.config.BackoffStrategy.Calculate(int(backoffCounter) + 1)
if nextBackoff <= r.config.MaxBackoffDuration {
backoffCounter++
backoffUpdated = true
}
}
atomic.StoreInt32(&r.backoffCounter, backoffCounter)

if r.backoffCounter == 0 && backoffUpdated {
// exit backoff
count := r.perConnMaxInFlight()
r.log(LogLevelWarning, "exiting backoff, returning all to RDY %d", count)
for _, c := range r.conns() {
r.updateRDY(c, count)
}
} else if r.backoffCounter > 0 {
// start or continue backoff
backoffDuration := r.config.BackoffStrategy.Calculate(int(backoffCounter))
r.backoff(backoffDuration)

r.log(LogLevelWarning, "backing off for %.04f seconds (backoff level %d), setting all to RDY 0",
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
for _, c := range r.conns() {
r.updateRDY(c, 0)
}
}
}

func (r *Consumer) backoff(d time.Duration) {
atomic.StoreInt64(&r.backoffDuration, d.Nanoseconds())
time.AfterFunc(d, r.resume)
}

func (r *Consumer) resume() {
if atomic.LoadInt32(&r.stopFlag) == 1 {
atomic.StoreInt64(&r.backoffDuration, 0)
return
}

// pick a random connection to test the waters
conns := r.conns()
if len(conns) == 0 {
// backoff again
r.backoff(time.Second)
return
}
idx := r.rng.Intn(len(conns))
choice := conns[idx]

r.log(LogLevelWarning,
"(%s) backoff timeout expired, sending RDY 1",
choice.String())

// while in backoff only ever let 1 message at a time through
err := r.updateRDY(choice, 1)
if err != nil {
r.log(LogLevelWarning, "(%s) error updating RDY - %s", choice.String(), err)
r.backoff(time.Second)
return
}

atomic.StoreInt64(&r.backoffDuration, 0)
}

func (r *Consumer) inBackoff() bool {
return atomic.LoadInt32(&r.backoffCounter) > 0
}
Expand Down
8 changes: 3 additions & 5 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (h *testHandler) HandleMessage(message *Message) error {
if bytes.Equal(message.Body, []byte("requeue_no_backoff_1")) {
if message.Attempts > 1 {
return nil
}
}
message.RequeueWithoutBackoff(-1)
return nil
}
Expand Down Expand Up @@ -271,11 +271,9 @@ func TestConsumerBackoff(t *testing.T) {
}
}


func TestConsumerRequeueNoBackoff(t *testing.T) {
// logger := log.New(ioutil.Discard, "", log.LstdFlags)


msgIDGood := MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeue := MessageID{'r', 'e', 'q', 'v', 'b', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
msgIDRequeueNoBackoff := MessageID{'r', 'e', 'q', 'n', 'b', 'a', 'c', 'k', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
Expand Down Expand Up @@ -310,9 +308,9 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
}

select {
case <-n.exitChan:
case <-n.exitChan:
log.Printf("clean exit")
case <- time.After(500 * time.Millisecond):
case <-time.After(500 * time.Millisecond):
log.Printf("timeout")
}

Expand Down

0 comments on commit 31cb33f

Please sign in to comment.