Skip to content

Commit

Permalink
Fix deadlock when closing Broker in brokerProducer
Browse files Browse the repository at this point in the history
- add unit test to reproduce the deadlock by simulating a network error
- document possible deadlock when closing the Broker from an AsyncProduce
  callback when handling a response error
- add closeBroker goroutine and channel to asynchronously close a Broker
  once
- reuse the stopchan channel to signal that the closeBroker goroutine is
  done
- update TestBrokerProducerShutdown to check goroutine leak by closing
  the input vs the stopchan channel
- fixes #2129
  • Loading branch information
slaunay committed Feb 8, 2022
1 parent a5fdd87 commit 59dd565
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 13 deletions.
46 changes: 35 additions & 11 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
closeBroker: make(chan struct{}),
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
Expand Down Expand Up @@ -724,6 +725,22 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
close(responses)
})

// use a dedicated goroutine to close the broker instead of inside handleError
// this is because the AsyncProduce callback inside the bridge is invoked from the broker
// responseReceiver goroutine and closing the broker requires for such goroutine to be finished
// therefore leading to a deadlock
go withRecover(func() {
for range bp.closeBroker {
if err := bp.broker.Close(); err != nil {
Logger.Printf("producer/broker/%d unable to close broker: %v\n", bp.broker.ID(), err)
} else {
Logger.Printf("producer/broker/%d closing done\n", bp.broker.ID())
}
}
// Signal that we are done
close(bp.stopchan)
})

if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
Expand All @@ -743,11 +760,12 @@ type brokerProducer struct {
parent *asyncProducer
broker *Broker

input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
stopchan chan struct{}
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
closeBroker chan struct{}
abandoned chan struct{}
stopchan chan struct{}

buffer *produceSet
timer <-chan time.Time
Expand Down Expand Up @@ -830,10 +848,6 @@ func (bp *brokerProducer) run() {
if ok {
bp.handleResponse(response)
}
case <-bp.stopchan:
Logger.Printf(
"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
return
}

if bp.timerFired || bp.buffer.readyToFlush() {
Expand All @@ -854,10 +868,15 @@ func (bp *brokerProducer) shutdown() {
}
}
close(bp.output)
// Drain responses from bridge goroutine
for response := range bp.responses {
bp.handleResponse(response)
}
close(bp.stopchan)
// Ask for the closeBroker goroutine to stop
close(bp.closeBroker)
// And wait for it to be done
<-bp.stopchan
// No more brokerProducer related goroutine should be running
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

Expand Down Expand Up @@ -1028,7 +1047,12 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
default:
Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
// We only try to close the broker once
if bp.closing == nil {
// Request the closeBroker goroutine to close the broker for us
// because calling bp.broker.Close here can lead to a deadlock
bp.closeBroker <- struct{}{}
}
bp.closing = err
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
Expand Down
55 changes: 53 additions & 2 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,55 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
}
}

// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
//Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})

// Simulate a slow broker by taking ~200ms to handle requests
// therefore triggering the read timeout and the retry logic
leader.setHandler(func(req *request) (res encoderWithHeader) {
time.Sleep(200 * time.Millisecond)
// Will likely not be read by the producer (read timeout)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
})

config := NewTestConfig()
// Use very short read to simulate read error on unresponsive broker
config.Net.ReadTimeout = 50 * time.Millisecond
// Flush every record to generate N in-flight Produce requests
config.Producer.Flush.Messages = 1
config.Producer.Return.Successes = true
// Reduce retries to speed up the test while keeping the default backoff
config.Producer.Retry.Max = 1
config.Net.MaxOpenRequests = 1
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

expectResults(t, producer, 0, 10)

seedBroker.Close()
leader.Close()
closeProducer(t, producer)
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

Expand Down Expand Up @@ -1249,9 +1298,11 @@ func TestBrokerProducerShutdown(t *testing.T) {
addr: mockBroker.Addr(),
id: mockBroker.BrokerID(),
}
bp := producer.(*asyncProducer).newBrokerProducer(broker)
// Starts various goroutines in newBrokerProducer
bp := producer.(*asyncProducer).getBrokerProducer(broker)
// Initiate the shutdown of all of them
producer.(*asyncProducer).unrefBrokerProducer(broker, bp)

bp.shutdown()
_ = producer.Close()
mockBroker.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error)
// When configured with RequiredAcks == NoResponse, the callback will not be invoked.
// If an error is returned because the request could not be sent then the callback
// will not be invoked either.
//
// Make sure not to Close the broker in the callback as it will lead to a deadlock.
func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
needAcks := request.RequiredAcks != NoResponse
// Use a nil promise when no acks is required
Expand Down

0 comments on commit 59dd565

Please sign in to comment.