Skip to content

Commit

Permalink
[improve][broker] Skip unloading when bundle throughput is zero
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Nov 21, 2024
1 parent 949750f commit 13b3e05
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
var stat = etr.getValue();

// skip zero traffic bundles
if (stat.msgThroughputIn + stat.msgThroughputOut == 0) {
continue;
}
// TODO: do not filter system topic while shedding
if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,14 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,

var bundleData = e.stats();
double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
if (maxBrokerBundleThroughput == 0) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " It has zero throughput.", bundle));
}
numOfBrokersWithFewBundles++;
break;
}
boolean swap = false;
List<Unload> minToMaxUnloads = new ArrayList<>();
double minBrokerBundleSwapThroughput = 0.0;
Expand All @@ -549,6 +557,9 @@ public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
var minBrokerBundleThroughput =
minBrokerBundleData.stats().msgThroughputIn
+ minBrokerBundleData.stats().msgThroughputOut;
if (minBrokerBundleThroughput == 0) {
continue;
}
var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
if (maxBrokerNewThroughputTmp < maxBrokerThroughput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,17 @@ public void testTopBundlesLoadData() {
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 100000;
stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 500;
stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);

NamespaceBundleStats stats3 = new NamespaceBundleStats();
stats3.msgRateIn = 10000;
stats3.msgThroughputOut = 10;
bundleStats.put(bundle3, stats3);

NamespaceBundleStats stats4 = new NamespaceBundleStats();
Expand All @@ -118,10 +121,12 @@ public void testSystemNamespace() {
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
stats1.msgThroughputOut = 10;
bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
stats2.msgThroughputOut = 10;
bundleStats.put(bundle1, stats2);

topKBundles.update(bundleStats, 2);
Expand All @@ -131,6 +136,21 @@ public void testSystemNamespace() {
assertEquals(top0.bundleName(), bundle1);
}

@Test
public void testZeroMsgThroughputBundleStats() {
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
bundleStats.put(bundle1, stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
bundleStats.put(bundle1, stats2);

topKBundles.update(bundleStats, 2);

assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 0);
}


private void setAntiAffinityGroup() throws MetadataStoreException {
LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup");
Expand Down Expand Up @@ -166,10 +186,12 @@ public void testIsolationPolicy() throws MetadataStoreException {
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);

topKBundles.update(bundleStats, 2);
Expand All @@ -188,10 +210,12 @@ public void testAntiAffinityGroupPolicy() throws MetadataStoreException {
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);

topKBundles.update(bundleStats, 2);
Expand All @@ -213,10 +237,12 @@ public void testLoadBalancerSheddingBundlesWithPoliciesEnabledConfig() throws Me
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);

topKBundles.update(bundleStats, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,26 @@ public void testBundleThroughputLargerThanOffloadThreshold() {
assertEquals(counter.getLoadStd(), setupLoadStd);
}

@Test
public void testZeroBundleThroughput() {
UnloadCounter counter = new UnloadCounter();
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
for (var e : topBundlesLoadDataStore.entrySet()) {
for (var stat : e.getValue().getTopBundlesLoadData()) {
stat.stats().msgThroughputOut = 0;
stat.stats().msgThroughputIn = 0;

}
}
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
}


@Test
public void testTargetStdAfterTransfer() {
Expand Down

0 comments on commit 13b3e05

Please sign in to comment.