Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a quote pattern #23014

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.common.collect.Lists;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -49,6 +51,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -1114,4 +1117,57 @@ public void testTopicDeletion() throws Exception {
assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
}

@Test(dataProvider = "partitioned")
public void testPatternQuote(boolean partitioned) throws Exception {
final NamespaceName namespace = NamespaceName.get("public/default");
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
final LookupService lookup = client.getLookup();
List<String> expectedRes = new ArrayList<>();
if (partitioned) {
admin.topics().createPartitionedTopic(topicName, 2);
expectedRes.add(TopicName.get(topicName).getPartition(0).toString());
expectedRes.add(TopicName.get(topicName).getPartition(1).toString());
Collections.sort(expectedRes);
} else {
admin.topics().createNonPartitionedTopic(topicName);
expectedRes.add(topicName);
}

// Verify 1: "java.util.regex.Pattern.quote".
String pattern1 = java.util.regex.Pattern.quote(topicName);
List<String> res1 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern1, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res1);
assertEquals(res1, expectedRes);

// Verify 2: "com.google.re2j.Pattern.quote"
String pattern2 = com.google.re2j.Pattern.quote(topicName);
List<String> res2 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern2, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res2);
assertEquals(res2, expectedRes);

// Verify 3: "java.util.regex.Pattern.quote" & "^$"
String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$";
List<String> res3 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern3, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res3);
assertEquals(res3, expectedRes);

// Verify 4: "com.google.re2j.Pattern.quote" & "^$"
String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$";
List<String> res4 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT,
pattern4, null).join().getNonPartitionedOrPartitionTopics();
Collections.sort(res4);
assertEquals(res4, expectedRes);

// cleanup.
if (partitioned) {
admin.topics().deletePartitionedTopic(topicName, false);
} else {
admin.topics().delete(topicName, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.topics;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hashing;
import com.google.re2j.Pattern;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,6 +29,7 @@
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;

@UtilityClass
Expand Down Expand Up @@ -83,15 +85,23 @@ public static Set<String> minus(Collection<String> list1, Collection<String> lis
return s1;
}

private static String removeTopicDomainScheme(String originalRegexp) {
@VisibleForTesting
static String removeTopicDomainScheme(String originalRegexp) {
if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
return originalRegexp;
}
String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1];
if (originalRegexp.contains("^")) {
return String.format("^%s", removedTopicDomain);
String[] parts = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString());
String prefix = parts[0];
String removedTopicDomain = parts[1];
if (prefix.equals(TopicDomain.persistent.value()) || prefix.equals(TopicDomain.non_persistent.value())) {
prefix = "";
} else if (prefix.endsWith(TopicDomain.non_persistent.value())) {
prefix = prefix.substring(0, prefix.length() - TopicDomain.non_persistent.value().length());
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
} else if (prefix.endsWith(TopicDomain.persistent.value())){
prefix = prefix.substring(0, prefix.length() - TopicDomain.persistent.value().length());
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
} else {
return removedTopicDomain;
throw new IllegalArgumentException("Does not support topic domain: " + prefix);
}
return String.format("%s%s", prefix, removedTopicDomain);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TopicListTest {

Expand Down Expand Up @@ -107,5 +108,60 @@ public void testCalculateHash() {

}


@Test
public void testRemoveTopicDomainScheme() {
// persistent.
final String tpName1 = "persistent://public/default/tp";
String res1 = TopicList.removeTopicDomainScheme(tpName1);
assertEquals(res1, "public/default/tp");

// non-persistent
final String tpName2 = "non-persistent://public/default/tp";
String res2 = TopicList.removeTopicDomainScheme(tpName2);
assertEquals(res2, "public/default/tp");

// without topic domain.
final String tpName3 = "public/default/tp";
String res3 = TopicList.removeTopicDomainScheme(tpName3);
assertEquals(res3, "public/default/tp");

// persistent & "java.util.regex.Pattern.quote".
final String tpName4 = java.util.regex.Pattern.quote(tpName1);
String res4 = TopicList.removeTopicDomainScheme(tpName4);
assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp"));

// persistent & "java.util.regex.Pattern.quote" & "^$".
final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + "$";
String res5 = TopicList.removeTopicDomainScheme(tpName5);
assertEquals(res5, "^" + java.util.regex.Pattern.quote("public/default/tp") + "$");

// persistent & "com.google.re2j.Pattern.quote".
final String tpName6 = Pattern.quote(tpName1);
String res6 = TopicList.removeTopicDomainScheme(tpName6);
assertEquals(res6, Pattern.quote("public/default/tp"));

// non-persistent & "java.util.regex.Pattern.quote".
final String tpName7 = java.util.regex.Pattern.quote(tpName2);
String res7 = TopicList.removeTopicDomainScheme(tpName7);
assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp"));

// non-persistent & "com.google.re2j.Pattern.quote".
final String tpName8 = Pattern.quote(tpName2);
String res8 = TopicList.removeTopicDomainScheme(tpName8);
assertEquals(res8, Pattern.quote("public/default/tp"));

// non-persistent & "com.google.re2j.Pattern.quote" & "^$".
final String tpName9 = "^" + Pattern.quote(tpName2) + "$";
String res9 = TopicList.removeTopicDomainScheme(tpName9);
assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$");

// wrong topic domain.
final String tpName10 = "xx://public/default/tp";
try {
TopicList.removeTopicDomainScheme(tpName10);
fail("Does not support the topic domain xx");
} catch (Exception ex) {
// expected error.
}
}
}
Loading