Skip to content

Commit

Permalink
Reduce GetReplicatedSubscriptionStatus local REST call
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Aug 4, 2022
1 parent cd95594 commit 38f60e7
Showing 1 changed file with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5145,27 +5145,54 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Map<String, Boolean>>> futures =
Lists.newArrayListWithCapacity(partitionMetadata.partitions);
final Map<String, Boolean> status = Maps.newHashMap();
List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
Map<String, Boolean> status = Maps.newHashMap();

for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName partition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
partition.toString(), subName).whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on {} {}",
clientAppId(), partition, subName, throwable);
asyncResponse.resume(new RestException(throwable));
futures.add(
pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
.thenCompose(owned -> {
if (owned) {
// if this broker owned the partition do action like
// `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic()`
return getTopicReferenceAsync(partition)
.thenApply(topic -> {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName));
}
Map<String, Boolean> res = new HashMap<>();
if (topic instanceof PersistentTopic
&& sub instanceof PersistentSubscription) {
res.put(topicName.toString(), sub.isReplicated());
} else {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Cannot get replicated subscriptions on non-persistent"
+ " topics");
}
return res;
});
} else {
try {
return pulsar().getAdminClient().topics()
.getReplicatedSubscriptionStatusAsync(partition.toString(), subName)
.whenComplete((__, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to get replicated subscriptions on"
+ " {} {}",
clientAppId(), partition, subName, throwable);
}
});
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
return FutureUtil.failedFuture(e);
}
}
status.putAll(response);
}));
} catch (Exception e) {
log.warn("[{}] Failed to get replicated subscription status on {} {}",
clientAppId(), partition, subName, e);
throw new RestException(e);
}
}).thenAccept(status::putAll)
);
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
Expand Down

0 comments on commit 38f60e7

Please sign in to comment.