Skip to content

Commit

Permalink
[fix][broker][branch-3.1] Avoid PublishRateLimiter use an already clo…
Browse files Browse the repository at this point in the history
…sed RateLimiter (#22011)
  • Loading branch information
coderzc committed Feb 19, 2024
1 parent a88f592 commit d02ba2c
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void update(long publishRateInMsgs, long publishRateInBytes) {

public boolean tryAcquire(int numbers, long bytes) {
return (publishRateLimiterOnMessage == null || publishRateLimiterOnMessage.tryAcquire(numbers))
&& (publishRateLimiterOnByte == null || publishRateLimiterOnByte.tryAcquire(bytes));
&& (publishRateLimiterOnByte == null || publishRateLimiterOnByte.tryAcquire(bytes));
}

public void registerRateLimitFunction(String name, RateLimitFunction func) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,30 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.util.RateLimiter;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ResourceGroupRateLimiterTest extends BrokerTestBase {

final String rgName = "testRG";
Expand Down Expand Up @@ -147,6 +154,76 @@ public void testResourceGroupPublishRateLimit() throws Exception {
testRateLimit();
}

@Test
public void testWithConcurrentUpdate() throws Exception {
cleanup();
setup();
createResourceGroup(rgName, testAddRg);
admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);

Awaitility.await().untilAsserted(() ->
assertNotNull(pulsar.getResourceGroupServiceManager()
.getNamespaceResourceGroup(NamespaceName.get(namespaceName))));

Awaitility.await().untilAsserted(() ->
assertNotNull(pulsar.getResourceGroupServiceManager()
.resourceGroupGet(rgName).getResourceGroupPublishLimiter()));

ResourceGroupPublishLimiter resourceGroupPublishLimiter = Mockito.spy(pulsar.getResourceGroupServiceManager()
.resourceGroupGet(rgName).getResourceGroupPublishLimiter());

AtomicBoolean blocking = new AtomicBoolean(false);
BiFunction<Function<Long, Boolean>, Long, Boolean> blockFunc = (function, acquirePermit) -> {
blocking.set(true);
while (blocking.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return function.apply(acquirePermit);
};

Mockito.doAnswer(invocation -> {
RateLimiter publishRateLimiterOnMessage =
(RateLimiter) FieldUtils.readDeclaredField(resourceGroupPublishLimiter,
"publishRateLimiterOnMessage", true);
RateLimiter publishRateLimiterOnByte =
(RateLimiter) FieldUtils.readDeclaredField(resourceGroupPublishLimiter,
"publishRateLimiterOnByte", true);
int numbers = invocation.getArgument(0);
long bytes = invocation.getArgument(1);
return (publishRateLimiterOnMessage == null || publishRateLimiterOnMessage.tryAcquire(numbers))
&& (publishRateLimiterOnByte == null || blockFunc.apply(publishRateLimiterOnByte::tryAcquire, bytes));
}).when(resourceGroupPublishLimiter).tryAcquire(Mockito.anyInt(), Mockito.anyLong());

ConcurrentHashMap resourceGroupsMap =
(ConcurrentHashMap) FieldUtils.readDeclaredField(pulsar.getResourceGroupServiceManager(),
"resourceGroupsMap", true);
FieldUtils.writeDeclaredField(resourceGroupsMap.get(rgName), "resourceGroupPublishLimiter",
resourceGroupPublishLimiter, true);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(namespaceName + "/test-topic")
.create();

CompletableFuture<MessageId> sendFuture = producer.sendAsync(new byte[MESSAGE_SIZE]);

Awaitility.await().untilAsserted(() -> Assert.assertTrue(blocking.get()));

testAddRg.setPublishRateInBytes(Long.valueOf(MESSAGE_SIZE) + 1);
admin.resourcegroups().updateResourceGroup(rgName, testAddRg);
blocking.set(false);

sendFuture.join();

// Now detach the namespace
admin.namespaces().removeNamespaceResourceGroup(namespaceName);
deleteResourceGroup(rgName);
}


private void prepareData() {
testAddRg.setPublishRateInBytes(Long.valueOf(MESSAGE_SIZE));
testAddRg.setPublishRateInMsgs(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimiter;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Test(groups = "broker")
@Slf4j
public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{

@Override
Expand Down Expand Up @@ -180,4 +189,64 @@ public void testBrokerLevelPublishRateDynamicUpdate() throws Exception{
producer.close();
super.internalCleanup();
}

@Test
public void testWithConcurrentUpdate() throws Exception {
PublishRate publishRate = new PublishRate(-1,10);
conf.setPreciseTopicPublishRateLimiterEnable(true);
conf.setMaxPendingPublishRequestsPerConnection(1000);
super.baseSetup();
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
final String topic = "persistent://prop/ns-abc/testWithConcurrentUpdate";
@Cleanup
org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();

AbstractTopic topicRef = (AbstractTopic) pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);

PublishRateLimiter topicPublishRateLimiter = Mockito.spy(topicRef.getTopicPublishRateLimiter());

AtomicBoolean blocking = new AtomicBoolean(false);
BiFunction<Function<Long, Boolean>, Long, Boolean> blockFunc = (function, acquirePermit) -> {
blocking.set(true);
while (blocking.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return function.apply(acquirePermit);
};

Mockito.doAnswer(invocation -> {
log.info("tryAcquire: {}, {}", invocation.getArgument(0), invocation.getArgument(1));
RateLimiter publishRateLimiterOnMessage =
(RateLimiter) FieldUtils.readDeclaredField(topicPublishRateLimiter,
"topicPublishRateLimiterOnMessage", true);
RateLimiter publishRateLimiterOnByte =
(RateLimiter) FieldUtils.readDeclaredField(topicPublishRateLimiter,
"topicPublishRateLimiterOnByte", true);
int numbers = invocation.getArgument(0);
long bytes = invocation.getArgument(1);
return (publishRateLimiterOnMessage == null || publishRateLimiterOnMessage.tryAcquire(numbers))
&& (publishRateLimiterOnByte == null || blockFunc.apply(publishRateLimiterOnByte::tryAcquire, bytes));
}).when(topicPublishRateLimiter).tryAcquire(Mockito.anyInt(), Mockito.anyLong());

FieldUtils.writeField(topicRef, "topicPublishRateLimiter", topicPublishRateLimiter, true);

CompletableFuture<MessageId> sendFuture = producer.sendAsync(new byte[10]);

Awaitility.await().untilAsserted(() -> Assert.assertTrue(blocking.get()));
publishRate.publishThrottlingRateInByte = 20;
admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
blocking.set(false);

sendFuture.join();

super.internalCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;

/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
Expand All @@ -50,6 +51,7 @@
* <li><b>Faster: </b>RateLimiter is light-weight and faster than Guava-RateLimiter</li>
* </ul>
*/
@Slf4j
public class RateLimiter implements AutoCloseable{
private final ScheduledExecutorService executorService;
private long rateTime;
Expand Down Expand Up @@ -175,7 +177,10 @@ public synchronized boolean tryAcquire() {
* @return {@code true} if the permits were acquired, {@code false} otherwise
*/
public synchronized boolean tryAcquire(long acquirePermit) {
checkArgument(!isClosed(), "Rate limiter is already shutdown");
if (isClosed()) {
log.info("The current rate limiter is already shutdown, acquire permits directly.");
return true;
}
// lazy init and start task only once application start using it
if (renewTask == null) {
renewTask = createTask();
Expand Down

0 comments on commit d02ba2c

Please sign in to comment.