Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [client] improve the class GetTopicsResult #22766

Merged
merged 7 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Arrays;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
@Slf4j
public class LookupServiceTest extends ProducerConsumerBase {

private PulsarClientImpl clientWithHttpLookup;
private PulsarClientImpl clientWitBinaryLookup;

private boolean enableBrokerSideSubscriptionPatternEvaluation = true;
private int subscriptionPatternMaxLength = 10_000;

@Override
@BeforeClass
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
clientWithHttpLookup =
(PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
clientWitBinaryLookup =
(PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
if (clientWithHttpLookup != null) {
clientWithHttpLookup.close();
}
if (clientWitBinaryLookup != null) {
clientWitBinaryLookup.close();
}
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
conf.setEnableBrokerSideSubscriptionPatternEvaluation(enableBrokerSideSubscriptionPatternEvaluation);
conf.setSubscriptionPatternMaxLength(subscriptionPatternMaxLength);
}

private LookupService getLookupService(boolean isUsingHttpLookup) {
if (isUsingHttpLookup) {
return clientWithHttpLookup.getLookup();
} else {
return clientWitBinaryLookup.getLookup();
}
}

@DataProvider(name = "isUsingHttpLookup")
public Object[][] isUsingHttpLookup() {
return new Object[][]{
{true},
{false}
};
}

@Test(dataProvider = "isUsingHttpLookup")
public void testGetExistsPartitions(boolean isUsingHttpLookup) throws Exception {
LookupService lookupService = getLookupService(isUsingHttpLookup);
String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createPartitionedTopic(partitionedTopic, 3);
String nonPersistentTopic = BrokerTestUtil.newUniqueName("non-persistent://public/default/tp");

assertEquals(lookupService.getExistsPartitions(nonPartitionedTopic).join(), Collections.emptyList());
assertEquals(lookupService.getExistsPartitions(partitionedTopic).join(), Arrays.asList(0, 1, 2));
try {
lookupService.getExistsPartitions(nonPersistentTopic).join();
fail("Expected an error");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("not support"));
}

// Cleanup.
admin.topics().deletePartitionedTopic(partitionedTopic, false);
admin.topics().delete(nonPartitionedTopic, false);
}

@Test(dataProvider = "isUsingHttpLookup")
public void testGetExistsPartitionsIfDisabledBrokerFilter(boolean isUsingHttpLookup) throws Exception {
cleanup();
enableBrokerSideSubscriptionPatternEvaluation = false;
setup();

LookupService lookupService = getLookupService(isUsingHttpLookup);
String nonPartitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
String partitionedTopic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createPartitionedTopic(partitionedTopic, 3);
String nonPersistentTopic = BrokerTestUtil.newUniqueName("non-persistent://public/default/tp");

assertEquals(lookupService.getExistsPartitions(nonPartitionedTopic).join(), Collections.emptyList());
assertEquals(lookupService.getExistsPartitions(partitionedTopic).join(), Arrays.asList(0, 1, 2));
try {
lookupService.getExistsPartitions(nonPersistentTopic).join();
fail("Expected an error");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("not support"));
}

// Cleanup.
admin.topics().deletePartitionedTopic(partitionedTopic, false);
admin.topics().delete(nonPartitionedTopic, false);
// Reset broker config.
cleanup();
enableBrokerSideSubscriptionPatternEvaluation = true;
setup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -375,17 +373,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace, requestId);
}
// do not keep partition part of topic name
List<String> result = new ArrayList<>();
r.getTopics().forEach(topic -> {
String filtered = TopicName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});

getTopicsResultFuture.complete(new GetTopicsResult(result, r.getTopicsHash(),
r.isFiltered(), r.isChanged()));
getTopicsResultFuture.complete(r);
}
client.getCnxPool().releaseConnection(clientCnx);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -179,15 +176,7 @@ public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName
httpClient
.get(String.format(format, namespace, mode.toString()), String[].class)
.thenAccept(topics -> {
List<String> result = new ArrayList<>();
// do not keep partition part of topic name
Arrays.asList(topics).forEach(topic -> {
String filtered = TopicName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});
future.complete(new GetTopicsResult(result, topicsHash, false, true));
future.complete(new GetTopicsResult(topics));
}).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.FutureUtil;

/**
* Provides lookup service to find broker which serves given topic. It helps to
Expand Down Expand Up @@ -105,4 +112,50 @@ public interface LookupService extends AutoCloseable {
*/
CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
String topicPattern, String topicsHash);

