diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 424c081d9877c..c5e280c5577d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1711,107 +1711,6 @@ private void internalGetSubscriptionPropertiesForNonPartitionedTopic(AsyncRespon }); } - protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, - String subName, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenAccept(__ -> { - if (topicName.isPartitioned()) { - internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative); - } else { - getPartitionedTopicMetadataAsync(topicName, - authoritative, false).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - final List> futures = new ArrayList<>(); - - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - futures.add(pulsar().getAdminClient().topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true)); - } catch (Exception e) { - log.error("[{}] Failed to delete subscription forcefully {} {}", - clientAppId(), topicNamePartition, subName, - e); - asyncResponse.resume(new RestException(e)); - return; - } - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName))); - return null; - } else { - log.error("[{}] Failed to delete subscription forcefully {} {}", - clientAppId(), topicName, subName, t); - asyncResponse.resume(new RestException(t)); - return null; - } - } - - asyncResponse.resume(Response.noContent().build()); - return null; - }); - } else { - internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, - authoritative); - } - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to delete subscription forcefully {} from topic {}", - clientAppId(), subName, topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to delete subscription {} from topic {}", - clientAppId(), subName, topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - - private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, - String subName, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE, subName)) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) - .thenCompose(topic -> { - Subscription sub = topic.getSubscription(subName); - if (sub == null) { - throw new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName)); - } - return sub.deleteForcefully(); - }).thenRun(() -> { - log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error("[{}] Failed to delete subscription forcefully {} {}", - clientAppId(), topicName, subName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); - } - protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName); future.thenCompose(__ -> {