Skip to content

Commit

Permalink
[improve][broker] Make splitNamespaceBundle and getTopicHashPositions…
Browse files Browse the repository at this point in the history
… async (#16411)

* make splitNamespaceBundle and getTopicHashPositions async

* appli comments

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Co-authored-by: Zixuan Liu <[email protected]>

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Co-authored-by: Zixuan Liu <[email protected]>

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Co-authored-by: Zixuan Liu <[email protected]>

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Co-authored-by: Zixuan Liu <[email protected]>

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Co-authored-by: Zixuan Liu <[email protected]>

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Co-authored-by: Zixuan Liu <[email protected]>

* fix error

* fix error

* fix check style error

Co-authored-by: gavingaozhangmin <[email protected]>
Co-authored-by: Zixuan Liu <[email protected]>
  • Loading branch information
3 people authored Jul 25, 2022
1 parent eb5725a commit f47c705
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1115,116 +1115,99 @@ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleR
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative,
boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
validateSuperUserAccess();
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);

String bundleRange = getBundleRange(bundleName);

Policies policies = getNamespacePolicies(namespaceName);

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}

validatePoliciesReadOnlyAccess();

List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
}
if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
}
}

NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, false);
} catch (Exception e) {
asyncResponse.resume(e);
return;
}
protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName,
boolean authoritative, boolean unload,
String splitAlgorithmName,
List<Long> splitBoundaries) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms);
}
if (splitAlgorithmName
.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
throw new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be "
+ "emtpy");
}
}
})
.thenCompose(__ -> {
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return validateClusterOwnershipAsync(namespaceName.getCluster())
.thenCompose(ignore -> validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
}
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies->{
String bundleRange = getBundleRange(bundleName);
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false)
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));

pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
.thenRun(() -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
if (ex.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
bundleRange, ex.getMessage());
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Split bundle failed due to invalid request"));
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex);
asyncResponse.resume(new RestException(ex.getCause()));
}
return null;
});
});
}

protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
protected CompletableFuture<TopicHashPositions> internalGetTopicHashPositionsAsync(String bundleRange,
List<String> topics) {
if (log.isDebugEnabled()) {
log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange);
}
validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
false, true);
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
(allTopicsInThisBundle, throwable) -> {
if (throwable != null) {
log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
namespaceName, bundle);
asyncResponse.resume(new RestException(throwable));
}
// if topics is empty, return all topics' hash position in this bundle
Map<String, Long> topicHashPositions = new HashMap<>();
if (topics == null || topics.size() == 0) {
allTopicsInThisBundle.forEach(t -> {
topicHashPositions.put(t,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(t));
});
} else {
for (String topic : topics.stream().map(Codec::decode).collect(Collectors.toList())) {
TopicName topicName = TopicName.get(topic);
// partitioned topic
if (topicName.getPartitionIndex() == -1) {
allTopicsInThisBundle.stream()
.filter(t -> TopicName.get(t).getPartitionedTopicName()
.equals(TopicName.get(topic).getPartitionedTopicName()))
.forEach(partition -> {
topicHashPositions.put(partition,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(partition));
});
} else { // topic partition
if (allTopicsInThisBundle.contains(topicName.toString())) {
topicHashPositions.put(topic,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(topic));
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> {
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
false, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
.thenApply(allTopicsInThisBundle -> {
Map<String, Long> topicHashPositions = new HashMap<>();
if (topics == null || topics.size() == 0) {
allTopicsInThisBundle.forEach(t -> {
topicHashPositions.put(t,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(t));
});
} else {
for (String topic : topics.stream().map(Codec::decode).toList()) {
TopicName topicName = TopicName.get(topic);
// partitioned topic
if (topicName.getPartitionIndex() == -1) {
allTopicsInThisBundle.stream()
.filter(t -> TopicName.get(t).getPartitionedTopicName()
.equals(TopicName.get(topic).getPartitionedTopicName()))
.forEach(partition -> {
topicHashPositions.put(partition,
pulsar().getNamespaceService()
.getNamespaceBundleFactory()
.getLongHashCode(partition));
});
} else { // topic partition
if (allTopicsInThisBundle.contains(topicName.toString())) {
topicHashPositions.put(topic,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(topic));
}
}
}
}
}
}
}
asyncResponse.resume(
new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions));
return new TopicHashPositions(namespaceName.toString(), bundleRange,
topicHashPositions);
});
});
}

Expand Down
Loading

0 comments on commit f47c705

Please sign in to comment.