-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-74] Support auto scaled consumer receiver queue #14494
Conversation
/pulsarbot run-failure-checks |
Here is a test with Non-partitioned topic consumer
3-partitioned topic consumer
|
/pulsarbot run-failure-checks |
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
/pulsarbot run-failure-checks |
2b03aef
to
db678d9
Compare
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
Outdated
Show resolved
Hide resolved
/pulsarbot run-failure-checks |
👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job
LGTM |
/pulsarbot run-failure-checks |
7d95afe
to
a157072
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Add autoScaledReceiverQueueSize * Add autoScaledReceiverQueueSize for PerformanceConsumer * remove memory limit code * fix typo
* Add autoScaledReceiverQueueSize * Add autoScaledReceiverQueueSize for PerformanceConsumer * remove memory limit code * fix typo
Motivation
Pick a proper receiver queue size is not an easy thing. If the value is too small, it impact the throughput, and if the value is too large, it consumes too many memory. With default set up, the queue size is 1000, and the max message size is 5MB, this means maximum of 5GB memory occupation.
This is part of the work for PIP 74. We need auto scale
currentReceiverQueue
to control client memory.Modifications
Add optional autoScaledReceiverQueueSizeEnabled for consumer client.
Previous
receiverQueueSize
is the max value of this auto scaled queue size.Every time the client will try to double the size of
currentReceiverQueue
if it limits message throughput. Currently, it's determined by the following two conditions in exact order:A) Current receiver queue (
ConsumerBase#incomingMessages
) is full after we put a message into it. (it's marked byscaleReceiverQueueHint
as true).B) Application wants process more messages but the receiver queue is empty. (
expectMoreIncomingMessages
is called in this PR).The queue size won't grow if we got new messages during A and B. So if assume current receiver queue size is 10, and the timeline would be like
scaleReceiverQueueHint
is marked as true.receive()
repeatedly and processed all 10 messages. And in the meanwhile no new message is sent to client[1].NOTE:
Condition A
here is slightly different from original design in PIP-74 described as "there are messages pending to be sent for this consumer" (let's refer it as Condition X). This is proposed with these consideration:a) We can assume that when receiver queue is full, the chance of broker have no more messages is insignificant in practice.
b) If we accept a), then
Condition A
impliesCondition X
, as new messages are pending because local queue size is full. But it's NOT vice versa.c) Once fact we should accept is that we don't need expand queue size every time it limits the throughput as some slight and occasional delay is acceptable and won't affect overall throughput. And by replacing
Condition A
withCondition X
, we can reduce the sensitivity of the expansion.Verifying this change
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
no-need-doc