Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make splitNamespaceBundle and getTopicHashPositions async #16411

Merged
merged 11 commits into from
Jul 25, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -1115,116 +1115,99 @@ public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleR
}

@SuppressWarnings("deprecation")
protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String bundleName, boolean authoritative,
boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
validateSuperUserAccess();
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);

String bundleRange = getBundleRange(bundleName);

Policies policies = getNamespacePolicies(namespaceName);

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}

validatePoliciesReadOnlyAccess();

List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms));
}
if (splitAlgorithmName.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be emtpy"));
}
}

NamespaceBundle nsBundle;
try {
nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, false);
} catch (Exception e) {
asyncResponse.resume(e);
return;
}
protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName,
boolean authoritative, boolean unload,
String splitAlgorithmName,
List<Long> splitBoundaries) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
checkNotNull(bundleName, "BundleRange should not be null");
log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
List<String> supportedNamespaceBundleSplitAlgorithms =
pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName)) {
if (!supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unsupported namespace bundle split algorithm, supported algorithms are "
+ supportedNamespaceBundleSplitAlgorithms);
}
if (splitAlgorithmName
.equalsIgnoreCase(NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE)
&& (splitBoundaries == null || splitBoundaries.size() == 0)) {
throw new RestException(Status.PRECONDITION_FAILED,
"With specified_positions_divide split algorithm, splitBoundaries must not be "
+ "emtpy");
}
}
})
.thenCompose(__ -> {
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return validateClusterOwnershipAsync(namespaceName.getCluster())
.thenCompose(ignore -> validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
}
})
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies->{
String bundleRange = getBundleRange(bundleName);
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false)
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));

pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)
.thenRun(() -> {
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
if (ex.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
bundleRange, ex.getMessage());
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Split bundle failed due to invalid request"));
} else {
log.error("[{}] Failed to split namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange, ex);
asyncResponse.resume(new RestException(ex.getCause()));
}
return null;
});
});
}

protected void internalGetTopicHashPositions(AsyncResponse asyncResponse, String bundleRange, List<String> topics) {
protected CompletableFuture<TopicHashPositions> internalGetTopicHashPositionsAsync(String bundleRange,
List<String> topics) {
if (log.isDebugEnabled()) {
log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topics, bundleRange);
}
validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
false, true);
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete(
(allTopicsInThisBundle, throwable) -> {
if (throwable != null) {
log.error("[{}] {} Failed to get topic list for bundle {}.", clientAppId(),
namespaceName, bundle);
asyncResponse.resume(new RestException(throwable));
}
// if topics is empty, return all topics' hash position in this bundle
Map<String, Long> topicHashPositions = new HashMap<>();
if (topics == null || topics.size() == 0) {
allTopicsInThisBundle.forEach(t -> {
topicHashPositions.put(t,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(t));
});
} else {
for (String topic : topics.stream().map(Codec::decode).collect(Collectors.toList())) {
TopicName topicName = TopicName.get(topic);
// partitioned topic
if (topicName.getPartitionIndex() == -1) {
allTopicsInThisBundle.stream()
.filter(t -> TopicName.get(t).getPartitionedTopicName()
.equals(TopicName.get(topic).getPartitionedTopicName()))
.forEach(partition -> {
topicHashPositions.put(partition,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(partition));
});
} else { // topic partition
if (allTopicsInThisBundle.contains(topicName.toString())) {
topicHashPositions.put(topic,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(topic));
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> {
return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
false, true)
.thenCompose(nsBundle ->
pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle))
.thenApply(allTopicsInThisBundle -> {
Map<String, Long> topicHashPositions = new HashMap<>();
if (topics == null || topics.size() == 0) {
allTopicsInThisBundle.forEach(t -> {
topicHashPositions.put(t,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(t));
});
} else {
for (String topic : topics.stream().map(Codec::decode).toList()) {
TopicName topicName = TopicName.get(topic);
// partitioned topic
if (topicName.getPartitionIndex() == -1) {
allTopicsInThisBundle.stream()
.filter(t -> TopicName.get(t).getPartitionedTopicName()
.equals(TopicName.get(topic).getPartitionedTopicName()))
.forEach(partition -> {
topicHashPositions.put(partition,
pulsar().getNamespaceService()
.getNamespaceBundleFactory()
.getLongHashCode(partition));
});
} else { // topic partition
if (allTopicsInThisBundle.contains(topicName.toString())) {
topicHashPositions.put(topic,
pulsar().getNamespaceService().getNamespaceBundleFactory()
.getLongHashCode(topic));
}
}
}
}
}
}
}
asyncResponse.resume(
new TopicHashPositions(namespaceName.toString(), bundleRange, topicHashPositions));
return new TopicHashPositions(namespaceName.toString(), bundleRange,
topicHashPositions);
});
});
}

Expand Down
Loading