Skip to content

Commit

Permalink
[improve][broker] Make Namespaces.deleteNamespaceBundle async (apache…
Browse files Browse the repository at this point in the history
…#16287)

Master Issue: apache#14365

### Motivation

Please see apache#14365

### Modifications

* Make Namespaces.deleteNamespaceBundle async
* Combine internalDeleteNamespaceBundle
* Make removeOwnedServiceUnit async
  • Loading branch information
RobertIndie authored and weimob-wuxuanqi committed Jul 14, 2022
1 parent 7ca2e31 commit b55a8fa
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -551,153 +553,95 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
});
}

protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
if (force) {
internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
} else {
internalDeleteNamespaceBundle(bundleRange, authoritative);
}
}

@SuppressWarnings("deprecation")
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
validatePoliciesReadOnlyAccess();

// ensure that non-global namespace is directed to the correct cluster
if (!namespaceName.isGlobal()) {
validateClusterOwnership(namespaceName.getCluster());
}

Policies policies = getNamespacePolicies(namespaceName);
// ensure the local cluster is the only cluster for the global namespace configuration
try {
if (namespaceName.isGlobal()) {
if (policies.replication_clusters.size() > 1) {
// There are still more than one clusters configured for the global namespace
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ namespaceName + ". There are still more than one replication clusters configured.");
}
if (policies.replication_clusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData =
clusterResources().getCluster(replCluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
URL replClusterUrl;
if (!config().isTlsEnabled() || !isRequestHttps()) {
replClusterUrl = new URL(replClusterData.getServiceUrl());
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
} else {
throw new RestException(Status.PRECONDITION_FAILED,
"The replication cluster does not provide TLS encrypted service");
}
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
.port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
clientAppId(), redirect, replCluster);
protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
boolean force) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
if (!namespaceName.isGlobal()) {
return validateClusterOwnershipAsync(namespaceName.getCluster());
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
} catch (WebApplicationException wae) {
throw wae;
} catch (Exception e) {
throw new RestException(e);
}

try {
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
.get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
for (String topic : topics) {
NamespaceBundle topicBundle = pulsar().getNamespaceService()
.getBundle(TopicName.get(topic));
if (bundle.equals(topicBundle)) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle");
}
}

// remove from owned namespace map and ephemeral node from ZK
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
} catch (WebApplicationException wae) {
throw wae;
} catch (Exception e) {
log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
bundleRange, e);
throw new RestException(e);
}
}

@SuppressWarnings("deprecation")
protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
validatePoliciesReadOnlyAccess();
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies -> {
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (namespaceName.isGlobal()) {

// ensure that non-global namespace is directed to the correct cluster
if (!namespaceName.isGlobal()) {
validateClusterOwnership(namespaceName.getCluster());
}
if (policies.replication_clusters.size() > 1) {
// There are still more than one clusters configured for the global namespace
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ namespaceName
+ ". There are still more than one replication clusters configured.");
}
if (policies.replication_clusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
future = clusterResources().getClusterAsync(replCluster)
.thenCompose(clusterData -> {
if (clusterData.isEmpty()) {
throw new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist");
}
ClusterData replClusterData = clusterData.get();
URL replClusterUrl;
try {
if (!config().isTlsEnabled() || !isRequestHttps()) {
replClusterUrl = new URL(replClusterData.getServiceUrl());
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
} else {
throw new RestException(Status.PRECONDITION_FAILED,
"The replication cluster does not provide TLS encrypted "
+ "service");
}
} catch (MalformedURLException malformedURLException) {
throw new RestException(malformedURLException);
}

Policies policies = getNamespacePolicies(namespaceName);
// ensure the local cluster is the only cluster for the global namespace configuration
try {
if (namespaceName.isGlobal()) {
if (policies.replication_clusters.size() > 1) {
// There are still more than one clusters configured for the global namespace
throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ namespaceName + ". There are still more than one replication clusters configured.");
}
if (policies.replication_clusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData =
clusterResources().getCluster(replCluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
URL replClusterUrl;
if (!config().isTlsEnabled() || !isRequestHttps()) {
replClusterUrl = new URL(replClusterData.getServiceUrl());
} else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
} else {
throw new RestException(Status.PRECONDITION_FAILED,
"The replication cluster does not provide TLS encrypted service");
}
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
.port(replClusterUrl.getPort())
.replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
clientAppId(), redirect, replCluster);
URI redirect =
UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
.port(replClusterUrl.getPort())
.replaceQueryParam("authoritative", false).build();
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the rest call to {}: cluster={}",
clientAppId(), redirect, replCluster);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
});
}
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
} catch (WebApplicationException wae) {
throw wae;
} catch (Exception e) {
throw new RestException(e);
}
return future.thenCompose(__ -> {
NamespaceBundle bundle =
validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
.thenCompose(topics -> {
CompletableFuture<Void> deleteTopicsFuture =
CompletableFuture.completedFuture(null);
if (!force) {
List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
for (String topic : topics) {
futures.add(pulsar().getNamespaceService()
.getBundleAsync(TopicName.get(topic))
.thenCompose(topicBundle -> {
if (bundle.equals(topicBundle)) {
throw new RestException(Status.CONFLICT,
"Cannot delete non empty bundle");
}
return CompletableFuture.completedFuture(null);
}));

try {
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
// directly remove from owned namespace map and ephemeral node from ZK
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
} catch (WebApplicationException wae) {
throw wae;
} catch (Exception e) {
log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
bundleRange, e);
throw new RestException(e);
}
}
deleteTopicsFuture = FutureUtil.waitForAll(futures);
}
return deleteTopicsFuture.thenCompose(
___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
});
});
});
}

protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,22 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
public void deleteNamespaceBundle(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalDeleteNamespaceBundle(bundleRange, authoritative, force);
@ApiResponse(code = 409, message = "Namespace bundle is not empty")})
public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
.thenRun(() -> response.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(response, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,22 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@ApiResponse(code = 409, message = "Namespace bundle is not empty")})
public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalDeleteNamespaceBundle(bundleRange, authoritative, force);
internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
.thenRun(() -> response.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(response, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,10 +1047,9 @@ public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
.thenCompose(ownershipCache::checkOwnershipAsync);
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
ownershipCache.removeOwnership(nsBundle).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
return ownershipCache.removeOwnership(nsBundle)
.thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
}

protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
Expand Down
Loading

0 comments on commit b55a8fa

Please sign in to comment.