Skip to content

Commit

Permalink
make getInternalStats method async
Browse files Browse the repository at this point in the history
  • Loading branch information
HQebupt committed Apr 12, 2022
1 parent 1cb4798 commit b2ab007
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1180,24 +1180,30 @@ protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseB
}
}

protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
protected void internalGetInternalStats(AsyncResponse asyncResponse, boolean authoritative, boolean metadata) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.GET_STATS);

Topic topic = getTopicReference(topicName);
try {
if (metadata) {
validateTopicOperation(topicName, TopicOperation.GET_METADATA);
}
return topic.getInternalStats(metadata).get();
} catch (Exception e) {
log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR,
(e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
.thenCompose(__ -> {
if (metadata) {
return validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA);
}
return null;
})
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.getInternalStats(metadata))
.thenAccept(stats -> asyncResponse.resume(stats))
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -356,13 +355,21 @@ public TopicStats getStats(@PathParam("property") String property, @PathParam("c
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
public void getInternalStats(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(property, cluster, namespace, encodedTopic);
return internalGetInternalStats(authoritative, metadata);
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetInternalStats(asyncResponse, authoritative, metadata);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -1035,7 +1034,8 @@ public TopicStats getStats(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public PersistentTopicInternalStats getInternalStats(
public void getInternalStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -1045,8 +1045,14 @@ public PersistentTopicInternalStats getInternalStats(
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetInternalStats(authoritative, metadata);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetInternalStats(asyncResponse, authoritative, metadata);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down

0 comments on commit b2ab007

Please sign in to comment.