Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid call sync method in async rest API for delete subscription #13666

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1505,35 +1505,38 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su

private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
String subName, boolean authoritative) {
try {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);

Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
return;
}
sub.delete().get();
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
} catch (Exception e) {
if (e.getCause() instanceof SubscriptionBusyException) {
log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
} else if (e instanceof WebApplicationException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
clientAppId(), topicName, e);
validateTopicOwnershipAsync(topicName, authoritative)
.thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE))
.thenCompose(__ -> {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
}
asyncResponse.resume(e);
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e);
asyncResponse.resume(new RestException(e));
}
}
return sub.delete();
}).thenRun(() -> {
log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
Throwable cause = e.getCause();
if (cause instanceof SubscriptionBusyException) {
log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
topicName, cause);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers"));
} else if (cause instanceof WebApplicationException) {
if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.",
clientAppId(), topicName, cause);
}
asyncResponse.resume(cause);
} else {
log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause);
asyncResponse.resume(new RestException(cause));
}
return null;
});
}

protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse,
Expand Down