Skip to content

Commit

Permalink
[fix] [admin] set ns level backlog quota does not take effect if rete…
Browse files Browse the repository at this point in the history
…ntion exists (#20690)

Motivation: When a retention policy exists, the command `pulsar-admin namespaces set-backlog-quota` does not take effect.In the PR #17383: it only checked the compatibility of `retention` and `backlog quota`, and it was not set.

Modifications: Fix the bug.
  • Loading branch information
poorbarcode authored Jun 30, 2023
1 parent 5abadbe commit 6acd01d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,7 @@ protected CompletableFuture<Void> setBacklogQuotaAsync(BacklogQuotaType backlogQ
"Backlog Quota exceeds configured retention quota for namespace."
+ " Please increase retention quota and retry");
}
policies.backlog_quota_map.put(quotaType, quota);
return policies;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3304,4 +3304,26 @@ private void testDeleteNamespaceForciblyWithManyTopics() throws Exception {
admin.namespaces().deleteNamespace(ns, true);
Assert.assertFalse(admin.namespaces().getNamespaces(defaultTenant).contains(ns));
}

@Test
private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Exception {
final String ns = defaultTenant + "/ns-testSetBacklogQuotasNamespaceLevel";
final long backlogQuotaLimitSize = 100000002;
final int backlogQuotaLimitTime = 2;
admin.namespaces().createNamespace(ns, 2);
// create retention.
admin.namespaces().setRetention(ns, new RetentionPolicies(1800, 10000));
// set backlog quota.
admin.namespaces().setBacklogQuota(ns, BacklogQuota.builder()
.limitSize(backlogQuotaLimitSize).limitTime(backlogQuotaLimitTime).build());
// Verify result.
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> map = admin.namespaces().getBacklogQuotaMap(ns);
assertEquals(map.size(), 1);
assertTrue(map.containsKey(BacklogQuota.BacklogQuotaType.destination_storage));
BacklogQuota backlogQuota = map.get(BacklogQuota.BacklogQuotaType.destination_storage);
assertEquals(backlogQuota.getLimitSize(), backlogQuotaLimitSize);
assertEquals(backlogQuota.getLimitTime(), backlogQuotaLimitTime);
// cleanup.
admin.namespaces().deleteNamespace(ns);
}
}

0 comments on commit 6acd01d

Please sign in to comment.