From 22371c22b3989d1abc85f1832f1b3438d83fcb61 Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Wed, 19 Oct 2022 17:11:22 +0800 Subject: [PATCH] [improve][broker]consumer backlog eviction policy should not reset read position for consumer (#18037) Fixes #18036 - The backlog eviction policy should use `asyncMarkDelete` instead of `resetCursor` in order to move the mark delete position. (cherry picked from commit 0b7140b2da295058159e56533cdc1371c1388e8c) --- .../broker/service/BacklogQuotaManager.java | 16 +++++++++++----- .../service/BacklogQuotaManagerTest.java | 18 +++++++++++++----- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 805d00adca6bb..915f9e7c6b917 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -210,22 +210,28 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis(); ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); try { - for (;;) { + for (; ; ) { ManagedCursor slowestConsumer = mLedger.getSlowestConsumer(); Position oldestPosition = slowestConsumer.getMarkDeletedPosition(); + if (log.isDebugEnabled()) { + log.debug("[{}] slowest consumer mark delete position is [{}], read position is [{}]", + slowestConsumer.getName(), oldestPosition, slowestConsumer.getReadPosition()); + } ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get(); if (ledgerInfo == null) { - slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) oldestPosition)); + PositionImpl nextPosition = + PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1); + slowestConsumer.markDelete(nextPosition); continue; } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { // skip whole ledger for the slowest cursor - PositionImpl nextPosition = mLedger.getNextValidPosition( - PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)); + PositionImpl nextPosition = + PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1); if (!nextPosition.equals(oldestPosition)) { - slowestConsumer.resetCursor(nextPosition); + slowestConsumer.markDelete(nextPosition); continue; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index d0dbb86e69376..c999fa744efec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -502,16 +503,23 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14); assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14); + PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger(); + Position slowConsumerReadPos = ml.getSlowestConsumer().getReadPosition(); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); rolloverStats(); - stats = getTopicStats(topic1); - PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger(); + TopicStats stats2 = getTopicStats(topic1); // Messages on first 2 ledgers should be expired, backlog is number of // message in current ledger. - assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), ml.getCurrentLedgerEntries()); - assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), ml.getCurrentLedgerEntries()); + Awaitility.await().untilAsserted(() -> { + assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), ml.getCurrentLedgerEntries()); + assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), ml.getCurrentLedgerEntries()); + }); + + assertEquals(ml.getSlowestConsumer().getReadPosition(), slowConsumerReadPos); + client.close(); } @Test