Skip to content

Commit

Permalink
[fix][broker]consumer backlog eviction policy should not reset read p…
Browse files Browse the repository at this point in the history
…osition for consumer (#5)

### Motivation
fix apache#18036

### Modifications
- The backlog eviction policy should use `asyncMarkDelete` instead of `resetCursor` in order to move the mark delete position.
  • Loading branch information
HQebupt authored Oct 19, 2022
1 parent c732852 commit 98e9089
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,22 +210,28 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
try {
for (;;) {
for (; ; ) {
ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
if (log.isDebugEnabled()) {
log.debug("[{}] slowest consumer mark delete position is [{}], read position is [{}]",
slowestConsumer.getName(), oldestPosition, slowestConsumer.getReadPosition());
}
ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
if (ledgerInfo == null) {
slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) oldestPosition));
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1);
slowestConsumer.markDelete(nextPosition);
continue;
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition = mLedger.getNextValidPosition(
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1));
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
if (!nextPosition.equals(oldestPosition)) {
slowestConsumer.resetCursor(nextPosition);
slowestConsumer.markDelete(nextPosition);
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -512,18 +513,22 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);

PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger();
Position slowConsumerReadPos = ml.getSlowestConsumer().getReadPosition();

Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
rolloverStats();

TopicStats stats2 = getTopicStats(topic1);
PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger();
// Messages on first 2 ledgers should be expired, backlog is number of
// message in current ledger.
Awaitility.await().untilAsserted(() -> {
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), ml.getCurrentLedgerEntries());
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), ml.getCurrentLedgerEntries());
});

assertEquals(ml.getSlowestConsumer().getReadPosition(), slowConsumerReadPos);
client.close();
}

Expand Down

0 comments on commit 98e9089

Please sign in to comment.