Skip to content

Commit

Permalink
[fix][broker]unify time unit at dropping the backlog on a topic
Browse files Browse the repository at this point in the history
  • Loading branch information
HQebupt committed Oct 7, 2022
1 parent 1148204 commit b297fa9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition = mLedger.getNextValidPosition(
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void setup() throws Exception {
config.setBrokerServicePort(Optional.of(0));
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA / 2);
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType("non-partitioned");
Expand Down Expand Up @@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
client.close();
}

@Test(timeOut = 60000)
public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
new HashMap<>());
admin.namespaces().setBacklogQuota("prop/ns-quota",
BacklogQuota.builder()
.limitTime(5) // set limit time as 5 seconds
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build(), BacklogQuota.BacklogQuotaType.message_age);
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
final String subName1 = "c1";
final String subName2 = "c2";
int numMsgs = 5;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer1.receive();
consumer2.receive();
}

TopicStats stats = getTopicStats(topic1);
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);

// Sleep 5000 mills for first 5 messages.
Thread.sleep(5000l);
numMsgs = 9;
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer1.receive();
consumer2.receive();
}

// The first 5 messages are expired after sleeping 2000 more mills.
Thread.sleep(2000l);
rolloverStats();

TopicStats stats2 = getTopicStats(topic1);
// The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not.
Awaitility.await().untilAsserted(() -> {
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9);
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9);
});
client.close();
}


@Test
public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
Expand Down

0 comments on commit b297fa9

Please sign in to comment.