Skip to content

Commit

Permalink
[improve][broker]consumer backlog eviction policy should not reset re…
Browse files Browse the repository at this point in the history
…ad position for consumer (#18350)
  • Loading branch information
Technoboy- authored Nov 7, 2022
1 parent 3a936bc commit 14824a5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,28 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
try {
for (;;) {
for (; ; ) {
ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
if (log.isDebugEnabled()) {
log.debug("[{}] slowest consumer mark delete position is [{}], read position is [{}]",
slowestConsumer.getName(), oldestPosition, slowestConsumer.getReadPosition());
}
ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
if (ledgerInfo == null) {
slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) oldestPosition));
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1);
slowestConsumer.markDelete(nextPosition);
continue;
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition = mLedger.getNextValidPosition(
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1));
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
if (!nextPosition.equals(oldestPosition)) {
slowestConsumer.resetCursor(nextPosition);
slowestConsumer.markDelete(nextPosition);
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -47,7 +50,6 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -148,7 +150,7 @@ private void rolloverStats() {
}

/**
* Readers should not effect backlog quota
* Readers should not effect backlog quota.
*/
@Test
public void testBacklogQuotaWithReader() throws Exception {
Expand All @@ -160,11 +162,13 @@ public void testBacklogQuotaWithReader() throws Exception {
.limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic1";
final int numMsgs = 20;

Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1)
.startMessageId(MessageId.latest).create();

org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);

Expand All @@ -187,7 +191,7 @@ public void testBacklogQuotaWithReader() throws Exception {
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");

try {
// try to send over backlog quota and make sure it fails
Expand Down Expand Up @@ -237,10 +241,12 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
.limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
final int numMsgs = 20;
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1)
.startMessageId(MessageId.latest).create();
Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
Expand All @@ -257,7 +263,7 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception {
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
try {
// try to send over backlog quota and make sure it fails
for (int i = 0; i < numMsgs; i++) {
Expand Down Expand Up @@ -307,10 +313,12 @@ public void testTriggerBacklogTimeQuotaWithReader() throws Exception {
.limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
.build());
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
.statsInterval(0, TimeUnit.SECONDS).build();) {
final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
final int numMsgs = 9;
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1)
.startMessageId(MessageId.latest).create();
Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
Expand Down Expand Up @@ -472,14 +480,22 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 14);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 14);

PersistentTopic topic1Reference = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1Reference.getManagedLedger();
Position slowConsumerReadPos = ml.getSlowestConsumer().getReadPosition();

Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
rolloverStats();

stats = admin.topics().getStats(topic1);
TopicStats stats2 = admin.topics().getStats(topic1);
// Messages on first 2 ledgers should be expired, backlog is number of
// message in current ledger which should be 4.
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 4);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 4);
// message in current ledger.
Awaitility.await().untilAsserted(() -> {
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), ml.getCurrentLedgerEntries());
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), ml.getCurrentLedgerEntries());
});

assertEquals(ml.getSlowestConsumer().getReadPosition(), slowConsumerReadPos);
client.close();
}

Expand Down Expand Up @@ -1284,7 +1300,8 @@ public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception {
pulsar.start();

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS)
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.build();

final String topic1 = "persistent://prop/ns-quota/topic2";
Expand Down

0 comments on commit 14824a5

Please sign in to comment.