/**
* Get the exists partitions of a partitioned topic, the result does not contain the partitions which has not been
* created yet(in other words, the partitions that do not exist in the response of "pulsar-admin topics list").
* @return sorted partitions list if it is a partitioned topic; @return an empty list if it is a non-partitioned
* topic.
*/
default CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
TopicName topicName = TopicName.get(topic);
if (!topicName.isPersistent()) {
return FutureUtil.failedFuture(new IllegalArgumentException("The API LookupService.getExistsPartitions does"
+ " not support non-persistent topic yet."));
}
return getTopicsUnderNamespace(topicName.getNamespaceObject(),
topicName.isPersistent() ? Mode.PERSISTENT : Mode.NON_PERSISTENT,
"^" + topicName.getPartitionedTopicName() + "$",
null).thenApply(getTopicsResult -> {
if (getTopicsResult.getNonPartitionedOrPartitionTopics() == null
|| getTopicsResult.getNonPartitionedOrPartitionTopics().isEmpty()) {
return Collections.emptyList();
}
// If broker version is less than "2.11.x", it does not support broker-side pattern check, so append
// a client-side pattern check.
// If lookup service is typed HttpLookupService, the HTTP API does not support broker-side pattern
// check yet, so append a client-side pattern check.
Predicate<String> clientSideFilter;
if (getTopicsResult.isFiltered()) {
clientSideFilter = __ -> true;
} else {
clientSideFilter = tp -> Pattern.compile(TopicName.getPartitionPatten(topic)).matcher(tp).matches();
}
ArrayList<Integer> list = new ArrayList<>(getTopicsResult.getNonPartitionedOrPartitionTopics().size());
for (String partition : getTopicsResult.getNonPartitionedOrPartitionTopics()) {
int partitionIndex = TopicName.get(partition).getPartitionIndex();
if (partitionIndex < 0) {
// It is not a partition.
continue;
}
if (clientSideFilter.test(partition)) {
list.add(partitionIndex);
}
}
Collections.sort(list);
return list;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,105 @@
*/
package org.apache.pulsar.common.lookup;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
/***
* A value object.
* - The response of HTTP API "admin/v2/namespaces/{domain}/topics" is a topic(non-partitioned topic or partitions)
* array. It will be wrapped to "topics: {topic array}, topicsHash: null, filtered: false, changed: true".
* - The response of binary API {@link CommandGetTopicsOfNamespace} is a {@link CommandGetTopicsOfNamespaceResponse},
* it will be transferred to a {@link GetTopicsResult}.
* See more details https://github.com/apache/pulsar/pull/14804.
*/
@ToString
public class GetTopicsResult {
private List<String> topics;
private String topicsHash;
private boolean filtered;
private boolean changed;

/**
* Non-partitioned topics, and topic partitions of partitioned topics.
*/
@Getter
private final List<String> nonPartitionedOrPartitionTopics;

/**
* The topics have been filtered by Broker using a regexp. Otherwise, the client should do a client-side filter.
* There are three cases that brokers will not filter the topics:
* 1. the lookup service is typed HTTP lookup service, the HTTP API has not implemented this feature yet.
* 2. the broker does not support this feature(in other words, its version is lower than "2.11.0").
* 3. the input param "topicPattern" is too long than the broker config "subscriptionPatternMaxLength".
*/
@Getter
private final boolean filtered;

/**
* The topics hash that was calculated by {@link TopicList#calculateHash(List)}. The param topics that will be used
* to calculate the hash code is only contains the topics that has been filtered.
* Note: It is always "null" if broker did not filter the topics when calling the API
* "LookupService.getTopicsUnderNamespace"(in other words, {@link #filtered} is false).
*/
@Getter
private final String topicsHash;

/**
* The topics hash has changed after compare with the input param "topicsHash" when calling
* "LookupService.getTopicsUnderNamespace".
* Note: It is always set "true" if the input param "topicsHash" that used to call
* "LookupService.getTopicsUnderNamespace" is null or the "LookupService" is "HttpLookupService".
*/
@Getter
private final boolean changed;

/**
* Partitioned topics and non-partitioned topics.
* In other words, there is no topic partitions of partitioned topics in this list.
* Note: it is not a field of the response of "LookupService.getTopicsUnderNamespace", it is generated in
* client-side memory.
*/
private volatile List<String> topics;

/**
* This constructor is used for binary API.
*/
public GetTopicsResult(List<String> nonPartitionedOrPartitionTopics, String topicsHash, boolean filtered,
boolean changed) {
this.nonPartitionedOrPartitionTopics = nonPartitionedOrPartitionTopics;
this.topicsHash = topicsHash;
this.filtered = filtered;
this.changed = changed;
}

/**
* This constructor is used for HTTP API.
*/
public GetTopicsResult(String[] nonPartitionedOrPartitionTopics) {
this(Arrays.asList(nonPartitionedOrPartitionTopics), null, false, true);
}

public List<String> getTopics() {
if (topics != null) {
return topics;
}
synchronized (this) {
if (topics != null) {
return topics;
}
// Group partitioned topics.
List<String> grouped = new ArrayList<>();
for (String topic : nonPartitionedOrPartitionTopics) {
String partitionedTopic = TopicName.get(topic).getPartitionedTopicName();
if (!grouped.contains(partitionedTopic)) {
grouped.add(partitionedTopic);
}
}
topics = grouped;
return topics;
}
}
}
Loading
Loading