From c0029d7381e9b5a91fad35102044bb81ebffc158 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 10 Jul 2024 18:08:48 +0800 Subject: [PATCH] [fix][broker]Fix lookupService.getTopicsUnderNamespace can not work with a quote pattern (#23014) (cherry picked from commit 7c0e82739215fbae9e21270d4c70c9a52dd3e403) --- .../impl/PatternTopicsConsumerImplTest.java | 56 ++++++++++++++++++ .../pulsar/common/topics/TopicList.java | 20 +++++-- .../pulsar/common/topics/TopicListTest.java | 58 ++++++++++++++++++- 3 files changed, 128 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index d443ca44a7ea2..04bfb03bf46e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -27,6 +27,8 @@ import com.google.common.collect.Lists; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -1113,4 +1116,57 @@ public void testTopicDeletion() throws Exception { assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty()); assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent()); } + + @Test(dataProvider = "partitioned") + public void testPatternQuote(boolean partitioned) throws Exception { + final NamespaceName namespace = NamespaceName.get("public/default"); + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final PulsarClientImpl client = (PulsarClientImpl) pulsarClient; + final LookupService lookup = client.getLookup(); + List expectedRes = new ArrayList<>(); + if (partitioned) { + admin.topics().createPartitionedTopic(topicName, 2); + expectedRes.add(TopicName.get(topicName).getPartition(0).toString()); + expectedRes.add(TopicName.get(topicName).getPartition(1).toString()); + Collections.sort(expectedRes); + } else { + admin.topics().createNonPartitionedTopic(topicName); + expectedRes.add(topicName); + } + + // Verify 1: "java.util.regex.Pattern.quote". + String pattern1 = java.util.regex.Pattern.quote(topicName); + List res1 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern1, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res1); + assertEquals(res1, expectedRes); + + // Verify 2: "com.google.re2j.Pattern.quote" + String pattern2 = com.google.re2j.Pattern.quote(topicName); + List res2 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern2, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res2); + assertEquals(res2, expectedRes); + + // Verify 3: "java.util.regex.Pattern.quote" & "^$" + String pattern3 = "^" + java.util.regex.Pattern.quote(topicName) + "$"; + List res3 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern3, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res3); + assertEquals(res3, expectedRes); + + // Verify 4: "com.google.re2j.Pattern.quote" & "^$" + String pattern4 = "^" + com.google.re2j.Pattern.quote(topicName) + "$"; + List res4 = lookup.getTopicsUnderNamespace(namespace, CommandGetTopicsOfNamespace.Mode.PERSISTENT, + pattern4, null).join().getNonPartitionedOrPartitionTopics(); + Collections.sort(res4); + assertEquals(res4, expectedRes); + + // cleanup. + if (partitioned) { + admin.topics().deletePartitionedTopic(topicName, false); + } else { + admin.topics().delete(topicName, false); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index 4c0a8d500b703..4dd48732225e1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.topics; +import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -28,6 +29,7 @@ import java.util.stream.Collectors; import lombok.experimental.UtilityClass; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @UtilityClass @@ -82,15 +84,23 @@ public static Set minus(Collection list1, Collection lis return s1; } - private static String removeTopicDomainScheme(String originalRegexp) { + @VisibleForTesting + static String removeTopicDomainScheme(String originalRegexp) { if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) { return originalRegexp; } - String removedTopicDomain = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString())[1]; - if (originalRegexp.contains("^")) { - return String.format("^%s", removedTopicDomain); + String[] parts = SCHEME_SEPARATOR_PATTERN.split(originalRegexp.toString()); + String prefix = parts[0]; + String removedTopicDomain = parts[1]; + if (prefix.equals(TopicDomain.persistent.value()) || prefix.equals(TopicDomain.non_persistent.value())) { + prefix = ""; + } else if (prefix.endsWith(TopicDomain.non_persistent.value())) { + prefix = prefix.substring(0, prefix.length() - TopicDomain.non_persistent.value().length()); + } else if (prefix.endsWith(TopicDomain.persistent.value())){ + prefix = prefix.substring(0, prefix.length() - TopicDomain.persistent.value().length()); } else { - return removedTopicDomain; + throw new IllegalArgumentException("Does not support topic domain: " + prefix); } + return String.format("%s%s", prefix, removedTopicDomain); } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java index 9069dd6dcc7b9..b3a7536ebff9e 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java @@ -30,6 +30,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class TopicListTest { @@ -107,5 +108,60 @@ public void testCalculateHash() { } - + @Test + public void testRemoveTopicDomainScheme() { + // persistent. + final String tpName1 = "persistent://public/default/tp"; + String res1 = TopicList.removeTopicDomainScheme(tpName1); + assertEquals(res1, "public/default/tp"); + + // non-persistent + final String tpName2 = "non-persistent://public/default/tp"; + String res2 = TopicList.removeTopicDomainScheme(tpName2); + assertEquals(res2, "public/default/tp"); + + // without topic domain. + final String tpName3 = "public/default/tp"; + String res3 = TopicList.removeTopicDomainScheme(tpName3); + assertEquals(res3, "public/default/tp"); + + // persistent & "java.util.regex.Pattern.quote". + final String tpName4 = java.util.regex.Pattern.quote(tpName1); + String res4 = TopicList.removeTopicDomainScheme(tpName4); + assertEquals(res4, java.util.regex.Pattern.quote("public/default/tp")); + + // persistent & "java.util.regex.Pattern.quote" & "^$". + final String tpName5 = "^" + java.util.regex.Pattern.quote(tpName1) + "$"; + String res5 = TopicList.removeTopicDomainScheme(tpName5); + assertEquals(res5, "^" + java.util.regex.Pattern.quote("public/default/tp") + "$"); + + // persistent & "com.google.re2j.Pattern.quote". + final String tpName6 = Pattern.quote(tpName1); + String res6 = TopicList.removeTopicDomainScheme(tpName6); + assertEquals(res6, Pattern.quote("public/default/tp")); + + // non-persistent & "java.util.regex.Pattern.quote". + final String tpName7 = java.util.regex.Pattern.quote(tpName2); + String res7 = TopicList.removeTopicDomainScheme(tpName7); + assertEquals(res7, java.util.regex.Pattern.quote("public/default/tp")); + + // non-persistent & "com.google.re2j.Pattern.quote". + final String tpName8 = Pattern.quote(tpName2); + String res8 = TopicList.removeTopicDomainScheme(tpName8); + assertEquals(res8, Pattern.quote("public/default/tp")); + + // non-persistent & "com.google.re2j.Pattern.quote" & "^$". + final String tpName9 = "^" + Pattern.quote(tpName2) + "$"; + String res9 = TopicList.removeTopicDomainScheme(tpName9); + assertEquals(res9, "^" + Pattern.quote("public/default/tp") + "$"); + + // wrong topic domain. + final String tpName10 = "xx://public/default/tp"; + try { + TopicList.removeTopicDomainScheme(tpName10); + fail("Does not support the topic domain xx"); + } catch (Exception ex) { + // expected error. + } + } }