From ebe7220dd11e234b6108935e2b6b89f4a5cc51b8 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Thu, 8 Sep 2022 21:32:45 +0800 Subject: [PATCH] [improve][admin] Make `getBundleRange` async (#17402) --- .../broker/admin/impl/NamespacesBase.java | 47 ++++++++++--------- .../common/naming/NamespaceBundleFactory.java | 29 ++++++------ .../namespace/NamespaceServiceTest.java | 4 +- 3 files changed, 41 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 122f6f3d0ccec..d0f1ef0acc207 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1155,18 +1155,14 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl } }) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies->{ - String bundleRange = getBundleRange(bundleName); - if (bundleRange == null) { - throw new RestException(Status.NOT_FOUND, - String.format("Bundle range %s not found", bundleName)); - } - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, - authoritative, false) + .thenCompose(__ -> getBundleRangeAsync(bundleName)) + .thenCompose(bundleRange -> { + return getNamespacePoliciesAsync(namespaceName) + .thenCompose(policies -> + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)); - }); } @@ -1219,27 +1215,32 @@ protected CompletableFuture internalGetTopicHashPositionsAsy }); } - private String getBundleRange(String bundleName) { - NamespaceBundle nsBundle; + private CompletableFuture getBundleRangeAsync(String bundleName) { + CompletableFuture future; if (BundleType.LARGEST.toString().equals(bundleName)) { - nsBundle = findLargestBundleWithTopics(namespaceName); + future = findLargestBundleWithTopicsAsync(namespaceName); } else if (BundleType.HOT.toString().equals(bundleName)) { - nsBundle = findHotBundle(namespaceName); + future = findHotBundleAsync(namespaceName); } else { - return bundleName; + return CompletableFuture.completedFuture(bundleName); } - if (nsBundle == null) { - return null; - } - return nsBundle.getBundleRange(); + return future.thenApply(nsBundle -> { + if (nsBundle == null) { + throw new RestException(Status.NOT_FOUND, + String.format("Bundle range %s not found", bundleName)); + } + return nsBundle.getBundleRange(); + }); } - private NamespaceBundle findLargestBundleWithTopics(NamespaceName namespaceName) { - return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundleWithHighestTopics(namespaceName); + private CompletableFuture findLargestBundleWithTopicsAsync(NamespaceName namespaceName) { + return pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundleWithHighestTopicsAsync(namespaceName); } - private NamespaceBundle findHotBundle(NamespaceName namespaceName) { - return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundleWithHighestThroughput(namespaceName); + private CompletableFuture findHotBundleAsync(NamespaceName namespaceName) { + return pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundleWithHighestThroughputAsync(namespaceName); } private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 78eff0d0794ef..940eee139b953 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -237,24 +237,25 @@ public NamespaceBundle getBundle(TopicName topic) { return bundles != null ? bundles.findBundle(topic) : null; } - public NamespaceBundle getBundleWithHighestThroughput(NamespaceName nsName) { + public CompletableFuture getBundleWithHighestThroughputAsync(NamespaceName nsName) { LoadManager loadManager = pulsar.getLoadManager().get(); if (loadManager instanceof ModularLoadManagerWrapper) { - NamespaceBundles bundles = getBundles(nsName); - double maxMsgThroughput = -1; - NamespaceBundle bundleWithHighestThroughput = null; - for (NamespaceBundle bundle : bundles.getBundles()) { - BundleData bundleData = ((ModularLoadManagerWrapper) loadManager).getLoadManager() - .getBundleDataOrDefault(bundle.toString()); - if (bundleData.getTopics() > 0 - && bundleData.getLongTermData().totalMsgThroughput() > maxMsgThroughput) { - maxMsgThroughput = bundleData.getLongTermData().totalMsgThroughput(); - bundleWithHighestThroughput = bundle; + return getBundlesAsync(nsName).thenApply(bundles -> { + double maxMsgThroughput = -1; + NamespaceBundle bundleWithHighestThroughput = null; + for (NamespaceBundle bundle : bundles.getBundles()) { + BundleData bundleData = ((ModularLoadManagerWrapper) loadManager).getLoadManager() + .getBundleDataOrDefault(bundle.toString()); + if (bundleData.getTopics() > 0 + && bundleData.getLongTermData().totalMsgThroughput() > maxMsgThroughput) { + maxMsgThroughput = bundleData.getLongTermData().totalMsgThroughput(); + bundleWithHighestThroughput = bundle; + } } - } - return bundleWithHighestThroughput; + return bundleWithHighestThroughput; + }); } - return getBundleWithHighestTopics(nsName); + return getBundleWithHighestTopicsAsync(nsName); } public NamespaceBundles getBundles(NamespaceName nsname) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 03260aee46d7d..b30633ef635ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -664,8 +664,8 @@ public void testSplitBundleWithHighestThroughput() throws Exception { assertEquals(targetBundleData.getTopics(), 10); }); - String hotBundle = namespaceService.getNamespaceBundleFactory().getBundleWithHighestThroughput(nsname) - .getBundleRange(); + String hotBundle = namespaceService.getNamespaceBundleFactory().getBundleWithHighestThroughputAsync(nsname) + .get().getBundleRange(); assertEquals(bundle, hotBundle);