Skip to content

Commit

Permalink
Avoid call sync method in async rest API for force delete subscription (
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Jan 12, 2022
1 parent e5d828a commit 416e7e5
Showing 1 changed file with 28 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1605,33 +1605,34 @@ protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,

private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
try {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);

Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
sub.deleteForcefully().get();
log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
if (e instanceof WebApplicationException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to delete subscription forcefully from topic {},"
+ " redirecting to other brokers.",
clientAppId(), topicName, e);
}
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}",
clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(e));
}
}
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
return sub.deleteForcefully();
}).thenRun(() -> {
log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
Throwable cause = e.getCause();
if (cause instanceof WebApplicationException) {
if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
clientAppId(), topicName, cause);
}
asyncResponse.resume(cause);
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}",
clientAppId(), topicName, subName, cause);
asyncResponse.resume(new RestException(cause));
}
return null;
});
}

protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) {
Expand Down

0 comments on commit 416e7e5

Please sign in to comment.