diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index b11538a9217e2..93ae777a89e2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -220,7 +220,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 - && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { + && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { // skip whole ledger for the slowest cursor PositionImpl nextPosition = mLedger.getNextValidPosition( PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index fccc7abc66854..9b45c0082e944 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -111,7 +111,7 @@ void setup() throws Exception { config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); - config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA / 2); config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType("non-partitioned"); @@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { client.close(); } + @Test(timeOut = 60000) + public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception { + assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), + new HashMap<>()); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(5) // set limit time as 5 seconds + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), BacklogQuota.BacklogQuotaType.message_age); + PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID(); + final String subName1 = "c1"; + final String subName2 = "c2"; + int numMsgs = 5; + + Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); + Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + TopicStats stats = getTopicStats(topic1); + assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5); + assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5); + + // Sleep 5000 mills for first 5 messages. + Thread.sleep(5000l); + numMsgs = 9; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + // The first 5 messages are expired after sleeping 2000 more mills. + Thread.sleep(2000l); + rolloverStats(); + + TopicStats stats2 = getTopicStats(topic1); + // The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not. + Awaitility.await().untilAsserted(() -> { + assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9); + assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9); + }); + client.close(); + } + + @Test public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),