diff --git a/consumer.go b/consumer.go index 2642ace8..68e1803e 100644 --- a/consumer.go +++ b/consumer.go @@ -106,7 +106,8 @@ type Consumer struct { channel string config Config - rng *rand.Rand + rngMtx sync.Mutex + rng *rand.Rand needRDYRedistributed int32 @@ -369,8 +370,10 @@ func validatedLookupAddr(addr string) error { func (r *Consumer) lookupdLoop() { // add some jitter so that multiple consumers discovering the same topic, // when restarted at the same time, dont all connect at once. + r.rngMtx.Lock() jitter := time.Duration(int64(r.rng.Float64() * r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval))) + r.rngMtx.Unlock() var ticker *time.Ticker select { @@ -830,7 +833,9 @@ func (r *Consumer) resume() { r.backoff(time.Second) return } + r.rngMtx.Lock() idx := r.rng.Intn(len(conns)) + r.rngMtx.Unlock() choice := conns[idx] r.log(LogLevelWarning, @@ -1006,7 +1011,9 @@ func (r *Consumer) redistributeRDY() { for len(possibleConns) > 0 && availableMaxInFlight > 0 { availableMaxInFlight-- + r.rngMtx.Lock() i := r.rng.Int() % len(possibleConns) + r.rngMtx.Unlock() c := possibleConns[i] // delete possibleConns = append(possibleConns[:i], possibleConns[i+1:]...)