diff --git a/async_producer.go b/async_producer.go index 2ca97121a..45918e9ee 100644 --- a/async_producer.go +++ b/async_producer.go @@ -684,6 +684,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, + closeBroker: make(chan struct{}), stopchan: make(chan struct{}), buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), @@ -724,6 +725,22 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { close(responses) }) + // use a dedicated goroutine to close the broker instead of inside handleError + // this is because the AsyncProduce callback inside the bridge is invoked from the broker + // responseReceiver goroutine and closing the broker requires for such goroutine to be finished + // therefore leading to a deadlock + go withRecover(func() { + for range bp.closeBroker { + if err := bp.broker.Close(); err != nil { + Logger.Printf("producer/broker/%d unable to close broker: %v\n", bp.broker.ID(), err) + } else { + Logger.Printf("producer/broker/%d closing done\n", bp.broker.ID()) + } + } + // Signal that we are done + close(bp.stopchan) + }) + if p.conf.Producer.Retry.Max <= 0 { bp.abandoned = make(chan struct{}) } @@ -743,11 +760,12 @@ type brokerProducer struct { parent *asyncProducer broker *Broker - input chan *ProducerMessage - output chan<- *produceSet - responses <-chan *brokerProducerResponse - abandoned chan struct{} - stopchan chan struct{} + input chan *ProducerMessage + output chan<- *produceSet + responses <-chan *brokerProducerResponse + closeBroker chan struct{} + abandoned chan struct{} + stopchan chan struct{} buffer *produceSet timer <-chan time.Time @@ -830,10 +848,6 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } - case <-bp.stopchan: - Logger.Printf( - "producer/broker/%d run loop asked to stop\n", bp.broker.ID()) - return } if bp.timerFired || bp.buffer.readyToFlush() { @@ -854,10 +868,15 @@ func (bp *brokerProducer) shutdown() { } } close(bp.output) + // Drain responses from bridge goroutine for response := range bp.responses { bp.handleResponse(response) } - close(bp.stopchan) + // Ask for the closeBroker goroutine to stop + close(bp.closeBroker) + // And wait for it to be done + <-bp.stopchan + // No more brokerProducer related goroutine should be running Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } @@ -1028,7 +1047,12 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { default: Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) bp.parent.abandonBrokerConnection(bp.broker) - _ = bp.broker.Close() + // We only try to close the broker once + if bp.closing == nil { + // Request the closeBroker goroutine to close the broker for us + // because calling bp.broker.Close here can lead to a deadlock + bp.closeBroker <- struct{}{} + } bp.closing = err sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) diff --git a/async_producer_test.go b/async_producer_test.go index e571aa068..01a227d9b 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -643,6 +643,55 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { } } +// https://github.com/Shopify/sarama/issues/2129 +func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { + //Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) + + // The seed broker only handles Metadata request + seedBroker.setHandler(func(req *request) (res encoderWithHeader) { + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) + return metadataLeader + }) + + // Simulate a slow broker by taking ~200ms to handle requests + // therefore triggering the read timeout and the retry logic + leader.setHandler(func(req *request) (res encoderWithHeader) { + time.Sleep(200 * time.Millisecond) + // Will likely not be read by the producer (read timeout) + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + return prodSuccess + }) + + config := NewTestConfig() + // Use very short read to simulate read error on unresponsive broker + config.Net.ReadTimeout = 50 * time.Millisecond + // Flush every record to generate N in-flight Produce requests + config.Producer.Flush.Messages = 1 + config.Producer.Return.Successes = true + // Reduce retries to speed up the test while keeping the default backoff + config.Producer.Retry.Max = 1 + config.Net.MaxOpenRequests = 1 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + expectResults(t, producer, 0, 10) + + seedBroker.Close() + leader.Close() + closeProducer(t, producer) +} + func TestAsyncProducerOutOfRetries(t *testing.T) { t.Skip("Enable once bug #294 is fixed.") @@ -1249,9 +1298,11 @@ func TestBrokerProducerShutdown(t *testing.T) { addr: mockBroker.Addr(), id: mockBroker.BrokerID(), } - bp := producer.(*asyncProducer).newBrokerProducer(broker) + // Starts various goroutines in newBrokerProducer + bp := producer.(*asyncProducer).getBrokerProducer(broker) + // Initiate the shutdown of all of them + producer.(*asyncProducer).unrefBrokerProducer(broker, bp) - bp.shutdown() _ = producer.Close() mockBroker.Close() } diff --git a/broker.go b/broker.go index a22efcaca..c60e9a044 100644 --- a/broker.go +++ b/broker.go @@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error) // When configured with RequiredAcks == NoResponse, the callback will not be invoked. // If an error is returned because the request could not be sent then the callback // will not be invoked either. +// +// Make sure not to Close the broker in the callback as it will lead to a deadlock. func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error { needAcks := request.RequiredAcks != NoResponse // Use a nil promise when no acks is required