Fix deadlock when closing Broker in brokerProducer (#2133)

* Fix deadlock when closing Broker in brokerProducer

- add unit test to reproduce the deadlock by simulating a network error
- document possible deadlock when closing the Broker from an AsyncProduce
  callback when handling a response error
- add closeBroker goroutine and channel to asynchronously close a Broker
  once
- reuse the stopchan channel to signal that the closeBroker goroutine is
  done
- update TestBrokerProducerShutdown to check goroutine leak by closing
  the input vs the stopchan channel
- fixes #2129
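
The deadlock has a simple shape: Broker.Close waits for the broker's
responseReceiver goroutine to exit, but that goroutine is the one invoking
the AsyncProduce callbacks, so closing the broker while further responses
are still in flight can wait forever. The following stand-alone sketch is
not Sarama code, only an analogue of the pre-fix flow, and reproduces that
shape:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Not Sarama code: a stripped-down analogue of the pre-fix flow. A "receiver"
// goroutine delivers responses on an unbuffered channel, and Close waits for
// that goroutine to exit, just like Broker.Close waits for responseReceiver.
type fakeBroker struct {
	responses chan error
	done      sync.WaitGroup
}

func (b *fakeBroker) Open(inFlight int) {
	b.responses = make(chan error)
	b.done.Add(1)
	go func() { // plays the role of responseReceiver invoking callbacks
		defer b.done.Done()
		for i := 0; i < inFlight; i++ {
			b.responses <- fmt.Errorf("network error %d", i) // blocks until read
		}
	}()
}

func (b *fakeBroker) Close() { b.done.Wait() } // waits for the receiver to exit

func main() {
	b := &fakeBroker{}
	b.Open(2) // two in-flight requests, as with MaxOpenRequests > 1

	fmt.Println("handling:", <-b.responses) // handle the first error...

	closed := make(chan struct{})
	go func() { b.Close(); close(closed) }() // ...by closing the broker

	select {
	case <-closed:
		fmt.Println("closed cleanly")
	case <-time.After(time.Second):
		// Close never returns: the receiver is stuck sending response #2
		// because nobody reads responses while we wait for it to exit.
		fmt.Println("deadlock: Close waits for a goroutine blocked on us")
	}
}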

* Address possible data race with the responses chan

WARNING: DATA RACE
Write at 0x00c0003421f0 by goroutine 71:
  runtime.closechan()
      runtime/chan.go:355 +0x0
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
      github.com/Shopify/sarama/async_producer.go:725 +0x1c4
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x74
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer·dwrap·15()
      github.com/Shopify/sarama/async_producer.go:695 +0x39

Previous read at 0x00c0003421f0 by goroutine 58:
  runtime.chansend()
      runtime/chan.go:158 +0x0
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.1.1()
      github.com/Shopify/sarama/async_producer.go:702 +0x125
  github.com/Shopify/sarama.(*Broker).AsyncProduce.func1()
      github.com/Shopify/sarama/broker.go:408 +0x1a9
  github.com/Shopify/sarama.(*responsePromise).handle()
      github.com/Shopify/sarama/broker.go:132 +0x1b8
  github.com/Shopify/sarama.(*Broker).responseReceiver()
      github.com/Shopify/sarama/broker.go:1040 +0x124
  github.com/Shopify/sarama.(*Broker).responseReceiver-fm()
      github.com/Shopify/sarama/broker.go:1032 +0x39
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x74
  github.com/Shopify/sarama.(*Broker).Open.func1·dwrap·22()
      github.com/Shopify/sarama/broker.go:244 +0x39

* Update unit test to use up to 5 in-flight requests

* Keep closing the broker synchronously but buffer pending responses

Closing the broker asynchronously fixes the deadlock but introduces a
race with the client re-opening the broker in updateLeader: the new
brokerProducer might then end up holding an already-closed broker and
all of its produce requests would fail with ErrNotConnected.
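
To keep Close synchronous while never blocking the AsyncProduce callback,
the change inserts an intermediate goroutine that buffers responses in
memory and forwards them in order. A minimal sketch of that forwarder
pattern follows; it uses a plain slice where the actual change uses the
queue package, and the names and element type are illustrative:

package main

import "fmt"

// forwardResponses never blocks a sender on pending: when the consumer of
// responses is slow, items accumulate in an in-memory FIFO instead.
func forwardResponses(pending <-chan string, responses chan<- string) {
	var buf []string
	for pending != nil || len(buf) > 0 {
		// Only offer the head of the buffer when there is one; a send on a
		// nil channel blocks forever, so that select case is then disabled.
		var out chan<- string
		var head string
		if len(buf) > 0 {
			out = responses
			head = buf[0]
		}
		select {
		case v, ok := <-pending:
			if !ok {
				pending = nil // stop selecting on the closed channel
				continue
			}
			buf = append(buf, v)
		case out <- head:
			buf = buf[1:]
		}
	}
	close(responses) // every pending response has been forwarded, in order
}

func main() {
	pending := make(chan string)
	responses := make(chan string)
	go forwardResponses(pending, responses)

	// The sending side never blocks even though nobody reads responses yet.
	for i := 0; i < 3; i++ {
		pending <- fmt.Sprintf("response-%d", i)
	}
	close(pending)

	for r := range responses {
		fmt.Println("handled", r)
	}
}

The key property is that a send on pending always completes promptly: the
value is either forwarded right away or parked in the buffer, so the
responseReceiver goroutine invoking the callback is never stuck and
Broker.Close can remain synchronous.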

* t.Parallel() TestAsyncProducer...ConcurrentRequest
slaunay authored Feb 13, 2022
1 parent 6693712 commit 06513c1
Showing 3 changed files with 104 additions and 11 deletions.
async_producer.go (56 changes: 47 additions & 9 deletions)
@@ -675,6 +675,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
pending = make(chan *brokerProducerResponse)
responses = make(chan *brokerProducerResponse)
)

@@ -684,25 +685,31 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)

// minimal bridge to make the network response `select`able
go withRecover(func() {
// Use a wait group to know if we still have in flight requests
var wg sync.WaitGroup

for set := range bridge {
request := set.buildRequest()

// Count the in flight requests to know when we can close the pending channel safely
wg.Add(1)
// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
responses <- &brokerProducerResponse{
// Forward the response to make sure we do not block the responseReceiver
pending <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
wg.Done()
}
}(set)

@@ -721,7 +728,42 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
sendResponse(nil, nil)
}
}
close(responses)
// Wait for all in flight requests to close the pending channel safely
wg.Wait()
close(pending)
})

// In order to avoid a deadlock when closing the broker on network or malformed response error
// we use an intermediate channel to buffer and send pending responses in order
// This is because the AsyncProduce callback inside the bridge is invoked from the broker
// responseReceiver goroutine and closing the broker requires such goroutine to be finished
go withRecover(func() {
buf := queue.New()
for {
if buf.Length() == 0 {
res, ok := <-pending
if !ok {
// We are done forwarding the last pending response
close(responses)
return
}
buf.Add(res)
}
// Send the head pending response or buffer another one
// so that we never block the callback
headRes := buf.Peek().(*brokerProducerResponse)
select {
case res, ok := <-pending:
if !ok {
continue
}
buf.Add(res)
continue
case responses <- headRes:
buf.Remove()
continue
}
}
})

if p.conf.Producer.Retry.Max <= 0 {
Expand All @@ -747,7 +789,6 @@ type brokerProducer struct {
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
stopchan chan struct{}

buffer *produceSet
timer <-chan time.Time
@@ -830,10 +871,6 @@ func (bp *brokerProducer) run() {
if ok {
bp.handleResponse(response)
}
case <-bp.stopchan:
Logger.Printf(
"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
return
}

if bp.timerFired || bp.buffer.readyToFlush() {
@@ -854,10 +891,11 @@ func (bp *brokerProducer) shutdown() {
}
}
close(bp.output)
// Drain responses from the bridge goroutine
for response := range bp.responses {
bp.handleResponse(response)
}
close(bp.stopchan)
// No more brokerProducer related goroutine should be running
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

async_producer_test.go (57 changes: 55 additions & 2 deletions)
@@ -38,6 +38,7 @@ func closeProducer(t *testing.T, p AsyncProducer) {
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
t.Helper()
expect := successes + errors
for expect > 0 {
select {
@@ -656,6 +657,56 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
}
}

// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
t.Parallel()
//Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})

// Simulate a slow broker by taking ~200ms to handle requests
// therefore triggering the read timeout and the retry logic
leader.setHandler(func(req *request) (res encoderWithHeader) {
time.Sleep(200 * time.Millisecond)
// Will likely not be read by the producer (read timeout)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
})

config := NewTestConfig()
// Use very short read to simulate read error on unresponsive broker
config.Net.ReadTimeout = 50 * time.Millisecond
// Flush every record to generate up to 5 in-flight Produce requests
// because config.Net.MaxOpenRequests defaults to 5
config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Successes = true
// Reduce retries to speed up the test while keeping the default backoff
config.Producer.Retry.Max = 1
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

expectResults(t, producer, 0, 10)

seedBroker.Close()
leader.Close()
closeProducer(t, producer)
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Parallel()
t.Skip("Enable once bug #294 is fixed.")
@@ -1272,9 +1323,11 @@ func TestBrokerProducerShutdown(t *testing.T) {
addr: mockBroker.Addr(),
id: mockBroker.BrokerID(),
}
bp := producer.(*asyncProducer).newBrokerProducer(broker)
// Starts various goroutines in newBrokerProducer
bp := producer.(*asyncProducer).getBrokerProducer(broker)
// Initiate the shutdown of all of them
producer.(*asyncProducer).unrefBrokerProducer(broker, bp)

bp.shutdown()
_ = producer.Close()
mockBroker.Close()
}
broker.go (2 changes: 2 additions & 0 deletions)
@@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error)
// When configured with RequiredAcks == NoResponse, the callback will not be invoked.
// If an error is returned because the request could not be sent then the callback
// will not be invoked either.
//
// Make sure not to Close the broker in the callback as it will lead to a deadlock.
func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
needAcks := request.RequiredAcks != NoResponse
// Use a nil promise when no acks is required
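
Given the new warning on AsyncProduce, a caller that needs to tear a broker
down after a produce failure should only signal from the callback and
perform the Close elsewhere. A hedged sketch of that pattern follows; the
safeAsyncProduce helper, the localhost address, and the request wiring are
illustrative and not part of this change:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// safeAsyncProduce is a hypothetical helper: the callback only hands the
// outcome off on a channel, and the caller decides outside the callback
// whether to close the broker. Calling b.Close inside the callback would
// block on the very responseReceiver goroutine that is invoking it.
func safeAsyncProduce(b *sarama.Broker, req *sarama.ProduceRequest) (<-chan error, error) {
	result := make(chan error, 1)
	err := b.AsyncProduce(req, func(_ *sarama.ProduceResponse, err error) {
		// Signal only; never call b.Close() from here.
		result <- err
	})
	return result, err
}

func main() {
	// Assumes a Kafka broker at localhost:9092; purely illustrative wiring.
	b := sarama.NewBroker("localhost:9092")
	if err := b.Open(sarama.NewConfig()); err != nil {
		log.Fatal(err)
	}

	req := &sarama.ProduceRequest{RequiredAcks: sarama.WaitForLocal}
	result, err := safeAsyncProduce(b, req)
	if err != nil {
		log.Fatal(err)
	}
	if produceErr := <-result; produceErr != nil {
		// Safe: we are on the main goroutine, not the broker's responseReceiver.
		_ = b.Close()
	}
}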
