Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-149]make getPartitionedTopicList method async #16217

Merged
merged 1 commit into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,19 @@ protected CompletableFuture<List<String>> internalGetListAsync() {
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
}

protected List<String> internalGetPartitionedTopicList() {
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
try {
if (!namespaceResources().namespaceExists(namespaceName)) {
log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
} catch (RestException e) {
throw e;
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync() {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
.thenCompose(namespaceExists -> {
// Validate that namespace exists, throws 404 if it doesn't exist
if (!namespaceExists) {
log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} else {
return getPartitionedTopicListAsync(TopicDomain.getEnum(domain()));
}
});
}

protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -89,10 +88,21 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
public List<String> getPartitionedTopicList(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
public void getPartitionedTopicList(
@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetPartitionedTopicList();
internalGetPartitionedTopicListAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,25 @@ public void getList(
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
public List<String> getPartitionedTopicList(
public void getPartitionedTopicList(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(tenant, namespace);
return filterSystemTopic(internalGetPartitionedTopicList(), includeSystemTopic);
internalGetPartitionedTopicListAsync()
.thenAccept(partitionedTopicList -> asyncResponse.resume(
filterSystemTopic(partitionedTopicList, includeSystemTopic)))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,14 +794,19 @@ public void persistentTopics() throws Exception {
persistentTopics.getList(response, property, cluster, namespace, null);
verify(response, times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);
persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
response = mock(AsyncResponse.class);
persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
verify(response, timeout(5000).times(1))
.resume(Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));

TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, true, false).partitions, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,30 @@ public void testGetPartitionedTopicsList() throws KeeperException, InterruptedEx
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, false);

response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, true);
Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(),
TopicDomain.persistent.value());

response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 2);

List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace, false);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.getPartitionedTopicList(response, testTenant, testNamespace, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> nonPersistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(),
TopicDomain.non_persistent.value());
}

@Test
Expand Down