Skip to content

Commit

Permalink
[improve][broker][PIP-149]make getPartitionedTopicList method async (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
HQebupt authored Jun 27, 2022
1 parent 0c2fe5a commit eeb22ba
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,19 @@ protected CompletableFuture<List<String>> internalGetListAsync() {
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
}

protected List<String> internalGetPartitionedTopicList() {
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
try {
if (!namespaceResources().namespaceExists(namespaceName)) {
log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
} catch (RestException e) {
throw e;
} catch (Exception e) {
log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync() {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
.thenCompose(namespaceExists -> {
// Validate that namespace exists, throws 404 if it doesn't exist
if (!namespaceExists) {
log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} else {
return getPartitionedTopicListAsync(TopicDomain.getEnum(domain()));
}
});
}

protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -89,10 +88,21 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
public List<String> getPartitionedTopicList(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
public void getPartitionedTopicList(
@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetPartitionedTopicList();
internalGetPartitionedTopicListAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,25 @@ public void getList(
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
public List<String> getPartitionedTopicList(
public void getPartitionedTopicList(
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(tenant, namespace);
return filterSystemTopic(internalGetPartitionedTopicList(), includeSystemTopic);
internalGetPartitionedTopicListAsync()
.thenAccept(partitionedTopicList -> asyncResponse.resume(
filterSystemTopic(partitionedTopicList, includeSystemTopic)))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,14 +794,19 @@ public void persistentTopics() throws Exception {
persistentTopics.getList(response, property, cluster, namespace, null);
verify(response, times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);
persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
response = mock(AsyncResponse.class);
persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
verify(response, timeout(5000).times(1))
.resume(Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));

TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, true, false).partitions, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,30 @@ public void testGetPartitionedTopicsList() throws KeeperException, InterruptedEx
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, false);

response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, true);
Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(),
TopicDomain.persistent.value());

response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, true);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 2);

List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace, false);
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
nonPersistentTopic.getPartitionedTopicList(response, testTenant, testNamespace, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
List<String> nonPersistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(),
TopicDomain.non_persistent.value());
}

@Test
Expand Down

0 comments on commit eeb22ba

Please sign in to comment.