Skip to content

Commit

Permalink
Merge pull request #208 from mreiferson/active-redistribute
Browse files Browse the repository at this point in the history
consumer: redistribute RDY when connections are active
  • Loading branch information
mreiferson authored Jun 3, 2017
2 parents b2d0b44 + 325128c commit 3c0361b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
7 changes: 4 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ type Config struct {
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

// Duration to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
// Duration to wait for a message from an nsqd when in a state where RDY
// counts are re-distributed (e.g. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

// Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

Expand Down
8 changes: 8 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Conn struct {
maxRdyCount int64
rdyCount int64
lastRdyCount int64
lastRdyTimestamp int64
lastMsgTimestamp int64

mtx sync.Mutex
Expand Down Expand Up @@ -213,6 +214,9 @@ func (c *Conn) LastRDY() int64 {
func (c *Conn) SetRDY(rdy int64) {
atomic.StoreInt64(&c.rdyCount, rdy)
atomic.StoreInt64(&c.lastRdyCount, rdy)
if rdy > 0 {
atomic.StoreInt64(&c.lastRdyTimestamp, time.Now().UnixNano())
}
}

// MaxRDY returns the nsqd negotiated maximum
Expand All @@ -221,6 +225,10 @@ func (c *Conn) MaxRDY() int64 {
return c.maxRdyCount
}

func (c *Conn) LastRdyTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.lastRdyTimestamp))
}

// LastMessageTime returns a time.Time representing
// the time at which the last message was received
func (c *Conn) LastMessageTime() time.Time {
Expand Down
12 changes: 9 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,12 +994,18 @@ func (r *Consumer) redistributeRDY() {
possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
lastRdyDuration := time.Now().Sub(c.LastRdyTime())
rdyCount := c.RDY()
r.log(LogLevelDebug, "(%s) rdy: %d (last message received %s)",
c.String(), rdyCount, lastMsgDuration)
if rdyCount > 0 && lastMsgDuration > r.config.LowRdyIdleTimeout {
r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
r.updateRDY(c, 0)
if rdyCount > 0 {
if lastMsgDuration > r.config.LowRdyIdleTimeout {
r.log(LogLevelDebug, "(%s) idle connection, giving up RDY", c.String())
r.updateRDY(c, 0)
} else if lastRdyDuration > r.config.LowRdyTimeout {
r.log(LogLevelDebug, "(%s) RDY timeout, giving up RDY", c.String())
r.updateRDY(c, 0)
}
}
possibleConns = append(possibleConns, c)
}
Expand Down

0 comments on commit 3c0361b

Please sign in to comment.