Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][broker] Add config to count filtered entries towards rate limits #17686

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ entryFilterNames=
# The directory for all the entry filter implementations
entryFiltersDirectory=

# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled,
# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and
# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards
# each relevant rate limit.
dispatchThrottlingForFilteredEntriesEnabled=false

# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,16 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private boolean dispatcherDispatchMessagesInSubscriptionThread = true;

@FieldContext(
dynamic = false,
category = CATEGORY_SERVER,
doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
+ "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
+ "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
+ "each relevant rate limit."
)
private boolean dispatchThrottlingForFilteredEntriesEnabled = false;

// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
int filteredMessageCount = 0;
int filteredEntryCount = 0;
long filteredBytesCount = 0;
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Expand All @@ -130,12 +133,18 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
this.filterRejectedMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add((PositionImpl) entry.getPosition());
entries.set(i, null);
this.filterRescheduledMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
}
Expand Down Expand Up @@ -220,6 +229,11 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray

}

if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one side effect of putting this here is that this affects the "analize-backlog" command.
also, we are blocking the dispatch thread.
the users will probably have to tune numWorkerThreadsForNonPersistentTopic, because the subscription dispatch threads will have some idle time due to this throttling

@MMirelli @michaeljmarshall @lhotari

Copy link
Member Author

@michaeljmarshall michaeljmarshall Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one side effect of putting this here is that this affects the "analize-backlog" command.

Good point, I think that might be good though in order to prevent that command from consuming too many resources at a time in the broker.

also, we are blocking the dispatch thread.

The call to tryDispatchPermit is non-blocking, other than needing to acquire a lock on the subscription's rate limiter. Here is the relevant code path:

public boolean tryDispatchPermit(long msgPermits, long bytePermits) {
boolean acquiredMsgPermit = msgPermits <= 0 || dispatchRateLimiterOnMessage == null
// acquiring permits must be < configured msg-rate;
|| dispatchRateLimiterOnMessage.tryAcquire(msgPermits);
boolean acquiredBytePermit = bytePermits <= 0 || dispatchRateLimiterOnByte == null
// acquiring permits must be < configured msg-rate;
|| dispatchRateLimiterOnByte.tryAcquire(bytePermits);
return acquiredMsgPermit && acquiredBytePermit;
}

public synchronized boolean tryAcquire(long acquirePermit) {
checkArgument(!isClosed(), "Rate limiter is already shutdown");
// lazy init and start task only once application start using it
if (renewTask == null) {
renewTask = createTask();
}
boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (isDispatchOrPrecisePublishRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;
// we want to back-pressure from the current state of the rateLimiter therefore we should check if there
// are any available premits again
canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
} else {
// acquired-permits can't be larger than the rate
if (acquirePermit + acquiredPermits > this.permits) {
return false;
}
if (canAcquire) {
acquiredPermits += acquirePermit;
}
}
return canAcquire;
}

It's interesting that the current solution acquires permits after sending messages. In looking at the RateLimiter code above, we completely ignore whether or not the specific messages actually get the permit to dispatch messages when isDispatchOrPrecisePublishRateLimiter is false. That is probably the intention of the setting though, and since this PR does not change those semantics though, I think it is fine to leave it as is for this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed that isDispatchOrPrecisePublishRateLimiter is always true in this class, so there are no issues with getting the permits after delivery. We'll allow for some over-delivery, and then we will throttle consumers.

acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
filteredMessageCount, filteredBytesCount);
}

sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
Expand All @@ -232,6 +246,19 @@ private void individualAcknowledgeMessageIfNeeded(Position position, Map<String,
}
}

protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
long totalMessagesSent, long totalBytesSent) {
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
|| (cursor != null && !cursor.isActive())) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(permits, totalBytesSent));
topic.getDispatchRateLimiter().ifPresent(rateLimter ->
rateLimter.tryDispatchPermit(permits, totalBytesSent));
getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent));
}
}

/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
Expand All @@ -700,23 +700,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return true;
}

private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) {
// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}
}

private boolean sendChunkedMessagesToConsumers(ReadType readType,
List<Entry> entries,
MessageMetadata[] metadataArray) {
Expand Down Expand Up @@ -775,7 +758,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}

acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,8 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
: sendMessageInfo.getTotalMessages();
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}

if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes()));
}
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}

// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

stuckConsumers.clear();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
Expand All @@ -60,8 +62,9 @@ public class AbstractBaseDispatcherTest {
@BeforeMethod
public void setup() throws Exception {
this.svcConfig = mock(ServiceConfiguration.class);
when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true);
this.subscriptionMock = mock(PersistentSubscription.class);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
}

@Test
Expand Down Expand Up @@ -89,17 +92,24 @@ public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
EntryFilter.FilterResult.REJECT);
ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter);
when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class);

this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
subscriptionDispatchRateLimiter);

List<Entry> entries = new ArrayList<>();

entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
long expectedBytePermits = e.getLength();
entries.add(e);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
//
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);

ManagedCursor cursor = mock(ManagedCursor.class);

int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null);
assertEquals(size, 0);
verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits);
}

@Test
Expand Down Expand Up @@ -201,9 +211,18 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) {

private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {

private final Optional<DispatchRateLimiter> dispatchRateLimiter;

protected AbstractBaseDispatcherTestHelper(Subscription subscription,
ServiceConfiguration serviceConfig) {
ServiceConfiguration serviceConfig,
DispatchRateLimiter rateLimiter) {
super(subscription, serviceConfig);
dispatchRateLimiter = Optional.ofNullable(rateLimiter);
}

@Override
public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
}

@Override
Expand Down