Skip to content

Commit

Permalink
[improve][broker] Improve backlogQuota endpoint to pure async. (#17383
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mattisonchao authored Sep 6, 2022
1 parent cfe95dd commit ff4dc08
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
Expand Down Expand Up @@ -110,9 +111,8 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public abstract class NamespacesBase extends AdminResource {

protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
Expand Down Expand Up @@ -1430,40 +1430,29 @@ protected CompletableFuture<SubscribeRate> internalGetSubscribeRateAsync() {
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()));
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
try {
Policies policies = namespaceResources().getPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist"));
RetentionPolicies r = policies.retention_policies;
if (r != null) {
Policies p = new Policies();
p.backlog_quota_map.put(quotaType, backlogQuota);
if (!checkQuotas(p, r)) {
log.warn(
"[{}] Failed to update backlog configuration"
+ " for namespace {}: conflicts with retention quota",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for namespace."
+ " Please increase retention quota and retry");
}
protected CompletableFuture<Void> setBacklogQuotaAsync(BacklogQuotaType backlogQuotaType,
BacklogQuota quota) {
return namespaceResources().setPoliciesAsync(namespaceName, policies -> {
RetentionPolicies retentionPolicies = policies.retention_policies;
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
if (retentionPolicies == null) {
policies.backlog_quota_map.put(quotaType, quota);
return policies;
}
policies.backlog_quota_map.put(quotaType, backlogQuota);
namespaceResources().setPolicies(namespaceName, p -> policies);
log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName,
jsonMapper().writeValueAsString(backlogQuota));

} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
// If we have retention policies, we have to check the conflict.
BacklogQuota needCheckQuota = null;
if (quotaType == BacklogQuotaType.destination_storage) {
needCheckQuota = quota;
}
boolean passCheck = checkBacklogQuota(needCheckQuota, retentionPolicies);
if (!passCheck) {
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for namespace."
+ " Please increase retention quota and retry");
}
return policies;
});
}

protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
Expand Down Expand Up @@ -2767,5 +2756,69 @@ protected void internalRemoveReplicatorDispatchRate(AsyncResponse asyncResponse)
return null;
});
}
private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);

/**
* Base method for getBackLogQuotaMap v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalGetBacklogQuotaMap(AsyncResponse asyncResponse) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG, PolicyOperation.READ)
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenAccept(policiesOpt -> {
Map<BacklogQuotaType, BacklogQuota> backlogQuotaMap = policiesOpt.orElseThrow(() ->
new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"))
.backlog_quota_map;
asyncResponse.resume(backlogQuotaMap);
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to get backlog quota map on namespace {}", clientAppId(), namespaceName, ex);
return null;
});
}

/**
* Base method for setBacklogQuota v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> setBacklogQuotaAsync(backlogQuotaType, backlogQuota))
.thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(),
namespaceName, backlogQuota);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update backlog quota map for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}

/**
* Base method for removeBacklogQuota v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQuotaType backlogQuotaType) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
policies.backlog_quota_map.remove(quotaType);
return policies;
})).thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully removed backlog namespace={}, quota={}", clientAppId(), namespaceName,
backlogQuotaType);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update backlog quota map for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1136,15 +1136,7 @@ public void getBacklogQuotaMap(
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.backlog_quota_map))
.exceptionally(ex -> {
log.error("[{}] Failed to get backlog quota map on namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
internalGetBacklogQuotaMap(asyncResponse);
}

@POST
Expand All @@ -1156,12 +1148,13 @@ public void getBacklogQuotaMap(
@ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota."
+ " Increase retention quota and retry request")})
public void setBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
BacklogQuota backlogQuota) {
validateNamespaceName(property, cluster, namespace);
internalSetBacklogQuota(backlogQuotaType, backlogQuota);
internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
}

@DELETE
Expand All @@ -1170,11 +1163,13 @@ public void setBacklogQuota(
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeBacklogQuota(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void removeBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType) {
validateNamespaceName(property, cluster, namespace);
internalRemoveBacklogQuota(backlogQuotaType);
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1147,15 +1147,7 @@ public void getBacklogQuotaMap(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.backlog_quota_map))
.exceptionally(ex -> {
log.error("[{}] Failed to get backlog quota map on namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
internalGetBacklogQuotaMap(asyncResponse);
}

@POST
Expand All @@ -1167,11 +1159,13 @@ public void getBacklogQuotaMap(
@ApiResponse(code = 412,
message = "Specified backlog quota exceeds retention quota."
+ " Increase retention quota and retry request")})
public void setBacklogQuota(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void setBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
@ApiParam(value = "Backlog quota for all topics of the specified namespace") BacklogQuota backlogQuota) {
validateNamespaceName(tenant, namespace);
internalSetBacklogQuota(backlogQuotaType, backlogQuota);
internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
}

@DELETE
Expand All @@ -1180,10 +1174,12 @@ public void setBacklogQuota(@PathParam("tenant") String tenant, @PathParam("name
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeBacklogQuota(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void removeBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType) {
validateNamespaceName(tenant, namespace);
internalRemoveBacklogQuota(backlogQuotaType);
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1278,6 +1279,10 @@ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncRespon
asyncResponse.resume(realCause);
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, realCause));
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
Expand Down

0 comments on commit ff4dc08

Please sign in to comment.