Skip to content

Commit

Permalink
[improve][admin] Make getBundleRange async (#17402)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Sep 8, 2022
1 parent 88dd816 commit ebe7220
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1155,18 +1155,14 @@ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundl
}
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies->{
String bundleRange = getBundleRange(bundleName);
if (bundleRange == null) {
throw new RestException(Status.NOT_FOUND,
String.format("Bundle range %s not found", bundleName));
}
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false)
.thenCompose(__ -> getBundleRangeAsync(bundleName))
.thenCompose(bundleRange -> {
return getNamespacePoliciesAsync(namespaceName)
.thenCompose(policies ->
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));

});
}

Expand Down Expand Up @@ -1219,27 +1215,32 @@ protected CompletableFuture<TopicHashPositions> internalGetTopicHashPositionsAsy
});
}

private String getBundleRange(String bundleName) {
NamespaceBundle nsBundle;
private CompletableFuture<String> getBundleRangeAsync(String bundleName) {
CompletableFuture<NamespaceBundle> future;
if (BundleType.LARGEST.toString().equals(bundleName)) {
nsBundle = findLargestBundleWithTopics(namespaceName);
future = findLargestBundleWithTopicsAsync(namespaceName);
} else if (BundleType.HOT.toString().equals(bundleName)) {
nsBundle = findHotBundle(namespaceName);
future = findHotBundleAsync(namespaceName);
} else {
return bundleName;
return CompletableFuture.completedFuture(bundleName);
}
if (nsBundle == null) {
return null;
}
return nsBundle.getBundleRange();
return future.thenApply(nsBundle -> {
if (nsBundle == null) {
throw new RestException(Status.NOT_FOUND,
String.format("Bundle range %s not found", bundleName));
}
return nsBundle.getBundleRange();
});
}

private NamespaceBundle findLargestBundleWithTopics(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundleWithHighestTopics(namespaceName);
private CompletableFuture<NamespaceBundle> findLargestBundleWithTopicsAsync(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundleWithHighestTopicsAsync(namespaceName);
}

private NamespaceBundle findHotBundle(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundleWithHighestThroughput(namespaceName);
private CompletableFuture<NamespaceBundle> findHotBundleAsync(NamespaceName namespaceName) {
return pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundleWithHighestThroughputAsync(namespaceName);
}

private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,24 +237,25 @@ public NamespaceBundle getBundle(TopicName topic) {
return bundles != null ? bundles.findBundle(topic) : null;
}

public NamespaceBundle getBundleWithHighestThroughput(NamespaceName nsName) {
public CompletableFuture<NamespaceBundle> getBundleWithHighestThroughputAsync(NamespaceName nsName) {
LoadManager loadManager = pulsar.getLoadManager().get();
if (loadManager instanceof ModularLoadManagerWrapper) {
NamespaceBundles bundles = getBundles(nsName);
double maxMsgThroughput = -1;
NamespaceBundle bundleWithHighestThroughput = null;
for (NamespaceBundle bundle : bundles.getBundles()) {
BundleData bundleData = ((ModularLoadManagerWrapper) loadManager).getLoadManager()
.getBundleDataOrDefault(bundle.toString());
if (bundleData.getTopics() > 0
&& bundleData.getLongTermData().totalMsgThroughput() > maxMsgThroughput) {
maxMsgThroughput = bundleData.getLongTermData().totalMsgThroughput();
bundleWithHighestThroughput = bundle;
return getBundlesAsync(nsName).thenApply(bundles -> {
double maxMsgThroughput = -1;
NamespaceBundle bundleWithHighestThroughput = null;
for (NamespaceBundle bundle : bundles.getBundles()) {
BundleData bundleData = ((ModularLoadManagerWrapper) loadManager).getLoadManager()
.getBundleDataOrDefault(bundle.toString());
if (bundleData.getTopics() > 0
&& bundleData.getLongTermData().totalMsgThroughput() > maxMsgThroughput) {
maxMsgThroughput = bundleData.getLongTermData().totalMsgThroughput();
bundleWithHighestThroughput = bundle;
}
}
}
return bundleWithHighestThroughput;
return bundleWithHighestThroughput;
});
}
return getBundleWithHighestTopics(nsName);
return getBundleWithHighestTopicsAsync(nsName);
}

public NamespaceBundles getBundles(NamespaceName nsname) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ public void testSplitBundleWithHighestThroughput() throws Exception {
assertEquals(targetBundleData.getTopics(), 10);
});

String hotBundle = namespaceService.getNamespaceBundleFactory().getBundleWithHighestThroughput(nsname)
.getBundleRange();
String hotBundle = namespaceService.getNamespaceBundleFactory().getBundleWithHighestThroughputAsync(nsname)
.get().getBundleRange();

assertEquals(bundle, hotBundle);

Expand Down

0 comments on commit ebe7220

Please sign in to comment.