Support request pipelining in AsyncProducer #2094
Conversation
- introduce `Broker.AsyncProduce` with a callback to have a mostly non blocking way to produce to a single broker
- refactor `Broker.send` and `Broker.responseReceiver` to support callback
- factorize throttleTime metrics into `Broker.updateThrottleMetric`
- add `Config.Produce.Pipeline` parameter (defaults to `false` for backward compatibility)
- honor `MaxOpenRequests` when using `AsyncProducer` and `conf.Produce.Pipeline` is enabled
- add unit and functional tests
@slaunay thank you for such a fantastic write-up and explanation. I had forgotten about this PR when I went ahead and reviewed+merged #1686, an old PR that was seemingly trying to solve the same problem. Please could you rebase your PR atop the latest main? Feel free to resolve the conflicts by replacing the changes of #1686 with your own. Then I will go ahead and review and merge.
@slaunay oh, one other thing I was going to suggest is that we just go ahead and make pipelining the default behaviour. I think it's correct (as you mention) that we should match the Java client, and as long as we flag it in the release notes and point out that `MaxOpenRequests` 1 can be used to restore the old behaviour we should be fine.
I saw that PR but it looked like it was stalled, possibly because it tends to leak the … I used commit fba46c9 (includes #1686) to build.
That makes sense to me too, as long as users read the Changelog before upgrading to a new release 😉. That being said, using …
I think it is a separate issue that has always been there because the synchronization (using a buffered channel as a semaphore) happens after writing the request to the socket.
Just to be clear, after resolving conflicts do you want me to …?
@slaunay yep exactly right. Thanks again for the great contribution.
I updated … Here are up-to-date performance results using …
The results are consistent with the initial changes. I made sure most of the changes have close to full coverage, but I have been running into a flaky test from … On top of using more than one in-flight request now (in case one fails but others succeed), there is no guarantee that the … Using a single synchronous call to …
Excellent, thanks again!
Not sure I saw this mentioned anywhere, but seeing 16 requests in flight above: note that with idempotency, max requests in flight should not be more than 5. It's not well documented, but Kafka internally uses only a 5-wide idempotent deduplication window per partition.
The … In that particular scenario we write to 128 partitions led by 5 brokers using the default … On the other scenarios, to show the impact of the changes, we rely on a single partition (…).
FWIU, to use the producer idempotency feature in Java land you either need to set at least: …
It actually looks like Sarama currently forces you to use option 1:
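(The snippet that followed here is not preserved; as a sketch only, this is roughly the configuration Sarama's validation expects for an idempotent producer in recent releases, which may differ from the version discussed in this thread.)

```go
package example

import "github.com/Shopify/sarama"

// idempotentConfig sketches what Sarama's config validation expects for an
// idempotent producer (based on recent Sarama releases; illustrative only).
func idempotentConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0 // idempotence requires the Kafka >= 0.11 message format
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 5 // retries must be enabled
	cfg.Net.MaxOpenRequests = 1 // Sarama currently insists on a single in-flight request
	return cfg
}
```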
It is indeed not well documented, and quickly looking at the Kafka source code it seems to me that the restriction is implemented on the client rather than on the broker: …
I did test more than 5 in-flight requests (against one broker) using idempotency and a custom version of … But I'm more concerned about setting … I believe the … The way forward is probably to:
…2182) producer: ensure that the management message (fin) is never "leaked"

Since the async producer now supports multiple in-flight messages thanks to #1686 and #2094, it may "leak" the fin internal management message to Kafka (and to the client) when the broker producer is reconnecting to the Kafka broker and retries multiple in-flight messages at the same time.

- test:async-producer: test broker restart (this fixes #2150)
- tests:async-producer: disable logger in TestAsyncProducerBrokerRestart
- tests:async-producer: protect leader with a mutex to make race detector happy
- test:async-producer: set 5mn default finish timeout
- async-producer: do not clear bp.currentRetries when fin message is received just after syn
- async-producer: use debug logger when fin message is handled for a healthy brokerProducer
- test:async-producer:restart: make emptyValues atomic to avoid races
- test:async-producer:restart: rename produceRequestTest to countRecordsWithEmptyValue
- test:async-producer:restart: reduce retry backoff timeout to speed up the test
- test:async-producer:restart: remove bogus 0
Enhancement
Although there is a `Config.Net.MaxOpenRequests` parameter, the `AsyncProducer` uses at most one in-flight produce request when producing to a single Kafka broker. As shown in #1539 a while back, using request pipelining increases throughput when writing to a Kafka cluster over a high-latency network link, and it is something that is recommended when implementing the Kafka protocol, as described in their network section.

`MaxOpenRequests` defaults to 5, but if you were to execute code like the sketch below you would only see a single in-flight `Produce` request. That is, to send the next request you must first wait for the `responseReceiver` Go routine to read the current response bytes and return it to the calling Go routine. Obviously, if it takes a while to read such a response because of network latency or slow replication, the calling Go routine will sit idle instead of sending more records, therefore reducing the throughput (more on that in the "Testing done" section).
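(The code snippet that originally accompanied this paragraph is not preserved here; the following is a minimal sketch of the kind of producer loop being described. The broker address, topic name, and message count are illustrative assumptions.)

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Net.MaxOpenRequests = 5 // the default, yet only one Produce request ends up in flight
	cfg.Producer.Return.Successes = true

	// "localhost:9092" and "test-topic" are placeholder values.
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Drain successes and errors so the producer never blocks on its output channels.
	go func() {
		for range producer.Successes() {
		}
	}()
	go func() {
		for err := range producer.Errors() {
			log.Println(err)
		}
	}()

	for i := 0; i < 1000000; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "test-topic",
			Value: sarama.StringEncoder("some payload"),
		}
	}

	if err := producer.Close(); err != nil {
		log.Println(err)
	}
}
```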
Unfortunately this is how the `AsyncProducer` currently works, at least how the `brokerProducer` "bridge" Go routine processes batches (i.e. `*produceSet`): https://github.com/Shopify/sarama/blob/635bcf350a7b8a92b4da0aacd288ee09311e673d/async_producer.go#L693-L707
By adding a new `Broker.AsyncProduce` receiver and using it in the `AsyncProducer`, we can achieve proper pipelining of `Produce` requests. The new receiver relies on a callback to eventually receive the response:
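(The example that originally followed is not preserved; below is a rough sketch of the callback style, with the `AsyncProduce` signature as I understand it from this PR. Topic, partition, and payload values are placeholders.)

```go
package example

import (
	"log"

	"github.com/Shopify/sarama"
)

// produceWithCallback sketches producing to a single broker with a callback.
func produceWithCallback(broker *sarama.Broker) error {
	req := &sarama.ProduceRequest{RequiredAcks: sarama.WaitForLocal, Timeout: 10000}
	req.AddMessage("test-topic", 0, &sarama.Message{Value: []byte("some payload")})

	// AsyncProduce returns once the request has been written to the socket;
	// the callback fires later, when the responseReceiver Go routine has
	// read the corresponding response.
	return broker.AsyncProduce(req, func(res *sarama.ProduceResponse, err error) {
		if err != nil {
			log.Println("produce failed:", err)
			return
		}
		if block := res.GetBlock("test-topic", 0); block != nil {
			log.Println("produced at offset", block.Offset)
		}
	})
}
```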
The same could technically be done with `Broker.Produce` and N Go routines, but it gets tricky to get the back-pressure behavior from the internal `Broker.responses` channel and to avoid creating too many Go routines. With the callback, the back-pressure is propagated up and we can still use a channel to handle the responses asynchronously if necessary:
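(Again, the original snippet is missing; here is a hedged sketch of the callback-plus-channel pattern being described, with all names illustrative.)

```go
package example

import (
	"log"

	"github.com/Shopify/sarama"
)

type produceResult struct {
	res *sarama.ProduceResponse
	err error
}

// pipelineProduce sends several requests back to back and handles the
// responses asynchronously on a channel. The callbacks only forward the
// result, so the back-pressure of the broker's internal responses channel
// is what ultimately limits how many requests are in flight.
func pipelineProduce(broker *sarama.Broker, reqs []*sarama.ProduceRequest) {
	results := make(chan produceResult, len(reqs))

	for _, req := range reqs {
		if err := broker.AsyncProduce(req, func(res *sarama.ProduceResponse, err error) {
			results <- produceResult{res, err}
		}); err != nil {
			// The request could not even be written to the socket.
			results <- produceResult{nil, err}
		}
	}

	for range reqs {
		if r := <-results; r.err != nil {
			log.Println("produce failed:", r.err)
		}
	}
}
```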
Returning or else passing a channel to `Broker.AsyncProduce` is another option, but I believe it leads to more complex and less flexible code.

As this is a non-trivial behavior change in the `AsyncProducer`, I introduced a new configuration property to make that feature opt-in. I do believe that `Produce` request pipelining should be the default (like in the Java client), but an application that cares about ordering might see records persisted out of order depending on how it is configured, which can be considered a breaking change.
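(As a sketch only: this is how the opt-in could look with the `Config.Produce.Pipeline` parameter proposed in this PR. The parameter name comes from this PR and may not exist in that exact form in released Sarama versions.)

```go
package example

import "github.com/Shopify/sarama"

// pipeliningConfig sketches opting in to the behaviour proposed by this PR.
func pipeliningConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Produce.Pipeline = true // opt in to Produce request pipelining (defaults to false)
	cfg.Net.MaxOpenRequests = 5 // upper bound on pipelined requests per broker
	return cfg
}
```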
Changes

- introduce `Broker.AsyncProduce` with a callback to have a mostly non blocking way to produce to a single broker
- refactor `Broker.send` and `Broker.responseReceiver` to support callback
- factorize throttleTime metrics into `Broker.updateThrottleMetric`
- add `Config.Produce.Pipeline` parameter (defaults to `false` for backward compatibility)
- honor `MaxOpenRequests` when using `AsyncProducer` and `conf.Produce.Pipeline` is enabled
- add `-enable-pipeline` flag to `kafka-producer-performance` for benchmarking
- add unit and functional tests

Testing done
The new unit and functional tests should provide good coverage for the new changes as they touch a critical part of the `Broker` and the `AsyncProducer` logic.

But for a real end-to-end and performance test I am using the `kafka-producer-performance` tool with a new `-enable-pipeline` flag to show the impact of request pipelining when it is the most useful.

For that, I am running the tool from an EC2 instance in AWS `us-west-2` and producing to:

- `us-west-2` with a RTT of ~1 ms
- `us-east-1` with a RTT of ~70 ms
- `eu-central-1` with a RTT of ~140 ms

Here are some details about the EC2 instance and the command line used as the baseline:
Here are the performance results for each remote target cluster with the respective extra flags:
| Target cluster | Extra flags | RTT (ms) | Records/s | MB/s | Max requests in flight |
|---|---|---|---|---|---|
| `us-west-2` | `-partitioner manual -partition 0` | 1 | 55,417.7 | 35.87 | 1 |
| `us-west-2` | `-partitioner manual -partition 0 -enable-pipeline` | 1 | 63,645.8 | 41.70 | 6 |
| `us-east-1` | `-partitioner manual -partition 0` | 70 | 10,758.9 | 9.74 | 1 |
| `us-east-1` | `-partitioner manual -partition 0 -enable-pipeline` | 70 | 64,800.0 | 36.12 | 6 |
| `eu-central-1` | `-partitioner manual -partition 0` | 140 | 5,494.0 | 5.14 | 1 |
| `eu-central-1` | `-partitioner manual -partition 0 -enable-pipeline -max-open-requests 1` | 140 | 11,371.9 | 10.10 | 2 |
| `eu-central-1` | `-partitioner manual -partition 0 -enable-pipeline` | 140 | 32,999.4 | 25.47 | 6 |
| `eu-central-1` | `roundrobin` partitioner on 5 brokers/128 partitions | 140 | 119,782.4 | 59.19 | 5 |
| `eu-central-1` | `roundrobin` partitioner on 5 brokers/128 partitions plus `-enable-pipeline` | 140 | 133,124.7 | 65.88 | 16 |
Here is the detailed output of `kafka-producer-performance` when producing to `eu-central-1` over a single topic partition (`-partitioner manual -partition 0`):

- Request pipelining disabled (current behavior): details
- Request pipelining enabled (`-enable-pipeline -max-open-requests 1`): details
- Request pipelining enabled (`-enable-pipeline [-max-open-requests 5]`): details
There is an oddity with using `-max-open-requests`: it is reported as 2 when set to 1, and 6 when set to 5 (the default). I thought it was because the `requests-in-flight` metric is increased before writing to the socket, while the effective rate limiting is done later, when writing to the `Broker.responses` channel: https://github.com/Shopify/sarama/blob/635bcf350a7b8a92b4da0aacd288ee09311e673d/broker.go#L830-L848

I think there might actually be room for one more in-flight request even if the channel is buffered with a N-1 capacity. That would explain the 2x throughput improvement when using `-max-open-requests 1`, as we would be able to pipeline up to 2 `Produce` requests (see the sketch below).

Anyway, based on those numbers the throughput performance improvement ranges from 11% to 395%.
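(To make the off-by-one concrete, here is a generic sketch of the buffered-channel-as-semaphore pattern described above; it is a simplification for illustration, not Sarama's actual code.)

```go
package example

// sendLoop illustrates why one extra request can be in flight: the semaphore
// slot is only claimed *after* the request bytes are on the wire. With
// maxOpenRequests = 1 (capacity 0) up to 2 requests can be in flight, and
// with the default of 5 (capacity 4) up to 6, matching the numbers above.
func sendLoop(requests <-chan []byte, writeRequest func([]byte), readResponse func(), maxOpenRequests int) {
	inFlight := make(chan struct{}, maxOpenRequests-1)

	// Receiver Go routine: each iteration frees one slot, then blocks
	// reading the matching response from the socket.
	go func() {
		for range inFlight {
			readResponse() // blocks until the broker replies
		}
	}()

	for req := range requests {
		writeRequest(req)      // the request is already on the wire...
		inFlight <- struct{}{} // ...before we block waiting for a free slot
	}
}
```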
When writing to multiple topic partitions, like when using the `roundrobin` partitioner (i.e. writing to more than one broker), the throughput increases because multiple `brokerProducer`s are active.

If you have Kafka producers distributed around the world and a few centralized Kafka clusters like we do, writing to multiple brokers and using request pipelining can greatly improve the throughput.
The same applies if you want to mirror records between two Kafka clusters located in remote data centers.
Having a constant stream of bytes written to a TCP connection is key to increasing the TCP window and therefore the throughput, especially on high-latency network links.
We have been using this specific enhancement in our log shipper successfully for a few years in production.
With other improvements, we are able to use Sarama to push up to 1M records/s per instance (131-byte records on average) and produce overall ~6M records/s during peak hours across our 37 data centers.
So thanks a lot for creating and maintaining this Go library and merry Christmas 🎄!