Skip to content

Commit

Permalink
[improve] [client] improve the class GetTopicsResult (apache#22766)
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored May 30, 2024
1 parent 34898e3 commit 87a3339
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.util.Collection;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
@Slf4j
public class LookupServiceTest extends ProducerConsumerBase {

private PulsarClientImpl clientWithHttpLookup;
private PulsarClientImpl clientWitBinaryLookup;

private boolean enableBrokerSideSubscriptionPatternEvaluation = true;
private int subscriptionPatternMaxLength = 10_000;

@Override
@BeforeClass
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
clientWithHttpLookup =
(PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
clientWitBinaryLookup =
(PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
if (clientWithHttpLookup != null) {
clientWithHttpLookup.close();
}
if (clientWitBinaryLookup != null) {
clientWitBinaryLookup.close();
}
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setEnableBrokerSideSubscriptionPatternEvaluation(enableBrokerSideSubscriptionPatternEvaluation);
conf.setSubscriptionPatternMaxLength(subscriptionPatternMaxLength);
}

private LookupService getLookupService(boolean isUsingHttpLookup) {
if (isUsingHttpLookup) {
return clientWithHttpLookup.getLookup();
} else {
return clientWitBinaryLookup.getLookup();
}
}

@DataProvider(name = "isUsingHttpLookup")
public Object[][] isUsingHttpLookup() {
return new Object[][]{
{true},
{false}
};
}

@Test(dataProvider = "isUsingHttpLookup")
public void testGetTopicsOfGetTopicsResult(boolean isUsingHttpLookup) throws Exception {
LookupService lookupService = getLookupService(isUsingHttpLookup);
String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createPartitionedTopic(partitionedTopic, 3);
String nonPersistentTopic = BrokerTestUtil.newUniqueName("non-persistent://public/default/tp");

// Verify the new method "GetTopicsResult.getTopics" works as expected.
Collection<String> topics = lookupService.getTopicsUnderNamespace(NamespaceName.get("public/default"),
Mode.PERSISTENT, "public/default/.*", null).join().getTopics();
assertTrue(topics.contains(nonPartitionedTopic));
assertTrue(topics.contains(partitionedTopic));
assertFalse(topics.contains(nonPersistentTopic));
assertFalse(topics.contains(TopicName.get(partitionedTopic).getPartition(0).toString()));
// Verify the new method "GetTopicsResult.nonPartitionedOrPartitionTopics" works as expected.
Collection<String> nonPartitionedOrPartitionTopics =
lookupService.getTopicsUnderNamespace(NamespaceName.get("public/default"),
Mode.PERSISTENT, "public/default/.*", null).join()
.getNonPartitionedOrPartitionTopics();
assertTrue(nonPartitionedOrPartitionTopics.contains(nonPartitionedTopic));
assertFalse(nonPartitionedOrPartitionTopics.contains(partitionedTopic));
assertFalse(nonPartitionedOrPartitionTopics.contains(nonPersistentTopic));
assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(0)
.toString()));
assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(1)
.toString()));
assertTrue(nonPartitionedOrPartitionTopics.contains(TopicName.get(partitionedTopic).getPartition(2)
.toString()));

// Cleanup.
admin.topics().deletePartitionedTopic(partitionedTopic, false);
admin.topics().delete(nonPartitionedTopic, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -378,17 +376,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace, requestId);
}
// do not keep partition part of topic name
List<String> result = new ArrayList<>();
r.getTopics().forEach(topic -> {
String filtered = TopicName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});

getTopicsResultFuture.complete(new GetTopicsResult(result, r.getTopicsHash(),
r.isFiltered(), r.isChanged()));
getTopicsResultFuture.complete(r);
}
client.getCnxPool().releaseConnection(clientCnx);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -181,15 +178,7 @@ public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
httpClient
.get(String.format(format, namespace, mode.toString()), String[].class)
.thenAccept(topics -> {
List<String> result = new ArrayList<>();
// do not keep partition part of topic name
Arrays.asList(topics).forEach(topic -> {
String filtered = TopicName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});
future.complete(new GetTopicsResult(result, topicsHash, false, true));
future.complete(new GetTopicsResult(topics));
}).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,105 @@
*/
package org.apache.pulsar.common.lookup;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
/***
* A value object.
* - The response of HTTP API "admin/v2/namespaces/{domain}/topics" is a topic(non-partitioned topic or partitions)
* array. It will be wrapped to "topics: {topic array}, topicsHash: null, filtered: false, changed: true".
* - The response of binary API {@link CommandGetTopicsOfNamespace} is a {@link CommandGetTopicsOfNamespaceResponse},
* it will be transferred to a {@link GetTopicsResult}.
* See more details https://github.com/apache/pulsar/pull/14804.
*/
@ToString
public class GetTopicsResult {
private List<String> topics;
private String topicsHash;
private boolean filtered;
private boolean changed;

/**
* Non-partitioned topics, and topic partitions of partitioned topics.
*/
@Getter
private final List<String> nonPartitionedOrPartitionTopics;

/**
* The topics have been filtered by Broker using a regexp. Otherwise, the client should do a client-side filter.
* There are three cases that brokers will not filter the topics:
* 1. the lookup service is typed HTTP lookup service, the HTTP API has not implemented this feature yet.
* 2. the broker does not support this feature(in other words, its version is lower than "2.11.0").
* 3. the input param "topicPattern" is too long than the broker config "subscriptionPatternMaxLength".
*/
@Getter
private final boolean filtered;

/**
* The topics hash that was calculated by {@link TopicList#calculateHash(List)}. The param topics that will be used
* to calculate the hash code is only contains the topics that has been filtered.
* Note: It is always "null" if broker did not filter the topics when calling the API
* "LookupService.getTopicsUnderNamespace"(in other words, {@link #filtered} is false).
*/
@Getter
private final String topicsHash;

/**
* The topics hash has changed after compare with the input param "topicsHash" when calling
* "LookupService.getTopicsUnderNamespace".
* Note: It is always set "true" if the input param "topicsHash" that used to call
* "LookupService.getTopicsUnderNamespace" is null or the "LookupService" is "HttpLookupService".
*/
@Getter
private final boolean changed;

/**
* Partitioned topics and non-partitioned topics.
* In other words, there is no topic partitions of partitioned topics in this list.
* Note: it is not a field of the response of "LookupService.getTopicsUnderNamespace", it is generated in
* client-side memory.
*/
private volatile List<String> topics;

/**
* This constructor is used for binary API.
*/
public GetTopicsResult(List<String> nonPartitionedOrPartitionTopics, String topicsHash, boolean filtered,
boolean changed) {
this.nonPartitionedOrPartitionTopics = nonPartitionedOrPartitionTopics;
this.topicsHash = topicsHash;
this.filtered = filtered;
this.changed = changed;
}

/**
* This constructor is used for HTTP API.
*/
public GetTopicsResult(String[] nonPartitionedOrPartitionTopics) {
this(Arrays.asList(nonPartitionedOrPartitionTopics), null, false, true);
}

public List<String> getTopics() {
if (topics != null) {
return topics;
}
synchronized (this) {
if (topics != null) {
return topics;
}
// Group partitioned topics.
List<String> grouped = new ArrayList<>();
for (String topic : nonPartitionedOrPartitionTopics) {
String partitionedTopic = TopicName.get(topic).getPartitionedTopicName();
if (!grouped.contains(partitionedTopic)) {
grouped.add(partitionedTopic);
}
}
topics = grouped;
return topics;
}
}
}

0 comments on commit 87a3339

Please sign in to comment.