Skip to content

Commit

Permalink
consumer: give up RDY when conn exceeds LowRdyTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Apr 16, 2017
1 parent ebd4ccb commit 325128c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
7 changes: 4 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ type Config struct {
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

// Duration to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
// Duration to wait for a message from an nsqd when in a state where RDY
// counts are re-distributed (e.g. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

Expand Down
12 changes: 9 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,12 +994,18 @@ func (r *Consumer) redistributeRDY() {
possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
lastRdyDuration := time.Now().Sub(c.LastRdyTime())
rdyCount := c.RDY()
r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
c.String(), rdyCount, lastMsgDuration)
if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout {
r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
r.updateRDY(c, 0)
if rdyCount > 0 {
if lastMsgDuration > r.config.LowRdyIdleTimeout {
r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
r.updateRDY(c, 0)
} else if lastRdyDuration > r.config.LowRdyTimeout {
r.log(LogLevelDebug, "(%s) RDY timeout, giving up RDY", c.String())
r.updateRDY(c, 0)
}
}
possibleConns = append(possibleConns, c)
}
Expand Down

0 comments on commit 325128c

Please sign in to comment.