diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 93ae777a89e2f7..210c6f8767a062 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -210,22 +210,28 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis(); ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); try { - for (;;) { + for (; ; ) { ManagedCursor slowestConsumer = mLedger.getSlowestConsumer(); Position oldestPosition = slowestConsumer.getMarkDeletedPosition(); + if (log.isDebugEnabled()) { + log.debug("[{}] slowest consumer mark delete position is [{}], read position is [{}]", + slowestConsumer.getName(), oldestPosition, slowestConsumer.getReadPosition()); + } ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get(); if (ledgerInfo == null) { - slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) oldestPosition)); + PositionImpl nextPosition = + PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1); + slowestConsumer.markDelete(nextPosition); continue; } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { // skip whole ledger for the slowest cursor - PositionImpl nextPosition = mLedger.getNextValidPosition( - PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)); + PositionImpl nextPosition = + PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1); if (!nextPosition.equals(oldestPosition)) { - slowestConsumer.resetCursor(nextPosition); + slowestConsumer.markDelete(nextPosition); continue; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index e60486b3441824..f33e02ca676017 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -512,18 +513,22 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14); assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14); + PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger(); + Position slowConsumerReadPos = ml.getSlowestConsumer().getReadPosition(); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); rolloverStats(); TopicStats stats2 = getTopicStats(topic1); - PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger(); // Messages on first 2 ledgers should be expired, backlog is number of // message in current ledger. Awaitility.await().untilAsserted(() -> { assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), ml.getCurrentLedgerEntries()); assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), ml.getCurrentLedgerEntries()); }); + + assertEquals(ml.getSlowestConsumer().getReadPosition(), slowConsumerReadPos); client.close(); }