[feat][broker] Add config to count filtered entries towards rate limits #17686
LGTM
I am going to test this patch in some automated integration tests
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -220,6 +229,11 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray

        }

        if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
One side effect of putting this here is that this affects the "analyze-backlog" command.
Also, we are blocking the dispatch thread.
Users will probably have to tune numWorkerThreadsForNonPersistentTopic, because the subscription dispatch threads will have some idle time due to this throttling.
> One side effect of putting this here is that this affects the "analyze-backlog" command.

Good point. I think that might be desirable, though, since it prevents that command from consuming too many broker resources at once.

> Also, we are blocking the dispatch thread.

The call to tryDispatchPermit is non-blocking, other than needing to acquire a lock on the subscription's rate limiter. Here is the relevant code path:
Lines 100 to 108 in ad62138
public boolean tryDispatchPermit(long msgPermits, long bytePermits) {
    boolean acquiredMsgPermit = msgPermits <= 0 || dispatchRateLimiterOnMessage == null
            // acquiring permits must be < configured msg-rate;
            || dispatchRateLimiterOnMessage.tryAcquire(msgPermits);
    boolean acquiredBytePermit = bytePermits <= 0 || dispatchRateLimiterOnByte == null
            // acquiring permits must be < configured msg-rate;
            || dispatchRateLimiterOnByte.tryAcquire(bytePermits);
    return acquiredMsgPermit && acquiredBytePermit;
}
pulsar/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
Lines 175 to 202 in 84b6559
public synchronized boolean tryAcquire(long acquirePermit) {
    checkArgument(!isClosed(), "Rate limiter is already shutdown");
    // lazy init and start task only once application start using it
    if (renewTask == null) {
        renewTask = createTask();
    }
    boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
    if (isDispatchOrPrecisePublishRateLimiter) {
        // for dispatch rate limiter just add acquirePermit
        acquiredPermits += acquirePermit;
        // we want to back-pressure from the current state of the rateLimiter therefore we should check if there
        // are any available premits again
        canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
    } else {
        // acquired-permits can't be larger than the rate
        if (acquirePermit + acquiredPermits > this.permits) {
            return false;
        }
        if (canAcquire) {
            acquiredPermits += acquirePermit;
        }
    }
    return canAcquire;
}
It's interesting that the current solution acquires permits after sending messages. Looking at the RateLimiter code above, when isDispatchOrPrecisePublishRateLimiter is false, we completely ignore whether the specific messages actually got a permit to be dispatched. That is probably the intention of the setting, and since this PR does not change those semantics, I think it is fine to leave it as is for this PR.
I just noticed that isDispatchOrPrecisePublishRateLimiter is always true in this class, so there are no issues with getting the permits after delivery. We'll allow for some over-delivery, and then we will throttle consumers.
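To make the "over-delivery, then throttle" behavior concrete, here is a minimal sketch that models only the dispatch branch of the tryAcquire code quoted above. The class name DispatchLimiterSketch and the renew() and acquired() helpers are hypothetical, and resetting the counter to zero on renewal is a simplifying assumption, not a statement about how Pulsar's renew task works.

```java
/** Simplified model of the dispatch branch of tryAcquire; hypothetical, not Pulsar's RateLimiter. */
final class DispatchLimiterSketch {
    private final long permits;   // permits available per rate period
    private long acquiredPermits; // permits recorded in the current period

    DispatchLimiterSketch(long permits) {
        this.permits = permits;
    }

    /**
     * Mirrors the dispatch branch: the permits are always recorded, even when
     * that overshoots the limit, and the return value only reports whether
     * any budget remains afterwards.
     */
    synchronized boolean tryAcquire(long acquirePermit) {
        acquiredPermits += acquirePermit;
        return acquirePermit < 0 || acquiredPermits < permits;
    }

    /** Assumption of this sketch: a new period simply starts from zero. */
    synchronized void renew() {
        acquiredPermits = 0;
    }

    synchronized long acquired() {
        return acquiredPermits;
    }
}
```

With a limit of 10, acquiring 4 and then 8 records 12 permits: the second batch has already been delivered, and the false return value is what tells the caller to hold off further reads until the period renews.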
@michaeljmarshall Please resolve the merge conflict.
Motivation
Currently, when using entry filters, filtered-out messages do not count against the rate limit. Therefore, a subscription that is completely filtered will never be throttled by rate limiting. When some messages pass the filter and are delivered to the consumer, those messages do count against the rate limit, and in that case the message filtering can be throttled, because the check to delay readMoreEntries() happens before message filtering. The effective rate limit is therefore increased as a function of the percentage of messages let through the filter (some quick math suggests the new rate is likely dispatchRate * (1 / percentDelivered), where percentDelivered is the delivered percentage expressed as a decimal).

It's possible that some use cases prefer this behavior, but in my case, I think it'd be valuable to include these filtered messages in the dispatch throttling, because they still cost the broker network, memory, and CPU. This PR adds a configuration to count filtered-out messages towards dispatch rate limits for the broker, the topic, and the subscription.
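As a worked example of the quick math above, the following sketch computes the approximate rate at which the broker reads entries when only delivered messages count against the limit. The class and method names are hypothetical, and the formula is the rough estimate from the description, not a measured result.

```java
/** Hypothetical helper illustrating dispatchRate * (1 / percentDelivered). */
final class EffectiveReadRate {
    private EffectiveReadRate() {
    }

    /**
     * @param dispatchRate      configured dispatch rate, in messages per second
     * @param fractionDelivered share of messages that pass the filter, as a decimal in [0, 1]
     * @return estimated entries per second read by the broker; infinite when
     *         nothing is delivered, since nothing ever counts against the limit
     */
    static double estimate(double dispatchRate, double fractionDelivered) {
        if (fractionDelivered < 0.0 || fractionDelivered > 1.0) {
            throw new IllegalArgumentException("fractionDelivered must be in [0, 1]");
        }
        if (fractionDelivered == 0.0) {
            return Double.POSITIVE_INFINITY;
        }
        return dispatchRate * (1.0 / fractionDelivered);
    }
}
```

For instance, with a dispatch rate of 1000 msg/s and a filter that lets through 25% of messages, the estimate is 1000 * (1 / 0.25) = 4000 entries read per second.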
Modifications
- Add the dispatchThrottlingForFilteredEntriesEnabled config. Default it to false so we maintain the original behavior. When true, count filtered messages against rate limits.
- Move acquirePermitsForDeliveredMessages so that it is in the AbstractBaseDispatcher, which makes it available to the entry filtering logic.

Verifying this change
A new test is added as part of this PR.
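The behavior that such a test would need to distinguish can be modeled in a few lines. This is not the test added by the PR: FilteredEntryThrottleSketch, filterAndCount, and permitsUsed are hypothetical names, entries are plain strings, and the boolean flag merely stands in for dispatchThrottlingForFilteredEntriesEnabled.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of counting filtered entries towards a rate limit; not Pulsar code. */
final class FilteredEntryThrottleSketch {
    // Stands in for dispatchThrottlingForFilteredEntriesEnabled.
    private final boolean countFilteredEntries;
    private long permitsUsed;

    FilteredEntryThrottleSketch(boolean countFilteredEntries) {
        this.countFilteredEntries = countFilteredEntries;
    }

    /** Applies the filter, charges permits, and returns the number of delivered entries. */
    int filterAndCount(List<String> entries, Predicate<String> accept) {
        int delivered = 0;
        int filtered = 0;
        for (String entry : entries) {
            if (accept.test(entry)) {
                delivered++;
            } else {
                filtered++;
            }
        }
        // Delivered entries always count against the limit.
        permitsUsed += delivered;
        // Filtered-out entries count only when the flag is enabled.
        if (countFilteredEntries) {
            permitsUsed += filtered;
        }
        return delivered;
    }

    long permitsUsed() {
        return permitsUsed;
    }
}
```

With four entries of which two pass the filter, the model charges 2 permits when the flag is false and 4 permits when it is true, while the number of delivered entries is 2 in both cases.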
Does this pull request potentially affect one of the following parts:
This PR introduces a new config while maintaining the current behavior.
Documentation
doc-not-needed
Config docs are auto-generated.