diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java new file mode 100644 index 0000000000000..59cb7ae03d0e3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/LookupServiceTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.Collection; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +@Slf4j +public class LookupServiceTest extends ProducerConsumerBase { + + private PulsarClientImpl clientWithHttpLookup; + private PulsarClientImpl clientWitBinaryLookup; + + private boolean enableBrokerSideSubscriptionPatternEvaluation = true; + private int subscriptionPatternMaxLength = 10_000; + + @Override + @BeforeClass + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + clientWithHttpLookup = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + clientWitBinaryLookup = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + if (clientWithHttpLookup != null) { + clientWithHttpLookup.close(); + } + if (clientWitBinaryLookup != null) { + clientWitBinaryLookup.close(); + } + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setEnableBrokerSideSubscriptionPatternEvaluation(enableBrokerSideSubscriptionPatternEvaluation); + conf.setSubscriptionPatternMaxLength(subscriptionPatternMaxLength); + } + + private LookupService getLookupService(boolean isUsingHttpLookup) { + if (isUsingHttpLookup) { + return clientWithHttpLookup.getLookup(); + } else { + return clientWitBinaryLookup.getLookup(); + } + } + + @DataProvider(name = "isUsingHttpLookup") + public Object[][] isUsingHttpLookup() { + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(dataProvider = "isUsingHttpLookup") + public void testGetTopicsOfGetTopicsResult(boolean isUsingHttpLookup) throws Exception { + LookupService lookupService = getLookupService(isUsingHttpLookup); + String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(nonPartitionedTopic); + String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createPartitionedTopic(partitionedTopic, 3); + String nonPersistentTopic = BrokerTestUtil.newUniqueName("non-persistent://public/default/tp"); + + // Verify the new method "GetTopicsResult.getTopics" works as expected. + Collection topics = lookupService.getTopicsUnderNamespace(NamespaceName.get("public/default"), + Mode.PERSISTENT, "public/default/.*", null).join().getTopics(); + assertTrue(topics.contains(nonPartitionedTopic)); + assertTrue(topics.contains(partitionedTopic)); + assertFalse(topics.contains(nonPersistentTopic)); + assertFalse(topics.contains(TopicName.get(partitionedTopic).getPartition(0).toString())); + // Verify the new method "GetTopicsResult.nonPartitionedOrPartitionTopics" works as expected. + Collection nonPartitionedOrPartitionTopics = + lookupService.getTopicsUnderNamespace(NamespaceName.get("public/default"), + Mode.PERSISTENT, "public/default/.*", null).join() + .getNonPartitionedOrPartitionTopics(); + assertTrue(nonPartitionedOrPartitionTopics.contains(nonPartitionedTopic)); + assertFalse(nonPartitionedOrPartitionTopics.contains(partitionedTopic)); + assertFalse(nonPartitionedOrPartitionTopics.contains(nonPersistentTopic)); + assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(0) + .toString())); + assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(1) + .toString())); + assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(2) + .toString())); + + // Cleanup. + admin.topics().deletePartitionedTopic(partitionedTopic, false); + admin.topics().delete(nonPartitionedTopic, false); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 8eedb3250cdf5..9c9371b09cbdb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -23,8 +23,6 @@ import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -375,17 +373,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, log.debug("[namespace: {}] Success get topics list in request: {}", namespace, requestId); } - // do not keep partition part of topic name - List result = new ArrayList<>(); - r.getTopics().forEach(topic -> { - String filtered = TopicName.get(topic).getPartitionedTopicName(); - if (!result.contains(filtered)) { - result.add(filtered); - } - }); - - getTopicsResultFuture.complete(new GetTopicsResult(result, r.getTopicsHash(), - r.isFiltered(), r.isChanged())); + getTopicsResultFuture.complete(r); } client.getCnxPool().releaseConnection(clientCnx); }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 8158b6d979efd..ea13930411dc0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -24,10 +24,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; @@ -179,15 +176,7 @@ public CompletableFuture getTopicsUnderNamespace(NamespaceName httpClient .get(String.format(format, namespace, mode.toString()), String[].class) .thenAccept(topics -> { - List result = new ArrayList<>(); - // do not keep partition part of topic name - Arrays.asList(topics).forEach(topic -> { - String filtered = TopicName.get(topic).getPartitionedTopicName(); - if (!result.contains(filtered)) { - result.add(filtered); - } - }); - future.complete(new GetTopicsResult(result, topicsHash, false, true)); + future.complete(new GetTopicsResult(topics)); }).exceptionally(ex -> { Throwable cause = FutureUtil.unwrapCompletionException(ex); log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java index 55fe6253ff971..80f16e6c36717 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java @@ -18,21 +18,105 @@ */ package org.apache.pulsar.common.lookup; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; import lombok.ToString; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.topics.TopicList; -@Getter -@Setter -@AllArgsConstructor -@NoArgsConstructor +/*** + * A value object. + * - The response of HTTP API "admin/v2/namespaces/{domain}/topics" is a topic(non-partitioned topic or partitions) + * array. It will be wrapped to "topics: {topic array}, topicsHash: null, filtered: false, changed: true". + * - The response of binary API {@link CommandGetTopicsOfNamespace} is a {@link CommandGetTopicsOfNamespaceResponse}, + * it will be transferred to a {@link GetTopicsResult}. + * See more details https://github.com/apache/pulsar/pull/14804. + */ @ToString public class GetTopicsResult { - private List topics; - private String topicsHash; - private boolean filtered; - private boolean changed; + + /** + * Non-partitioned topics, and topic partitions of partitioned topics. + */ + @Getter + private final List nonPartitionedOrPartitionTopics; + + /** + * The topics have been filtered by Broker using a regexp. Otherwise, the client should do a client-side filter. + * There are three cases that brokers will not filter the topics: + * 1. the lookup service is typed HTTP lookup service, the HTTP API has not implemented this feature yet. + * 2. the broker does not support this feature(in other words, its version is lower than "2.11.0"). + * 3. the input param "topicPattern" is too long than the broker config "subscriptionPatternMaxLength". + */ + @Getter + private final boolean filtered; + + /** + * The topics hash that was calculated by {@link TopicList#calculateHash(List)}. The param topics that will be used + * to calculate the hash code is only contains the topics that has been filtered. + * Note: It is always "null" if broker did not filter the topics when calling the API + * "LookupService.getTopicsUnderNamespace"(in other words, {@link #filtered} is false). + */ + @Getter + private final String topicsHash; + + /** + * The topics hash has changed after compare with the input param "topicsHash" when calling + * "LookupService.getTopicsUnderNamespace". + * Note: It is always set "true" if the input param "topicsHash" that used to call + * "LookupService.getTopicsUnderNamespace" is null or the "LookupService" is "HttpLookupService". + */ + @Getter + private final boolean changed; + + /** + * Partitioned topics and non-partitioned topics. + * In other words, there is no topic partitions of partitioned topics in this list. + * Note: it is not a field of the response of "LookupService.getTopicsUnderNamespace", it is generated in + * client-side memory. + */ + private volatile List topics; + + /** + * This constructor is used for binary API. + */ + public GetTopicsResult(List nonPartitionedOrPartitionTopics, String topicsHash, boolean filtered, + boolean changed) { + this.nonPartitionedOrPartitionTopics = nonPartitionedOrPartitionTopics; + this.topicsHash = topicsHash; + this.filtered = filtered; + this.changed = changed; + } + + /** + * This constructor is used for HTTP API. + */ + public GetTopicsResult(String[] nonPartitionedOrPartitionTopics) { + this(Arrays.asList(nonPartitionedOrPartitionTopics), null, false, true); + } + + public List getTopics() { + if (topics != null) { + return topics; + } + synchronized (this) { + if (topics != null) { + return topics; + } + // Group partitioned topics. + List grouped = new ArrayList<>(); + for (String topic : nonPartitionedOrPartitionTopics) { + String partitionedTopic = TopicName.get(topic).getPartitionedTopicName(); + if (!grouped.contains(partitionedTopic)) { + grouped.add(partitionedTopic); + } + } + topics = grouped; + return topics; + } + } }