Fix deadlock when closing Broker in brokerProducer #2133
Conversation
- add unit test to reproduce the deadlock by simulating a network error
- document possible deadlock when closing the Broker from an AsyncProduce callback when handling a response error
- add closeBroker goroutine and channel to asynchronously close a Broker once
- reuse the stopchan channel to signal that the closeBroker goroutine is done
- update TestBrokerProducerShutdown to check goroutine leak by closing the input vs the stopchan channel
- fixes IBM#2129
The race detector found a data race that is not directly linked to the deadlock but can lead to a panic when sending to a closed channel. Here are the details of the data race:

WARNING: DATA RACE
Write at 0x00c0003421f0 by goroutine 71:
  runtime.closechan()
      runtime/chan.go:355 +0x0
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
      github.com/Shopify/sarama/async_producer.go:725 +0x1c4
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x74
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer·dwrap·15()
      github.com/Shopify/sarama/async_producer.go:695 +0x39

Previous read at 0x00c0003421f0 by goroutine 58:
  runtime.chansend()
      runtime/chan.go:158 +0x0
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.1.1()
      github.com/Shopify/sarama/async_producer.go:702 +0x125
  github.com/Shopify/sarama.(*Broker).AsyncProduce.func1()
      github.com/Shopify/sarama/broker.go:408 +0x1a9
  github.com/Shopify/sarama.(*responsePromise).handle()
      github.com/Shopify/sarama/broker.go:132 +0x1b8
  github.com/Shopify/sarama.(*Broker).responseReceiver()
      github.com/Shopify/sarama/broker.go:1040 +0x124
  github.com/Shopify/sarama.(*Broker).responseReceiver-fm()
      github.com/Shopify/sarama/broker.go:1032 +0x39
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x74
  github.com/Shopify/sarama.(*Broker).Open.func1·dwrap·22()
      github.com/Shopify/sarama/broker.go:244 +0x39
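To make the hazard concrete, here is a deliberately unsafe, standalone Go sketch (not Sarama code) of the close-versus-send pattern reported above; the channel and value names are made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	responses := make(chan string, 1)

	go func() {
		// Response path: forwards a result, unaware that shutdown may have started.
		responses <- "produce response"
	}()

	go func() {
		// Shutdown path: closes the channel without waiting for in-flight senders.
		close(responses)
	}()

	// Depending on timing this either finishes quietly or panics with
	// "send on closed channel"; running with `go run -race` reports the
	// concurrent close/send pair, as in the report above.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("done")
}
```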
And the new commit/version to test again:

require (
github.com/Shopify/sarama v1.31.1
)
replace (
github.com/Shopify/sarama => github.com/slaunay/sarama v1.31.2-0.20220209005248-6c70a6c230a4
)

Two tests are failing randomly, but they might be flaky:
https://github.com/Shopify/sarama/runs/5118582681?check_suite_focus=true#step:8:30402
@slaunay, Thank you for the quick fix :)

@@ -675,7 +675,7 @@
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
- responses = make(chan *brokerProducerResponse)
+ responses = make(chan *brokerProducerResponse, 1)
)
bp := &brokerProducer

We will keep monitoring our instances and hope that the deadlock issue is resolved completely.
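For anyone following along, here is a minimal, standalone sketch (not Sarama's actual internals) of why the capacity-1 buffer above changes the behaviour: an unbuffered send blocks until a receiver is ready, while a buffered channel lets one response be parked even when the reading goroutine is busy (for example, closing the broker).

```go
package main

import "fmt"

func main() {
	unbuffered := make(chan string)  // a send blocks until someone receives
	buffered := make(chan string, 1) // one send can complete with no receiver ready

	buffered <- "produce response" // returns immediately, the response is parked
	fmt.Println(<-buffered)

	select {
	case unbuffered <- "produce response":
		fmt.Println("sent") // never reached here: nobody is receiving
	default:
		fmt.Println("unbuffered send would block") // the deadlock ingredient
	}
}
```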
LGTM, will let @dnwe take a look too.
As always — excellent writeup! Thanks for contributing ❤️
I highly recommend adding a retract for 1.31.0 and 1.31.1 in your go.mod file, because it already caused 2 incidents in our production environment and so can happen to other users as well.
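For reference, a retract block in Sarama's own go.mod could look like the sketch below (not part of this PR; the comment text is free-form):

```
retract (
	v1.31.0 // deadlock in the AsyncProducer when handling Produce response errors (#2129)
	v1.31.1 // deadlock in the AsyncProducer when handling Produce response errors (#2129)
)
```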
Thanks for testing the fix that fast; not sure if production is the best environment to test such changes 😬 but it will be useful to have real functional testing.

Your proposed fix does indeed work around the deadlock when … Here is the updated test case from commit 8f92872 (using …):
A better fix would then be:

@@ -675,7 +675,7 @@
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
- responses = make(chan *brokerProducerResponse)
+ responses = make(chan *brokerProducerResponse, p.conf.Net.MaxOpenRequests)
)
bp := &brokerProducer

But using this approach we are leaking some internals from the …
I have never used the retract directive, though.
@slaunay it does look like … Against main I can re-run … It may just be that the old test was relying on some timing constraints that are no longer valid and it should have always been able to cope with these errors on the chan, but we should dig into it just to make sure.
@slaunay
Below is the sarama log:
@hxiaodon aside, unrelated to this issue, but from your log you might want to tune your consumer config (e.g., MaxProcessingTime) as it looks like your consumers are frequently taking too long to read from the Messages() channel.
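For reference, bumping that setting looks roughly like this (a sketch; the broker address and the 500ms value are placeholders, not a recommendation):

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Default is 100ms; raise it if the application regularly needs longer
	// to drain the consumer's Messages() channel.
	config.Consumer.MaxProcessingTime = 500 * time.Millisecond

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```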
@hxiaodon also, is that a single application? I'm just trying to narrow things down for a recreate attempt, but I noticed from the output that it seems to suggest you're connecting to three different 3-broker clusters here:

client/brokers registered new broker #1 at 10.7.12.101:9092
client/brokers registered new broker #1 at 10.7.22.79:9092
client/brokers registered new broker #1 at 172.17.133.191:9092
Yeah~, thanks for the reminder, that's another story which I will sort out separately.

It's a single application; my program accesses different Kafka clusters, and I think the following cluster leads to the hang problem. My understanding is that the Sarama producer side has no heartbeat (periodic metadata refresh), so the broker connection will be closed after the default 10 minutes with no TCP data.
Thanks, we can simulate that in the functional tests using toxiproxy.
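A rough idea of what such a simulation could look like with the toxiproxy Go client (a sketch under assumptions: the ports, proxy name and 5s timeout are placeholders, and this is not the actual functional-test code):

```go
package main

import (
	"log"

	toxiproxy "github.com/Shopify/toxiproxy/client"
)

func main() {
	client := toxiproxy.NewClient("localhost:8474")

	// Clients connect to :29092; toxiproxy forwards to the real broker on :9092.
	proxy, err := client.CreateProxy("kafka", "localhost:29092", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}

	// Stop all data and close the connection after 5s, mimicking a broker
	// dropping an idle connection (connections.max.idle.ms).
	if _, err := proxy.AddToxic("idle_close", "timeout", "downstream", 1.0, toxiproxy.Attributes{
		"timeout": 5000,
	}); err != nil {
		log.Fatal(err)
	}
}
```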
This should be released as v1.32.2 to prevent users from upgrading to these faulty versions until IBM#2133 is merged.
Closing the broker asynchronously fixes the deadlock but leads to a race condition with reopening the broker in the client's updateLeader. This might result in a closed broker being used by the new brokerProducer, in which case all produce requests will fail with ErrNotConnected.
@dnwe I was able to reproduce the failure (when running a high number of …). Here is the output of the …:
So I believe that closing the … The result is similar to buffering the responses channel.
@hxiaodon It is hard to tell, but I don't see a …
That's right, a broker can drop the TCP connection if it has been idle for a while (the broker-side connections.max.idle.ms defaults to 10 minutes).

Would you be able to test the last commit? I believe the following dependency can be used:

require (
github.com/Shopify/sarama v1.31.1
)
replace (
github.com/Shopify/sarama => github.com/slaunay/sarama v1.31.2-0.20220211051606-f1bc44e541ee
)
@slaunay Sure, I will have a try~
And I'm also curious about the backtrace at that time. Anyway, I will monitor my application with your latest fix again.
@slaunay After running for about half a day, my application with your latest fix hung again. I'm confused by the following goroutine, though, because the net.Conn read timeout is only 30s. (Does it mean that the broker's readFull func is repeatedly invoked every 30s and responseReceiver's other code branch will never be executed?)
FYI, I removed my application code's backtrace from the goroutine stack and kept the other goroutines there; if you want the full goroutine dump including my application code, I will attach it.

Below is another backtrace of the above application. We restarted another app with the workaround fix (it keeps working and does not hang) in the same consumer group, the rebalance was triggered, and the app with the workaround fix now consumes all topic partitions.
@hxiaodon Those stack traces do not show the original issue caused by a deadlock.

The goroutine you quoted suggests that a success is trying to be forwarded to an invocation of …

This looks like another legitimate blocking read from the socket. So reading is not something that happens every 30 seconds per se, except in the odd case where you end up sending one … Looking at the Sarama application logs, it seems like the retry logic works as expected and new …
It's not clear to me if what you are saying is that the issue does not occur anymore with the latest fix applied (f1bc44e). But here is what I can see from the capture (similar to the stack trace above): one … and one …

This synchronization would explain why the … It's really difficult to know for sure if what you are experiencing is linked to the deadlock we are trying to fix, as the …

@eafzali Would you be able to test the fix in your application to know if your producer recovers properly when you run the scenario described in #2129 now? The same …
Yeah, I think we did not reach an agreement on it 😅, and your detailed explanation of my confusion sounds reasonable. I will have a try with 1.30.1 in my prod env and with the v1.31.2-0.20220211051606-f1bc44e541e fix in my local env.
@slaunay I'd suggest we go ahead and merge this PR now and we can follow up any further problems on the individual issues? Thanks again for all your hard work tracking down the problems.
Thank you @slaunay, I can confirm that it has fixed the issue I reported.
@slaunay
The full Kafka config is here, and the Kafka starting command is:

Create a topic with 48 partitions:
Now all the infra things are ready, and it's time to prepare the producer code. It is also very simple; the core logic is no more than 20 lines (feel free to ignore the Chinese comments, since I copied and pasted some of them roughly from the online application code).
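A hedged reconstruction of what such a minimal producer loop typically looks like (broker address, topic name and send rate are placeholders; this is not the reporter's actual code):

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	// Drain successes and errors so the producer never blocks on its output channels.
	go func() {
		for range producer.Successes() {
		}
	}()
	go func() {
		for err := range producer.Errors() {
			log.Println("produce error:", err)
		}
	}()

	for {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "test-topic-48p",
			Value: sarama.StringEncoder(time.Now().String()),
		}
		time.Sleep(100 * time.Millisecond)
	}
}
```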
I started the program at 2022/02/14 19:16:29, and the producer hung at 2022-02-14 20:54:23, because I could not get any message with the following command:
This is the full program log, which includes the Sarama output. At the hang point, I found that the Kafka shell got a {null null} value.
Full logs are here. And this time the full backtrace is very short:
Hoping for your reply, thanks!
Thanks a lot @eafzali for testing the fix.
Thanks for providing those details. Here are some things I found:
Now what is really interesting is that the …

Now such "empty" …

It would be great to confirm this is the case and ideally have a simple unit test for that scenario. @hxiaodon Would you mind creating another issue?
@slaunay Sure, I will create another issue to track this problem |
Bugfix
A regression was introduced in #2094 (Sarama `1.31.0`) and can result in a deadlock inside the `AsyncProducer` (and indirectly the `SyncProducer`) when handling a `Produce` response error (typically network errors, see #2129).

The root cause is that an `AsyncProduce` callback used to pipeline `Produce` requests must not close the `Broker` directly. Indeed, closing the `Broker` blocks until the `responseReceiver` goroutine is done, yet the `responseReceiver` goroutine might itself be blocked invoking a callback.

In this specific case, we are not directly closing the `Broker` in the callback but from:
- the `brokerProducer` `run` goroutine when calling `handleError` after receiving an error producing
- the `brokerProducer` `bridge` goroutine can be blocked if trying to send another response to the `run` goroutine
- the broker `responseReceiver` goroutine is blocked invoking the callback defined in the `bridge` goroutine

I believe this happens when more than one `Produce` request is in flight to a given broker and we are getting a missing or malformed response, which is why it was not detected by the current test suite.

We have been using `Broker.AsyncProduce` and callbacks without issues in production ourselves, but this is because we do not rely on the `AsyncProducer` and also made sure not to close the `Broker` in a blocking path.

A simple fix would be to close the broker asynchronously when we receive a response error, along the lines of the sketch below:
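Here is a hedged illustration of that "simple fix" idea (a sketch, not the code in this PR): hand the Close() off to a new goroutine so the callback path never blocks waiting for responseReceiver to exit.

```go
package example

import "github.com/Shopify/sarama"

// closeBrokerAsync closes the broker on a separate goroutine. Close blocks
// until the broker's responseReceiver goroutine is done, which is exactly why
// it must not be called from the AsyncProduce callback itself.
func closeBrokerAsync(broker *sarama.Broker) {
	go func() {
		if err := broker.Close(); err != nil {
			sarama.Logger.Println(err)
		}
	}()
}
```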
But that approach might leak such a goroutine once the `brokerProducer` is abandoned.

So the proposed fix consists of:
- adding a channel to the `brokerProducer` to receive a request to close the `Broker`
- closing the `Broker` from a dedicated `closeBroker` goroutine when such a request is received (e.g. on a network error)
- stopping that goroutine when the `brokerProducer` shuts down

A schematic sketch of this pattern is shown below.
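The sketch below shows the general shape of that pattern with assumed names (it is not the actual brokerProducer code): a dedicated goroutine owns the Close() call, a capacity-1 channel requests it at most once without blocking the caller, and a stop channel signals that the goroutine has finished.

```go
package example

import "github.com/Shopify/sarama"

type brokerCloser struct {
	closeBroker chan *sarama.Broker // capacity 1: the first request wins, duplicates are dropped
	stopchan    chan struct{}       // closed once the closer goroutine has exited
}

func newBrokerCloser() *brokerCloser {
	c := &brokerCloser{
		closeBroker: make(chan *sarama.Broker, 1),
		stopchan:    make(chan struct{}),
	}
	go func() {
		defer close(c.stopchan)
		for broker := range c.closeBroker {
			// Runs outside the response-handling path, so Close can safely
			// wait for responseReceiver to finish.
			_ = broker.Close()
		}
	}()
	return c
}

// requestClose never blocks, so it is safe to call while handling a response error.
func (c *brokerCloser) requestClose(broker *sarama.Broker) {
	select {
	case c.closeBroker <- broker:
	default: // a close is already pending
	}
}

// shutdown must be called exactly once, after the last requestClose; it stops
// the closer goroutine and waits for it to exit.
func (c *brokerCloser) shutdown() {
	close(c.closeBroker)
	<-c.stopchan
}
```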
Changes
- document the possible deadlock when closing the `Broker` from an `AsyncProduce` callback when handling a response error
- add a `closeBroker` goroutine and channel to asynchronously close a `Broker`, and only once
- reuse the `stopchan` channel to signal that the closeBroker goroutine is done
- update `TestBrokerProducerShutdown` to check for a goroutine leak by closing the `input` vs the `stopchan` channel

Testing done
I added a new unit test to check that the proposed fix works and possibly detect a similar regression in the future.
Here is the run using 1.31.1 (resulting in a deadlock):

And from 59dd565 (proposed fix):
I was able to get the functional tests to pass as well with 100% of the new code covered.
But because concurrency is hard, it would be great to have @eafzali and @hxiaodon apply the fix and confirm that it is working for them too.
Just remember that the AsyncProducer can take a while to recover with all the retry logic.

I believe the following dependency can be used to test it out in a project:
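The exact pseudo-version depends on which commit of the fix branch is being tested; for example, the replace shared elsewhere in the thread for commit f1bc44e was:

```
require (
	github.com/Shopify/sarama v1.31.1
)

replace (
	github.com/Shopify/sarama => github.com/slaunay/sarama v1.31.2-0.20220211051606-f1bc44e541ee
)
```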