Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when closing Broker in brokerProducer #2133

Merged
merged 5 commits into from
Feb 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
pending = make(chan *brokerProducerResponse)
responses = make(chan *brokerProducerResponse)
)

Expand All @@ -684,25 +685,31 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)

// minimal bridge to make the network response `select`able
go withRecover(func() {
// Use a wait group to know if we still have in flight requests
var wg sync.WaitGroup

for set := range bridge {
request := set.buildRequest()

// Count the in flight requests to know when we can close the pending channel safely
wg.Add(1)
// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
responses <- &brokerProducerResponse{
// Forward the response to make sure we do not block the responseReceiver
pending <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
wg.Done()
}
}(set)

Expand All @@ -721,7 +728,42 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
sendResponse(nil, nil)
}
}
close(responses)
// Wait for all in flight requests to close the pending channel safely
wg.Wait()
close(pending)
})

// In order to avoid a deadlock when closing the broker on network or malformed response error
// we use an intermediate channel to buffer and send pending responses in order
// This is because the AsyncProduce callback inside the bridge is invoked from the broker
// responseReceiver goroutine and closing the broker requires such goroutine to be finished
go withRecover(func() {
buf := queue.New()
for {
if buf.Length() == 0 {
res, ok := <-pending
if !ok {
// We are done forwarding the last pending response
close(responses)
return
}
buf.Add(res)
}
// Send the head pending response or buffer another one
// so that we never block the callback
headRes := buf.Peek().(*brokerProducerResponse)
select {
case res, ok := <-pending:
if !ok {
continue
}
buf.Add(res)
continue
case responses <- headRes:
buf.Remove()
continue
}
}
})

if p.conf.Producer.Retry.Max <= 0 {
Expand All @@ -747,7 +789,6 @@ type brokerProducer struct {
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
stopchan chan struct{}

buffer *produceSet
timer <-chan time.Time
Expand Down Expand Up @@ -830,10 +871,6 @@ func (bp *brokerProducer) run() {
if ok {
bp.handleResponse(response)
}
case <-bp.stopchan:
Logger.Printf(
"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
return
}

if bp.timerFired || bp.buffer.readyToFlush() {
Expand All @@ -854,10 +891,11 @@ func (bp *brokerProducer) shutdown() {
}
}
close(bp.output)
// Drain responses from the bridge goroutine
for response := range bp.responses {
bp.handleResponse(response)
}
close(bp.stopchan)
// No more brokerProducer related goroutine should be running
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

Expand Down
57 changes: 55 additions & 2 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func closeProducer(t *testing.T, p AsyncProducer) {
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
t.Helper()
expect := successes + errors
for expect > 0 {
select {
Expand Down Expand Up @@ -643,6 +644,56 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
}
}

// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
t.Parallel()
//Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})

// Simulate a slow broker by taking ~200ms to handle requests
// therefore triggering the read timeout and the retry logic
leader.setHandler(func(req *request) (res encoderWithHeader) {
time.Sleep(200 * time.Millisecond)
// Will likely not be read by the producer (read timeout)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
})

config := NewTestConfig()
// Use very short read to simulate read error on unresponsive broker
config.Net.ReadTimeout = 50 * time.Millisecond
// Flush every record to generate up to 5 in-flight Produce requests
// because config.Net.MaxOpenRequests defaults to 5
config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Successes = true
// Reduce retries to speed up the test while keeping the default backoff
config.Producer.Retry.Max = 1
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

expectResults(t, producer, 0, 10)

seedBroker.Close()
leader.Close()
closeProducer(t, producer)
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

Expand Down Expand Up @@ -1249,9 +1300,11 @@ func TestBrokerProducerShutdown(t *testing.T) {
addr: mockBroker.Addr(),
id: mockBroker.BrokerID(),
}
bp := producer.(*asyncProducer).newBrokerProducer(broker)
// Starts various goroutines in newBrokerProducer
bp := producer.(*asyncProducer).getBrokerProducer(broker)
// Initiate the shutdown of all of them
producer.(*asyncProducer).unrefBrokerProducer(broker, bp)

bp.shutdown()
_ = producer.Close()
mockBroker.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error)
// When configured with RequiredAcks == NoResponse, the callback will not be invoked.
// If an error is returned because the request could not be sent then the callback
// will not be invoked either.
//
// Make sure not to Close the broker in the callback as it will lead to a deadlock.
func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
needAcks := request.RequiredAcks != NoResponse
// Use a nil promise when no acks is required
Expand Down