Skip to content

Commit

Permalink
Avoid call sync method in async rest API for delete subscription (#13666
Browse files Browse the repository at this point in the history
)
  • Loading branch information
codelipenghui authored and merlimat committed Jan 13, 2022
1 parent 9015e17 commit dfc298f
Showing 1 changed file with 31 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1485,35 +1485,38 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su

private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
try {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);

Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
sub.delete().get();
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
if (e.getCause() instanceof SubscriptionBusyException) {
log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
} else if (e instanceof WebApplicationException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
clientAppId(), topicName, e);
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(e));
}
}
return sub.delete();
}).thenRun(() -> {
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
Throwable cause = e.getCause();
if (cause instanceof SubscriptionBusyException) {
log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
topicName, cause);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
} else if (cause instanceof WebApplicationException) {
if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
clientAppId(), topicName, cause);
}
asyncResponse.resume(cause);
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause);
asyncResponse.resume(new RestException(cause));
}
return null;
});
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
Expand Down

0 comments on commit dfc298f

Please sign in to comment.