Skip to content

Commit

Permalink
Merge pull request #5 from mreiferson/mif_5
Browse files Browse the repository at this point in the history
reader: goroutine safe max in flight
  • Loading branch information
jehiah committed Sep 10, 2013
2 parents 3d101e6 + d94edf9 commit 9b29b0e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
22 changes: 14 additions & 8 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (q *Reader) ConnectionMaxInFlight() int64 {
q.RLock()
defer q.RUnlock()

b := float64(q.maxInFlight)
b := float64(q.MaxInFlight())
s := b / float64(len(q.nsqConnections))
return int64(math.Min(math.Max(1, s), b))
}
Expand Down Expand Up @@ -332,14 +332,16 @@ func (q *Reader) SetMaxInFlight(maxInFlight int) {
return
}

q.Lock()
if q.maxInFlight == maxInFlight {
q.Unlock()
return
}
q.maxInFlight = maxInFlight
q.Unlock()

q.RLock()
defer q.RUnlock()

for _, c := range q.nsqConnections {
c.tryUpdateRDY()
}
Expand All @@ -353,6 +355,8 @@ func (q *Reader) SetMaxBackoffDuration(duration time.Duration) {

// MaxInFlight returns the configured maximum number of messages to allow in-flight.
func (q *Reader) MaxInFlight() int {
q.RLock()
defer q.RUnlock()
return q.maxInFlight
}

Expand Down Expand Up @@ -535,9 +539,9 @@ func (q *Reader) ConnectToNSQ(addr string) error {
log.Printf("[%s] IDENTIFY response: %+v", connection, resp)

connection.maxRdyCount = resp.MaxRdyCount
if resp.MaxRdyCount < int64(q.maxInFlight) {
if resp.MaxRdyCount < int64(q.MaxInFlight()) {
log.Printf("[%s] max RDY count %d < reader max in flight %d, truncation possible",
connection, resp.MaxRdyCount, q.maxInFlight)
connection, resp.MaxRdyCount, q.MaxInFlight())
}

if resp.TLSv1 {
Expand Down Expand Up @@ -883,7 +887,7 @@ func (q *Reader) updateRDY(c *nsqConn, count int64) error {

// never exceed our global max in flight. truncate if possible.
// this could help a new connection get partial max-in-flight
maxPossibleRdy := int64(q.maxInFlight) - atomic.LoadInt64(&q.totalRdyCount) + atomic.LoadInt64(&c.rdyCount)
maxPossibleRdy := int64(q.MaxInFlight()) - atomic.LoadInt64(&q.totalRdyCount) + atomic.LoadInt64(&c.rdyCount)
if maxPossibleRdy > 0 && maxPossibleRdy < count {
count = maxPossibleRdy
}
Expand Down Expand Up @@ -928,11 +932,13 @@ func (q *Reader) redistributeRdyState() {
q.RLock()
l := len(q.nsqConnections)
q.RUnlock()
if l <= q.maxInFlight {

maxInFlight := q.MaxInFlight()
if l <= maxInFlight {
continue
}

log.Printf("redistributing ready state (%d conns > %d max_in_flight)", l, q.maxInFlight)
log.Printf("redistributing ready state (%d conns > %d max_in_flight)", l, maxInFlight)
q.RLock()
possibleConns := make([]*nsqConn, 0, len(q.nsqConnections))
for _, c := range q.nsqConnections {
Expand All @@ -948,7 +954,7 @@ func (q *Reader) redistributeRdyState() {
}
possibleConns = append(possibleConns, c)
}
availableMaxInFlight := int64(q.maxInFlight) - atomic.LoadInt64(&q.totalRdyCount)
availableMaxInFlight := int64(maxInFlight) - atomic.LoadInt64(&q.totalRdyCount)
for len(possibleConns) > 0 && availableMaxInFlight > 0 {
availableMaxInFlight--
i := rand.Int() % len(possibleConns)
Expand Down
8 changes: 4 additions & 4 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (w *Writer) Stop() {
// PublishAsync publishes a message body to the specified topic
// but does not wait for the response from `nsqd`.
//
// When the Writer eventually receives the response from `nsqd`, the supplied `doneChan`
// will receive a `WriterTransaction` instance with the supplied variadic arguments
// When the Writer eventually receives the response from `nsqd`, the supplied `doneChan`
// will receive a `WriterTransaction` instance with the supplied variadic arguments
// (and the response `FrameType`, `Data`, and `Error`)
func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTransaction, args ...interface{}) error {
return w.sendCommandAsync(Publish(topic, body), doneChan, args)
Expand All @@ -103,8 +103,8 @@ func (w *Writer) PublishAsync(topic string, body []byte, doneChan chan *WriterTr
// MultiPublishAsync publishes a slice of message bodies to the specified topic
// but does not wait for the response from `nsqd`.
//
// When the Writer eventually receives the response from `nsqd`, the supplied `doneChan`
// will receive a `WriterTransaction` instance with the supplied variadic arguments
// When the Writer eventually receives the response from `nsqd`, the supplied `doneChan`
// will receive a `WriterTransaction` instance with the supplied variadic arguments
// (and the response `FrameType`, `Data`, and `Error`)
func (w *Writer) MultiPublishAsync(topic string, body [][]byte, doneChan chan *WriterTransaction, args ...interface{}) error {
cmd, err := MultiPublish(topic, body)
Expand Down

0 comments on commit 9b29b0e

Please sign in to comment.