From b2ab007ab6d88acfcf9afbc573d2d6fb4352632d Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Sat, 29 Jan 2022 20:51:41 +0800 Subject: [PATCH] make getInternalStats method async --- .../admin/impl/PersistentTopicsBase.java | 38 +++++++++++-------- .../broker/admin/v1/PersistentTopics.java | 15 ++++++-- .../broker/admin/v2/PersistentTopics.java | 14 +++++-- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3c3f622266c56..60234c7601a0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1180,24 +1180,30 @@ protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseB } } - protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) { + protected void internalGetInternalStats(AsyncResponse asyncResponse, boolean authoritative, boolean metadata) { + CompletableFuture future; if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.GET_STATS); - - Topic topic = getTopicReference(topicName); - try { - if (metadata) { - validateTopicOperation(topicName, TopicOperation.GET_METADATA); - } - return topic.getInternalStats(metadata).get(); - } catch (Exception e) { - log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, e); - throw new RestException(Status.INTERNAL_SERVER_ERROR, - (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage()); + future = validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + future = CompletableFuture.completedFuture(null); } + future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)) + .thenCompose(__ -> { + if (metadata) { + return validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA); + } + return null; + }) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> topic.getInternalStats(metadata)) + .thenAccept(stats -> asyncResponse.resume(stats)) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, cause); + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); } protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index fcb71566f6b3a..2fc5e039c01af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -52,7 +52,6 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -356,13 +355,21 @@ public TopicStats getStats(@PathParam("property") String property, @PathParam("c @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic does not exist") }) - public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property, + public void getInternalStats( + @Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("metadata") @DefaultValue("false") boolean metadata) { - validateTopicName(property, cluster, namespace, encodedTopic); - return internalGetInternalStats(authoritative, metadata); + try { + validateTopicName(property, cluster, namespace, encodedTopic); + internalGetInternalStats(asyncResponse, authoritative, metadata); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 30836b9b3d5e3..2c02fdff0e668 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -60,7 +60,6 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1035,7 +1034,8 @@ public TopicStats getStats( @ApiResponse(code = 412, message = "Topic name is not valid"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) - public PersistentTopicInternalStats getInternalStats( + public void getInternalStats( + @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @ApiParam(value = "Specify the namespace", required = true) @@ -1045,8 +1045,14 @@ public PersistentTopicInternalStats getInternalStats( @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("metadata") @DefaultValue("false") boolean metadata) { - validateTopicName(tenant, namespace, encodedTopic); - return internalGetInternalStats(authoritative, metadata); + try { + validateTopicName(tenant, namespace, encodedTopic); + internalGetInternalStats(asyncResponse, authoritative, metadata); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } } @GET