Skip to content

Commit

Permalink
[fix][broker] Fix namespace backlog quota check with retention. (#17706)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored and Technoboy- committed Sep 19, 2022
1 parent 43ada02 commit 8b26943
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,21 @@ protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRe
}

protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) {
if (retention == null || retention.getRetentionSizeInMB() <= 0 || retention.getRetentionTimeInMinutes() <= 0) {
if (retention == null
|| (retention.getRetentionSizeInMB() <= 0 && retention.getRetentionTimeInMinutes() <= 0)) {
return true;
}
if (quota == null) {
quota = pulsar().getBrokerService().getBacklogQuotaManager().getDefaultQuota();
}
if (quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {

if (retention.getRetentionSizeInMB() > 0
&& quota.getLimitSize() >= (retention.getRetentionSizeInMB() * 1024 * 1024)) {
return false;
}
// time based quota is in second
if (quota.getLimitTime() >= (retention.getRetentionTimeInMinutes() * 60)) {
if (retention.getRetentionTimeInMinutes() > 0
&& quota.getLimitTime() >= retention.getRetentionTimeInMinutes() * 60) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3397,4 +3397,36 @@ public void testGetPartitionStatsWithEarliestTimeInBacklog() throws PulsarAdminE
long value2 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
Assert.assertNotEquals(value2, 0);
}

@Test
public void testRetentionAndBacklogQuotaCheck() throws PulsarAdminException {
String namespace = "prop-xyz/ns1";
//test size check.
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 10));
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
});

//test time check
admin.namespaces().setRetention(namespace, new RetentionPolicies(10, -1));
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(11 * 60).build());
});

// test both size and time.
admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).
limitTime(9 * 60).build());
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
});
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(100 * 60).build());
});

}
}

0 comments on commit 8b26943

Please sign in to comment.