From 6b85c47d32cb5f435a74610adfaa07b891b2aadc Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Mon, 18 May 2015 15:10:04 -0400 Subject: [PATCH] Fix another hypothetical wait group issue Don't increment and immediately decrement the wait group on a message which we just return with ErrShuttingDown. Otherwise: ``` prod.AsyncClose() prod.Input() <- msg ``` 1. shutdown() goroutine calls Wait() 2. while in the call to Wait(), Go switches contexts 3. the message is sent on Input(), is received, and causes the wait counter to increment from 0 while being waited on Where the WaitGroup docs say "calls with a positive delta that occur when the counter is zero must happen before a Wait". --- async_producer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index 6378ae5c5..aced88631 100644 --- a/async_producer.go +++ b/async_producer.go @@ -224,11 +224,18 @@ func (p *asyncProducer) topicDispatcher() { p.inFlight.Done() continue } else if msg.retries == 0 { - p.inFlight.Add(1) if shuttingDown { - p.returnError(msg, ErrShuttingDown) + // we can't just call returnError here because that decrements the wait group, + // which hasn't been incremented yet for this message, and shouldn't be + pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown} + if p.conf.Producer.Return.Errors { + p.errors <- pErr + } else { + Logger.Println(pErr) + } continue } + p.inFlight.Add(1) } if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||