From d242c8e8117e0d39ef5122f9d78c8310e1de3bd2 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 4 Dec 2019 12:55:17 -0800 Subject: [PATCH] PIP-52: [pulsar-sever] Add support of dispatch throttling relative to publish-rate --- .../persistent/DispatchRateLimiter.java | 33 ++++++++- .../service/persistent/PersistentTopic.java | 19 ++++- .../persistent/SubscribeRateLimiter.java | 12 +-- .../api/MessageDispatchThrottlingTest.java | 74 +++++++++++++++++++ .../pulsar/admin/cli/CmdNamespaces.java | 12 ++- .../common/policies/data/DispatchRate.java | 7 ++ .../pulsar/common/util/RateLimiter.java | 18 ++++- .../pulsar/common/util/RateLimiterTest.java | 14 +++- .../instance/stats/FunctionStatsManager.java | 4 +- .../instance/stats/SinkStatsManager.java | 4 +- .../instance/stats/SourceStatsManager.java | 4 +- 11 files changed, 175 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index d7d0ade129776..550ad3867d7da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @@ -44,17 +45,23 @@ public enum Type { REPLICATOR } + private final PersistentTopic topic; private final String topicName; private final Type type; private final BrokerService brokerService; private RateLimiter dispatchRateLimiterOnMessage; private RateLimiter dispatchRateLimiterOnByte; + private long subscriptionRelativeRatelimiterOnMessage; + private long subscriptionRelativeRatelimiterOnByte; public DispatchRateLimiter(PersistentTopic topic, Type type) { + this.topic = topic; this.topicName = topic.getName(); this.brokerService = topic.getBrokerService(); this.type = type; + this.subscriptionRelativeRatelimiterOnMessage = -1; + this.subscriptionRelativeRatelimiterOnByte = -1; updateDispatchRate(); } @@ -272,14 +279,17 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { long byteRate = dispatchRate.dispatchThrottlingRateInByte; long ratePeriod = dispatchRate.ratePeriodInSecond; + Supplier permitUpdaterMsg = dispatchRate.relativeToPublishRate + ? () -> getRelativeDispatchRateInMsg(dispatchRate) + : null; // update msg-rateLimiter if (msgRate > 0) { if (this.dispatchRateLimiterOnMessage == null) { this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate, - ratePeriod, TimeUnit.SECONDS); + ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg); } else { this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond, - TimeUnit.SECONDS); + TimeUnit.SECONDS, permitUpdaterMsg); } } else { // message-rate should be disable and close @@ -289,14 +299,17 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { } } + Supplier permitUpdaterByte = dispatchRate.relativeToPublishRate + ? () -> getRelativeDispatchRateInByte(dispatchRate) + : null; // update byte-rateLimiter if (byteRate > 0) { if (this.dispatchRateLimiterOnByte == null) { this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate, - ratePeriod, TimeUnit.SECONDS); + ratePeriod, TimeUnit.SECONDS, permitUpdaterByte); } else { this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond, - TimeUnit.SECONDS); + TimeUnit.SECONDS, permitUpdaterByte); } } else { // message-rate should be disable and close @@ -307,6 +320,18 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { } } + private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) { + return (topic != null && dispatchRate != null) + ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.dispatchThrottlingRateInMsg + : 0; + } + + private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) { + return (topic != null && dispatchRate != null) + ? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.dispatchThrottlingRateInByte + : 0; + } + /** * Get configured msg dispatch-throttling rate. Returns -1 if not configured * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f45bb96262087..efbc4c05274d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -174,6 +174,8 @@ protected TopicStatsHelper initialValue() { }; private final AtomicLong pendingWriteOps = new AtomicLong(0); + private volatile double lastUpdatedAvgPublishRateInMsg = 0; + private volatile double lastUpdatedAvgPublishRateInByte = 0; private static class TopicStatsHelper { public double averageMsgSize; @@ -1283,7 +1285,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats } }); topicStatsStream.endList(); - + // if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep + // average rate. + lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg + ? topicStatsHelper.aggMsgRateIn + : (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2; + lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte + ? topicStatsHelper.aggMsgThroughputIn + : (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2; // Start replicator stats topicStatsStream.startObject("replication"); nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size(); @@ -1447,6 +1456,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats this.addEntryLatencyStatsUsec.reset(); } + public double getLastUpdatedAvgPublishRateInMsg() { + return lastUpdatedAvgPublishRateInMsg; + } + + public double getLastUpdatedAvgPublishRateInByte() { + return lastUpdatedAvgPublishRateInByte; + } + public TopicStats getStats() { TopicStats stats = new TopicStats(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index 3a05eac1cd503..ee072504c5f52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -23,8 +23,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.util.RateLimiter; @@ -38,10 +36,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import static org.apache.pulsar.broker.web.PulsarWebResource.path; - public class SubscribeRateLimiter { private final String topicName; @@ -121,7 +115,6 @@ private synchronized void removeSubscribeLimiter(ConsumerIdentifier consumerIden * @param subscribeRate */ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentifier, SubscribeRate subscribeRate) { - long ratePerConsumer = subscribeRate.subscribeThrottlingRatePerConsumer; long ratePeriod = subscribeRate.ratePeriodInSecond; @@ -129,9 +122,10 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif if (ratePerConsumer > 0) { if (this.subscribeRateLimiter.get(consumerIdentifier) == null) { this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, - ratePeriod, TimeUnit.SECONDS)); + ratePeriod, TimeUnit.SECONDS, null)); } else { - this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS); + this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS, + null); } } else { // subscribe-rate should be disable and close diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 6d139f3971212..b85e088e82d68 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -20,12 +20,15 @@ import com.google.common.collect.Sets; +import static org.testng.Assert.assertNotNull; + import java.lang.reflect.Field; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -904,4 +907,75 @@ protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception { }); } + /** + * It verifies that relative throttling at least dispatch messages as publish-rate. + * + * @param subscription + * @throws Exception + */ + @Test(dataProvider = "subscriptions") + public void testRelativeMessageRateLimitingThrottling(SubscriptionType subscription) throws Exception { + log.info("-- Starting {} test --", methodName); + + final String namespace = "my-property/relative_throttling_ns"; + final String topicName = "persistent://" + namespace + "/relative-throttle"; + + final int messageRate = 1; + DispatchRate dispatchRate = new DispatchRate(messageRate, -1, 1, true); + admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + admin.namespaces().setDispatchRate(namespace, dispatchRate); + // create producer and topic + Producer producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + boolean isMessageRateUpdate = false; + int retry = 10; + for (int i = 0; i < retry; i++) { + if (topic.getDispatchRateLimiter().get().getDispatchRateOnMsg() > 0) { + isMessageRateUpdate = true; + break; + } else { + if (i != retry - 1) { + Thread.sleep(100); + } + } + } + Assert.assertTrue(isMessageRateUpdate); + Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate); + Thread.sleep(2000); + + final int numProducedMessages = 1000; + + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + .subscriptionType(subscription).subscribe(); + // deactive cursors + deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger()); + + // send a message, which will make dispatcher-ratelimiter initialize and schedule renew task + producer.send("test".getBytes()); + assertNotNull(consumer.receive(100, TimeUnit.MILLISECONDS)); + + Field lastUpdatedMsgRateIn = PersistentTopic.class.getDeclaredField("lastUpdatedAvgPublishRateInMsg"); + lastUpdatedMsgRateIn.setAccessible(true); + lastUpdatedMsgRateIn.set(topic, numProducedMessages); + + for (int i = 0; i < numProducedMessages; i++) { + final String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + int totalReceived = 0; + // Relative throttling will let it drain immediately because it allows to dispatch = (publish-rate + + // dispatch-rate) + for (int i = 0; i < numProducedMessages; i++) { + Message msg = consumer.receive(100, TimeUnit.MILLISECONDS); + totalReceived++; + assertNotNull(msg); + } + + Assert.assertEquals(totalReceived, numProducedMessages); + + consumer.close(); + producer.close(); + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index eb72052dfd381..f3c4028e68252 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -536,11 +536,15 @@ private class SetDispatchRate extends CliCommand { "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false) private int dispatchRatePeriodSec = 1; + @Parameter(names = { "--relative-to-publish-rate", + "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false) + private boolean relativeToPublishRate = false; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); admin.namespaces().setDispatchRate(namespace, - new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec)); + new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate)); } } @@ -608,11 +612,15 @@ private class SetSubscriptionDispatchRate extends CliCommand { "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)\n", required = false) private int dispatchRatePeriodSec = 1; + @Parameter(names = { "--relative-to-publish-rate", + "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false) + private boolean relativeToPublishRate = false; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); admin.namespaces().setSubscriptionDispatchRate(namespace, - new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec)); + new DispatchRate(msgDispatchRate, byteDispatchRate, dispatchRatePeriodSec, relativeToPublishRate)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java index c93a71388c507..002cdac09f338 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/DispatchRate.java @@ -28,6 +28,7 @@ public class DispatchRate { public int dispatchThrottlingRateInMsg = -1; public long dispatchThrottlingRateInByte = -1; + public boolean relativeToPublishRate = false; /* throttles dispatch relatively publish-rate */ public int ratePeriodInSecond = 1; /* by default dispatch-rate will be calculate per 1 second */ public DispatchRate() { @@ -45,6 +46,12 @@ public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRate this.ratePeriodInSecond = ratePeriodInSecond; } + public DispatchRate(int dispatchThrottlingRateInMsg, long dispatchThrottlingRateInByte, + int ratePeriodInSecond, boolean relativeToPublishRate) { + this(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, ratePeriodInSecond); + this.relativeToPublishRate = relativeToPublishRate; + } + @Override public int hashCode() { return Objects.hash(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 006cddd16c3c7..45ae8e2a41276 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a @@ -57,19 +58,22 @@ public class RateLimiter implements AutoCloseable{ private long permits; private long acquiredPermits; private boolean isClosed; + // permitUpdate helps to update permit-rate at runtime + private Supplier permitUpdater; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { - this(null, permits, rateTime, timeUnit); + this(null, permits, rateTime, timeUnit, null); } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit) { + final TimeUnit timeUnit, Supplier permitUpdater) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); this.rateTime = rateTime; this.timeUnit = timeUnit; this.permits = permits; + this.permitUpdater = permitUpdater; if (service != null) { this.executorService = service; @@ -198,14 +202,16 @@ public synchronized void setRate(long permits) { * @param permits * @param rateTime * @param timeUnit + * @param permitUpdaterByte */ - public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit) { + public synchronized void setRate(long permits, long rateTime, TimeUnit timeUnit, Supplier permitUpdaterByte) { if (renewTask != null) { renewTask.cancel(false); } this.permits = permits; this.rateTime = rateTime; this.timeUnit = timeUnit; + this.permitUpdater = permitUpdaterByte; this.renewTask = createTask(); } @@ -232,6 +238,12 @@ protected ScheduledFuture createTask() { synchronized void renew() { acquiredPermits = 0; + if (permitUpdater != null) { + long newPermitRate = permitUpdater.get(); + if (newPermitRate > 0) { + setRate(newPermitRate); + } + } notifyAll(); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java index 71f7fa3ce4037..c20f5d16c997b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertEquals; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.testng.annotations.Test; @@ -152,7 +153,7 @@ public void testResetRate() throws Exception { assertEquals(rate.getAvailablePermits(), permits); // change rate-time from 1sec to 5sec - rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS); + rate.setRate(permits, 5 * rateTimeMSec, TimeUnit.MILLISECONDS, null); assertEquals(rate.getAvailablePermits(), 100); assertEquals(rate.tryAcquire(permits), true); assertEquals(rate.getAvailablePermits(), 0); @@ -163,4 +164,15 @@ public void testResetRate() throws Exception { rate.close(); } + @Test + public void testRateLimiterWithPermitUpdater() throws Exception{ + long permits = 10; + long rateTime = 1; + long newUpdatedRateLimit = 100L; + Supplier permitUpdater = () -> newUpdatedRateLimit; + RateLimiter limiter = new RateLimiter(null, permits , 1, TimeUnit.SECONDS, permitUpdater); + limiter.acquire(); + Thread.sleep(rateTime*3*1000); + assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit); + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 7943d5a0a5418..fdedb7452d12c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -232,8 +232,8 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, .help("Exception from sink.") .register(collectorRegistry); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); } public void addUserException(Throwable ex) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 3ed2aa15c99d7..c913225c09677 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -174,8 +174,8 @@ public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLab .help("Exception from sink.") .register(collectorRegistry); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index f496709804e98..0ec73523be67d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -173,8 +173,8 @@ public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsL .help("Exception from source.") .register(collectorRegistry); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); } @Override