Skip to content

Commit

Permalink
Merge pull request #156 from jamesgroat/consumer-rng-race
Browse files Browse the repository at this point in the history
Add mutex around rng *rand.Rand to avoid data race on resume
  • Loading branch information
mreiferson committed Sep 2, 2015
2 parents b40b4e0 + 3c0c5ed commit 383c07c
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ type Consumer struct {
channel string
config Config

rng *rand.Rand
rngMtx sync.Mutex
rng *rand.Rand

needRDYRedistributed int32

Expand Down Expand Up @@ -369,8 +370,10 @@ func validatedLookupAddr(addr string) error {
func (r *Consumer) lookupdLoop() {
// add some jitter so that multiple consumers discovering the same topic,
// when restarted at the same time, dont all connect at once.
r.rngMtx.Lock()
jitter := time.Duration(int64(r.rng.Float64() *
r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
r.rngMtx.Unlock()
var ticker *time.Ticker

select {
Expand Down Expand Up @@ -830,7 +833,9 @@ func (r *Consumer) resume() {
r.backoff(time.Second)
return
}
r.rngMtx.Lock()
idx := r.rng.Intn(len(conns))
r.rngMtx.Unlock()
choice := conns[idx]

r.log(LogLevelWarning,
Expand Down Expand Up @@ -1006,7 +1011,9 @@ func (r *Consumer) redistributeRDY() {

for len(possibleConns) > 0 && availableMaxInFlight > 0 {
availableMaxInFlight--
r.rngMtx.Lock()
i := r.rng.Int() % len(possibleConns)
r.rngMtx.Unlock()
c := possibleConns[i]
// delete
possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)
Expand Down

0 comments on commit 383c07c

Please sign in to comment.