Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Improve retention endpoint to pure async. #17496

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -1307,33 +1307,30 @@ protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
}
}

protected void internalSetRetention(RetentionPolicies retention) {
validateRetentionPolicies(retention);
validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

protected CompletableFuture<Void> internalSetRetentionAsync(RetentionPolicies retentionPolicies) {
try {
Policies policies = namespaceResources().getPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace policies does not exist"));
if (!checkQuotas(policies, retention)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can delete this method.

log.warn("[{}] Failed to update retention configuration"
+ " for namespace {}: conflicts with backlog quota",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for namespace.");
}
policies.retention_policies = retention;
namespaceResources().setPolicies(namespaceName, p -> policies);
log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(retention));
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update retention configuration for namespace {}", clientAppId(), namespaceName,
e);
throw new RestException(e);
validateRetentionPolicies(retentionPolicies);
} catch (Throwable ex) {
return FutureUtil.failedFuture(ex);
}
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap = policies.backlog_quota_map;
if (backlogQuotaMap.isEmpty()) {
policies.retention_policies = retentionPolicies;
return policies;
}
// If we have backlog quota, we have to check the conflict
BacklogQuota backlogQuota = backlogQuotaMap.get(BacklogQuotaType.destination_storage);
boolean passCheck = checkBacklogQuota(backlogQuota, retentionPolicies);
if (!passCheck) {
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for namespace.");
}
policies.retention_policies = retentionPolicies;
return policies;
}));
}

protected CompletableFuture<Void> internalDeletePersistenceAsync() {
Expand Down Expand Up @@ -1821,39 +1818,6 @@ protected BundlesData validateBundlesData(BundlesData initialBundles) {
.build();
}

private void validatePolicies(NamespaceName ns, Policies policies) {
if (ns.isV2() && policies.replication_clusters.isEmpty()) {
// Default to local cluster
policies.replication_clusters = Collections.singleton(config().getClusterName());
}

// Validate cluster names and permissions
policies.replication_clusters.forEach(cluster -> validateClusterForTenant(ns.getTenant(), cluster));

if (policies.message_ttl_in_seconds != null && policies.message_ttl_in_seconds < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
}

if (policies.bundles != null && policies.bundles.getNumBundles() > 0) {
if (policies.bundles.getBoundaries() == null || policies.bundles.getBoundaries().size() == 0) {
policies.bundles = getBundles(policies.bundles.getNumBundles());
} else {
policies.bundles = validateBundlesData(policies.bundles);
}
} else {
int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles();
policies.bundles = getBundles(defaultNumberOfBundles);
}

if (policies.persistence != null) {
validatePersistencePolicies(policies.persistence);
}

if (policies.retention_policies != null) {
validateRetentionPolicies(policies.retention_policies);
}
}

private CompletableFuture<Void> validatePoliciesAsync(NamespaceName ns, Policies policies) {
if (ns.isV2() && policies.replication_clusters.isEmpty()) {
// Default to local cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,12 +1195,16 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.retention_policies))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to change this?

.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenApply(policiesOpt -> {
Policies policies = policiesOpt.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND,
"Namespace policies does not exist"));
return policies.retention_policies;
}).thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get retention config on a namespace {}", clientAppId(), namespaceName,
ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to get retention config on a namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
Expand All @@ -1212,10 +1216,22 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void setRetention(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void setRetention(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, RetentionPolicies retention) {
validateNamespaceName(property, cluster, namespace);
internalSetRetention(retention);
internalSetRetentionAsync(retention)
.thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(),
namespaceName, retention);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update retention configuration for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,12 +1204,16 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RETENTION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.retention_policies))
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenApply(policiesOpt -> {
Policies policies = policiesOpt.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND,
"Namespace policies does not exist"));
return policies.retention_policies;
}).thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get retention config on a namespace {}", clientAppId(), namespaceName,
ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to get retention config on a namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
Expand All @@ -1221,10 +1225,23 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void setRetention(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
public void setRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Retention policies for the specified namespace")
RetentionPolicies retention) {
validateNamespaceName(tenant, namespace);
internalSetRetention(retention);
internalSetRetentionAsync(retention)
.thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(),
namespaceName, retention);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update retention configuration for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}

@DELETE
Expand All @@ -1234,10 +1251,21 @@ public void setRetention(@PathParam("tenant") String tenant, @PathParam("namespa
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void removeRetention(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
public void removeRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetRetention(null);
internalSetRetentionAsync(null)
.thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully deleted retention configuration: namespace={}, map={}", clientAppId(),
namespaceName, null);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to delete retention configuration for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void setup() throws Exception {
doReturn(FutureUtil.failedFuture(new RestException(Status.UNAUTHORIZED, "unauthorized"))).when(namespaces)
.validateNamespacePolicyOperationAsync(NamespaceName.get("other-tenant/use/test-namespace-1"),
PolicyName.RETENTION, PolicyOperation.WRITE);

nsSvc = pulsar.getNamespaceService();
}

Expand Down Expand Up @@ -1046,7 +1046,8 @@ public void testRetention() throws Exception {
ownership.setAccessible(true);
ownership.set(pulsar.getNamespaceService(), MockOwnershipCache);
RetentionPolicies retention = new RetentionPolicies(10, 10);
namespaces.setRetention(this.testTenant, this.testLocalCluster, bundledNsLocal, retention);
asyncRequests(ctx ->
namespaces.setRetention(ctx, this.testTenant, this.testLocalCluster, bundledNsLocal, retention));
AsyncResponse response = mock(AsyncResponse.class);
namespaces.getRetention(response, this.testTenant, this.testLocalCluster, bundledNsLocal);
ArgumentCaptor<RetentionPolicies> captor = ArgumentCaptor.forClass(RetentionPolicies.class);
Expand All @@ -1063,7 +1064,8 @@ public void testRetentionUnauthorized() throws Exception {
try {
NamespaceName testNs = this.testLocalNamespaces.get(3);
RetentionPolicies retention = new RetentionPolicies(10, 10);
namespaces.setRetention(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), retention);
asyncRequests(ctx -> namespaces.setRetention(ctx, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), retention));
fail("Should fail");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.UNAUTHORIZED.getStatusCode());
Expand Down