From cf472a15358ceb4a85c9417a83f1144a921d8c2e Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Tue, 22 Feb 2022 16:20:00 +0100 Subject: [PATCH 1/4] PIP-145: Enable evaluating subscription pattern on broker side This is part 1 of PIP-145: Improve performance of regex subscriptions This change covers applying subscription pattern on broker side and using hashes to skip updates when nothing changed. --- .../pulsar/broker/ServiceConfiguration.java | 14 +++ .../broker/namespace/NamespaceService.java | 4 +- .../broker/service/PulsarCommandSender.java | 3 +- .../service/PulsarCommandSenderImpl.java | 6 +- .../pulsar/broker/service/ServerCnx.java | 28 ++++- .../pulsar/broker/service/ServerCnxTest.java | 111 ++++++++++++++++++ .../service/utils/ClientChannelHelper.java | 6 + .../impl/PatternTopicsConsumerImplTest.java | 61 ---------- .../client/impl/BinaryProtoLookupService.java | 32 +++-- .../apache/pulsar/client/impl/ClientCnx.java | 19 +-- .../pulsar/client/impl/HttpLookupService.java | 8 +- .../pulsar/client/impl/LookupService.java | 5 +- .../impl/PatternMultiTopicsConsumerImpl.java | 73 +++++++----- .../pulsar/client/impl/PulsarClientImpl.java | 33 ++---- .../PatternMultiTopicsConsumerImplTest.java | 110 +++++++++++++++++ .../client/impl/PulsarClientImplTest.java | 9 +- .../pulsar/common/lookup/GetTopicsResult.java | 38 ++++++ .../pulsar/common/lookup/package-info.java | 22 ++++ .../pulsar/common/protocol/Commands.java | 25 +++- .../pulsar/common/topics/TopicList.java | 66 +++++++++++ .../pulsar/common/topics/package-info.java | 19 +++ pulsar-common/src/main/proto/PulsarApi.proto | 8 ++ .../pulsar/common/topics/TopicListTest.java | 111 ++++++++++++++++++ .../proxy/server/LookupProxyHandler.java | 14 ++- 24 files changed, 674 insertions(+), 151 deletions(-) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/lookup/package-info.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/topics/package-info.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 3148f085b7fdb..844db084d3f3b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -914,6 +914,20 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int resourceUsageTransportPublishIntervalInSecs = 60; + @FieldContext( + dynamic = false, + category = CATEGORY_POLICIES, + doc = "Enables evaluating subscription pattern on broker side." + ) + private boolean enableBrokerSideSubscriptionPatternEvaluation = true; + + @FieldContext( + dynamic = false, + category = CATEGORY_POLICIES, + doc = "Max length of subscription pattern" + ) + private int subscriptionPatternMaxLength = 50; + // <-- dispatcher read settings --> @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 2c17652554cff..d50fa5edf29ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -66,6 +66,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.BundleSplitOption; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1242,7 +1243,8 @@ public CompletableFuture> getListOfNonPersistentTopics(NamespaceNam private CompletableFuture> getNonPersistentTopicsFromPeerCluster(ClusterDataImpl peerClusterData, NamespaceName namespace) { PulsarClientImpl client = getNamespaceClient(peerClusterData); - return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT); + return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT, null, null) + .thenApply(GetTopicsResult::getTopics); } public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java index 2032d96bf25dc..b267f725986e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java @@ -50,7 +50,8 @@ void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, l void sendSendError(long producerId, long sequenceId, ServerError error, String errorMsg); - void sendGetTopicsOfNamespaceResponse(List topics, long requestId); + void sendGetTopicsOfNamespaceResponse(List topics, String topicsHash, boolean filtered, + boolean changed, long requestId); void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 27d2e58c9dff7..4cdc0a76e8c96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -119,8 +119,10 @@ public void sendSendError(long producerId, long sequenceId, ServerError error, S } @Override - public void sendGetTopicsOfNamespaceResponse(List topics, long requestId) { - BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, requestId); + public void sendGetTopicsOfNamespaceResponse(List topics, String topicsHash, + boolean filtered, boolean changed, long requestId) { + BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash, + filtered, changed, requestId); safeIntercept(command, cnx); ByteBuf outBuf = Commands.serializeWithSize(command); cnx.ctx().writeAndFlush(outBuf); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 7ececa6d62f5f..f71ab3fb47cc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -40,6 +40,7 @@ import java.net.SocketAddress; import java.util.Collections; import java.util.IdentityHashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -144,6 +145,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.functions.utils.Exceptions; @@ -159,6 +161,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final String listenerName; private final ConcurrentLongHashMap> producers; private final ConcurrentLongHashMap> consumers; + private final boolean enableSubscriptionPatternEvaluation; + private final int maxSubscriptionPatternLength; private State state; private volatile boolean isActive = true; String authRole = null; @@ -267,6 +271,8 @@ public ServerCnx(PulsarService pulsar, String listenerName) { / conf.getNumIOThreads(); this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2; this.connectionController = new ConnectionController.DefaultConnectionController(conf); + this.enableSubscriptionPatternEvaluation = conf.isEnableBrokerSideSubscriptionPatternEvaluation(); + this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength(); } @Override @@ -1952,6 +1958,10 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet final long requestId = commandGetTopicsOfNamespace.getRequestId(); final String namespace = commandGetTopicsOfNamespace.getNamespace(); final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode(); + final Optional topicsPattern = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsPattern() + ? commandGetTopicsOfNamespace.getTopicsPattern() : null); + final Optional topicsHash = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsHash() + ? commandGetTopicsOfNamespace.getTopicsHash() : null); final NamespaceName namespaceName = NamespaceName.get(namespace); final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); @@ -1968,12 +1978,26 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet if (isAuthorized) { getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) .thenAccept(topics -> { + boolean filterTopics = enableSubscriptionPatternEvaluation && topicsPattern.isPresent() + && topicsPattern.get().length() <= maxSubscriptionPatternLength; + List filteredTopics; + if (filterTopics) { + filteredTopics = TopicList.filterTopics(topics, topicsPattern.get()); + } else { + filteredTopics = topics; + } + String hash = TopicList.calculateHash(filteredTopics); + boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash); + if (hashUnchanged) { + filteredTopics = Collections.emptyList(); + } if (log.isDebugEnabled()) { log.debug( "[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", remoteAddress, namespace, requestId, topics.size()); } - commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId); + commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, filterTopics, + !hashUnchanged, requestId); lookupSemaphore.release(); }) .exceptionally(ex -> { @@ -2010,6 +2034,8 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet } } + + @Override protected void handleGetSchema(CommandGetSchema commandGetSchema) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index e6d05548a0c3d..7f3afeb61b8d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -45,7 +45,9 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -89,6 +91,8 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandError; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; @@ -99,6 +103,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.Policies; @@ -106,6 +111,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; import org.apache.pulsar.common.protocol.PulsarHandler; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -142,6 +148,14 @@ public class ServerCnxTest { private final String successSubName = "successSub"; private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic"; private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic"; + private final List matchingTopics = Arrays.asList( + "persistent://use/ns-abc/topic-1", + "persistent://use/ns-abc/topic-2"); + + private final List topics = Arrays.asList( + "persistent://use/ns-abc/topic-1", + "persistent://use/ns-abc/topic-2", + "persistent://use/ns-abc/topic"); private final ManagedLedger ledgerMock = mock(ManagedLedger.class); private final ManagedCursor cursorMock = mock(ManagedCursor.class); @@ -194,6 +208,8 @@ public void setup() throws Exception { doReturn(true).when(namespaceService).isServiceUnitOwned(any()); doReturn(true).when(namespaceService).isServiceUnitActive(any()); doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); + doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( + NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); setupMLAsyncCallbackMocks(); @@ -1836,4 +1852,99 @@ public void testTopicIsNotReady() throws Exception { channel.finish(); } + + @Test + public void testGetTopicsOfNamespace() throws Exception { + svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( + "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, null, null); + channel.writeInbound(clientCommand); + CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); + + assertEquals(response.getTopicsList(), topics); + assertEquals(response.getTopicsHash(), TopicList.calculateHash(topics)); + assertTrue(response.isChanged()); + assertFalse(response.isFiltered()); + + channel.finish(); + } + + @Test + public void testGetTopicsOfNamespaceDisabledFiltering() throws Exception { + svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(false); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( + "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, + "use/ns-abc/topic-.*", null); + channel.writeInbound(clientCommand); + CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); + + assertEquals(response.getTopicsList(), topics); + assertEquals(response.getTopicsHash(), TopicList.calculateHash(topics)); + assertTrue(response.isChanged()); + assertFalse(response.isFiltered()); + + channel.finish(); + } + + @Test + public void testGetTopicsOfNamespaceLongPattern() throws Exception { + svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true); + svcConfig.setSubscriptionPatternMaxLength(10); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( + "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, + "use/ns-abc/(t|o|to|p|i|c)+-?)+!", null); + channel.writeInbound(clientCommand); + CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); + + assertEquals(response.getTopicsList(), topics); + assertEquals(response.getTopicsHash(), TopicList.calculateHash(topics)); + assertTrue(response.isChanged()); + assertFalse(response.isFiltered()); + + channel.finish(); + } + + @Test + public void testGetTopicsOfNamespaceFiltering() throws Exception { + svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( + "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, + "use/ns-abc/topic-.*", "SOME_HASH"); + channel.writeInbound(clientCommand); + CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); + + assertEquals(response.getTopicsList(), matchingTopics); + assertEquals(response.getTopicsHash(), TopicList.calculateHash(matchingTopics)); + assertTrue(response.isChanged()); + assertTrue(response.isFiltered()); + + channel.finish(); + } + + @Test + public void testGetTopicsOfNamespaceNoChange() throws Exception { + svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true); + resetChannel(); + setChannelConnected(); + ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest( + "use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, + "use/ns-abc/topic-.*", TopicList.calculateHash(matchingTopics)); + channel.writeInbound(clientCommand); + CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse(); + + assertEquals(response.getTopicsList(), Collections.emptyList()); + assertEquals(response.getTopicsHash(), TopicList.calculateHash(matchingTopics)); + assertFalse(response.isChanged()); + assertTrue(response.isFiltered()); + + channel.finish(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index cfcaa5cdc127e..92f2617cc680b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -20,6 +20,7 @@ import java.util.Queue; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; @@ -150,6 +151,11 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { protected void handleLookupResponse(CommandLookupTopicResponse connection) { queue.offer(new CommandLookupTopicResponse().copyFrom(connection)); } + + @Override + protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse response) { + queue.offer(new CommandGetTopicsOfNamespaceResponse().copyFrom(response)); + } }; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 5eaeea1f45e1d..51a6222a23f76 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; @@ -420,66 +419,6 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception { producer4.close(); } - @Test(timeOut = testTimeout) - public void testTopicsPatternFilter() { - String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; - String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; - String topicName3 = "persistent://my-property/my-ns/hello-3"; - String topicName4 = "non-persistent://my-property/my-ns/hello-4"; - - List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); - - Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); - List result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1); - assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2)); - - Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*"); - List result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2); - assertTrue(result2.size() == 4 - && Stream.of(topicName1, topicName2, topicName3, topicName4).allMatch(result2::contains)); - } - - @Test(timeOut = testTimeout) - public void testTopicsListMinus() { - String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; - String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; - String topicName3 = "persistent://my-property/my-ns/pattern-topic-3"; - String topicName4 = "persistent://my-property/my-ns/pattern-topic-4"; - String topicName5 = "persistent://my-property/my-ns/pattern-topic-5"; - String topicName6 = "persistent://my-property/my-ns/pattern-topic-6"; - - List oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); - List newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6); - - List addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); - List removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); - - assertTrue(addedNames.size() == 2 && - addedNames.contains(topicName5) && - addedNames.contains(topicName6)); - assertTrue(removedNames.size() == 2 && - removedNames.contains(topicName1) && - removedNames.contains(topicName2)); - - // totally 2 different list, should return content of first lists. - List addedNames2 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); - assertTrue(addedNames2.size() == 2 && - addedNames2.contains(topicName5) && - addedNames2.contains(topicName6)); - - // 2 same list, should return empty list. - List addedNames3 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); - assertEquals(addedNames3.size(), 0); - - // empty list minus: addedNames2.size = 2, addedNames3.size = 0 - List addedNames4 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); - assertEquals(addedNames2.size(), addedNames4.size()); - addedNames4.forEach(name -> assertTrue(addedNames2.contains(name))); - - List addedNames5 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); - assertEquals(addedNames5.size(), 0); - } - // simulate subscribe a pattern which has no topics, but then matched topics added in. @Test(timeOut = testTimeout) public void testStartEmptyPatternConsumer() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 190599dfffdbc..cb809502f7e8e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -36,6 +36,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -256,8 +257,11 @@ public String getServiceUrl() { } @Override - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode) { - CompletableFuture> topicsFuture = new CompletableFuture>(); + public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, + Mode mode, + String topicsPattern, + String topicsHash) { + CompletableFuture topicsFuture = new CompletableFuture<>(); AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new BackoffBuilder() @@ -265,7 +269,8 @@ public CompletableFuture> getTopicsUnderNamespace(NamespaceName nam .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) .setMax(1, TimeUnit.MINUTES) .create(); - getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode); + getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode, + topicsPattern, topicsHash); return topicsFuture; } @@ -273,39 +278,41 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, NamespaceName namespace, Backoff backoff, AtomicLong remainingTime, - CompletableFuture> topicsFuture, - Mode mode) { + CompletableFuture getTopicsResultFuture, + Mode mode, + String topicsPattern, + String topicsHash) { client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( - namespace.toString(), requestId, mode); + namespace.toString(), requestId, mode, topicsPattern, topicsHash); clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> { if (t != null) { - topicsFuture.completeExceptionally(t); + getTopicsResultFuture.completeExceptionally(t); } else { if (log.isDebugEnabled()) { log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId); } - // do not keep partition part of topic name List result = new ArrayList<>(); - r.forEach(topic -> { + r.getTopics().forEach(topic -> { String filtered = TopicName.get(topic).getPartitionedTopicName(); if (!result.contains(filtered)) { result.add(filtered); } }); - topicsFuture.complete(result); + getTopicsResultFuture.complete(new GetTopicsResult(result, r.getTopicsHash(), + r.isFiltered(), r.isChanged())); } client.getCnxPool().releaseConnection(clientCnx); }); }).exceptionally((e) -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { - topicsFuture.completeExceptionally( + getTopicsResultFuture.completeExceptionally( new PulsarClientException.TimeoutException( format("Could not get topics of namespace %s within configured timeout", namespace.toString()))); @@ -316,7 +323,8 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); - getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture, mode); + getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, getTopicsResultFuture, + mode, topicsPattern, topicsHash); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 4cbf98c4fe684..6c23d66823662 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -35,7 +35,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -89,6 +88,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.protocol.schema.SchemaVersion; @@ -793,7 +793,7 @@ public CompletableFuture newLookup(ByteBuf request, long reque return future; } - public CompletableFuture> newGetTopicsOfNamespace(ByteBuf request, long requestId) { + public CompletableFuture newGetTopicsOfNamespace(ByteBuf request, long requestId) { return sendRequestAndHandleTimeout(request, requestId, RequestType.GetTopics, true); } @@ -811,20 +811,21 @@ protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResp checkArgument(state == State.Ready); long requestId = success.getRequestId(); - List topics = new ArrayList(success.getTopicsCount()); - for (int i = 0; i < success.getTopicsCount(); i++) { - topics.add(success.getTopicAt(i)); - } + List topics = success.getTopicsList(); + if (log.isDebugEnabled()) { log.debug("{} Received get topics of namespace success response from server: {} - topics.size: {}", ctx.channel(), success.getRequestId(), topics.size()); } - CompletableFuture> requestFuture = - (CompletableFuture>) pendingRequests.remove(requestId); + CompletableFuture requestFuture = + (CompletableFuture) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(topics); + requestFuture.complete(new GetTopicsResult(topics, + success.hasTopicsHash() ? success.getTopicsHash() : null, + success.isFiltered(), + success.isChanged())); } else { log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index ced0aba9bba10..6d04c5fab3102 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -123,8 +124,9 @@ public String getServiceUrl() { } @Override - public CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode) { - CompletableFuture> future = new CompletableFuture<>(); + public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, + String topicsPattern, String topicsHash) { + CompletableFuture future = new CompletableFuture<>(); String format = namespace.isV2() ? "admin/v2/namespaces/%s/topics?mode=%s" : "admin/namespaces/%s/destinations?mode=%s"; @@ -139,7 +141,7 @@ public CompletableFuture> getTopicsUnderNamespace(NamespaceName nam result.add(filtered); } }); - future.complete(result); + future.complete(new GetTopicsResult(result, topicsHash, false, true)); }).exceptionally(ex -> { log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, ex.getMessage()); future.completeExceptionally(ex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 97c53a48f3413..e7d358148c0c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -19,12 +19,12 @@ package org.apache.pulsar.client.impl; import java.net.InetSocketAddress; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -96,6 +96,7 @@ public interface LookupService extends AutoCloseable { * @param namespace : namespace-name * @return */ - CompletableFuture> getTopicsUnderNamespace(NamespaceName namespace, Mode mode); + CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, + String topicPattern, String topicsHash); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 2a994e2a892f8..d5c37aa9fa2f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -25,19 +25,19 @@ import io.netty.util.TimerTask; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +48,10 @@ public class PatternMultiTopicsConsumerImpl extends MultiTopicsConsumerImpl conf, ExecutorProvider executorProvider, @@ -60,6 +62,7 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, super(client, conf, executorProvider, subscribeFuture, schema, interceptors, false /* createTopicIfDoesNotExist */); this.topicsPattern = topicsPattern; + this.topicsHash = topicsHash; this.subscriptionMode = subscriptionMode; if (this.namespaceName == null) { @@ -83,17 +86,9 @@ public void run(Timeout timeout) throws Exception { return; } - CompletableFuture recheckFuture = new CompletableFuture<>(); - List> futures = Lists.newArrayListWithExpectedSize(2); - client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> { - if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); - topics.forEach(topicName -> - log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName)); - } - - List newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); + client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, topicsPattern.pattern(), + topicsHash).thenAccept(getTopicsResult -> { List oldTopics = new ArrayList<>(); oldTopics.addAll(getPartitionedTopics()); getPartitions().forEach(p -> { @@ -102,16 +97,8 @@ public void run(Timeout timeout) throws Exception { oldTopics.add(p); } }); - - futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); - futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); - FutureUtil.waitForAll(futures) - .thenAccept(finalFuture -> recheckFuture.complete(null)) - .exceptionally(ex -> { - log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); - recheckFuture.completeExceptionally(ex); - return null; - }); + topicsHash = updateSubscriptions(topicsPattern, topicsHash, namespaceName, getTopicsResult, + topicsChangeListener, oldTopics, topic); }); // schedule the next re-check task @@ -119,6 +106,41 @@ public void run(Timeout timeout) throws Exception { Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); } + static String updateSubscriptions(Pattern topicsPattern, String topicsHash, NamespaceName namespaceName, + GetTopicsResult getTopicsResult, TopicsChangedListener topicsChangeListener, + List oldTopics, String dummyTopicName) { + List> futures = new ArrayList(2); + if (log.isDebugEnabled()) { + log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}", + namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(), + getTopicsResult.isFiltered()); + getTopicsResult.getTopics().forEach(topicName -> + log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName)); + } + + if (!getTopicsResult.isChanged()) { + return topicsHash; + } + String newTopicsHash = getTopicsResult.getTopicsHash(); + + List newTopics; + if (getTopicsResult.isFiltered()) { + newTopics = getTopicsResult.getTopics(); + } else { + newTopics = TopicList.filterTopics(getTopicsResult.getTopics(), topicsPattern); + } + + + futures.add(topicsChangeListener.onTopicsAdded(TopicList.minus(newTopics, oldTopics))); + futures.add(topicsChangeListener.onTopicsRemoved(TopicList.minus(oldTopics, newTopics))); + FutureUtil.waitForAll(futures) + .exceptionally(ex -> { + log.warn("[{}] Failed to recheck topics change: {}", dummyTopicName, ex.getMessage()); + return null; + }); + return newTopicsHash; + } + public Pattern getPattern() { return this.topicsPattern; } @@ -176,13 +198,6 @@ public CompletableFuture onTopicsAdded(Collection addedTopics) { } } - // get topics, which are contained in list1, and not in list2 - public static List topicsListsMinus(List list1, List list2) { - HashSet s1 = new HashSet<>(list1); - s1.removeAll(list2); - return s1.stream().collect(Collectors.toList()); - } - @Override public CompletableFuture closeAsync() { Timeout timeout = recheckPatternTimeout; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 77fdbed0b5fac..6b448f9538b2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -46,8 +46,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import lombok.Builder; import lombok.Getter; import org.apache.commons.lang3.StringUtils; @@ -84,6 +82,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; @@ -534,17 +533,24 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo NamespaceName namespaceName = destination.getNamespaceObject(); CompletableFuture> consumerSubscribedFuture = new CompletableFuture<>(); - lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode) - .thenAccept(topics -> { + lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null) + .thenAccept(getTopicsResult -> { if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}", namespaceName, topics.size()); - topics.forEach(topicName -> + log.debug("Get topics under namespace {}, topics.size: {}," + + " topicsHash: {}, changed: {}, filtered: {}", + namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(), + getTopicsResult.isChanged(), getTopicsResult.isFiltered()); + getTopicsResult.getTopics().forEach(topicName -> log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName)); } - List topicsList = topicsPatternFilter(topics, conf.getTopicsPattern()); + List topicsList = getTopicsResult.getTopics(); + if (!getTopicsResult.isFiltered()) { + topicsList = TopicList.filterTopics(getTopicsResult.getTopics(), conf.getTopicsPattern()); + } conf.getTopicNames().addAll(topicsList); ConsumerBase consumer = new PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(), + getTopicsResult.getTopicsHash(), PulsarClientImpl.this, conf, externalExecutorProvider, @@ -562,19 +568,6 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo return consumerSubscribedFuture; } - // get topics that match 'topicsPattern' from original topics list - // return result should contain only topic names, without partition part - public static List topicsPatternFilter(List original, Pattern topicsPattern) { - final Pattern shortenedTopicsPattern = topicsPattern.toString().contains("://") - ? Pattern.compile(topicsPattern.toString().split("\\:\\/\\/")[1]) : topicsPattern; - - return original.stream() - .map(TopicName::get) - .map(TopicName::toString) - .filter(topic -> shortenedTopicsPattern.matcher(topic.split("\\:\\/\\/")[1]).matches()) - .collect(Collectors.toList()); - } - public CompletableFuture> createReaderAsync(ReaderConfigurationData conf) { return createReaderAsync(conf, Schema.BYTES); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java new file mode 100644 index 0000000000000..15622f4db6b8d --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.google.common.collect.Sets; +import org.apache.pulsar.common.lookup.GetTopicsResult; +import org.apache.pulsar.common.naming.NamespaceName; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; + +public class PatternMultiTopicsConsumerImplTest { + + private PatternMultiTopicsConsumerImpl.TopicsChangedListener mockListener; + + + @BeforeMethod(alwaysRun = true) + public void setUp() { + mockListener = mock(PatternMultiTopicsConsumerImpl.TopicsChangedListener.class); + when(mockListener.onTopicsAdded(any())).thenReturn(CompletableFuture.completedFuture(null)); + when(mockListener.onTopicsRemoved(any())).thenReturn(CompletableFuture.completedFuture(null)); + } + + @Test + public void testChangedUnfilteredResponse() { + String newHash = PatternMultiTopicsConsumerImpl.updateSubscriptions( + Pattern.compile("tenant/my-ns/name-.*"), + "TOPICS_HASH", + NamespaceName.get("tenant/my-ns"), + new GetTopicsResult(Arrays.asList( + "persistent://tenant/my-ns/name-1", + "persistent://tenant/my-ns/name-2", + "persistent://tenant/my-ns/non-matching"), + null, false, true), + mockListener, + Collections.emptyList(), + MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX); + verify(mockListener).onTopicsAdded(Sets.newHashSet( + "persistent://tenant/my-ns/name-1", + "persistent://tenant/my-ns/name-2")); + verify(mockListener).onTopicsRemoved(Collections.emptySet()); + Assert.assertNull(newHash); + } + + @Test + public void testChangedFilteredResponse() { + String newHash = PatternMultiTopicsConsumerImpl.updateSubscriptions( + Pattern.compile("tenant/my-ns/name-.*"), + "TOPICS_HASH", + NamespaceName.get("tenant/my-ns"), + new GetTopicsResult(Arrays.asList( + "persistent://tenant/my-ns/name-0", + "persistent://tenant/my-ns/name-1", + "persistent://tenant/my-ns/name-2"), + "OTHER_HASH", true, true), + mockListener, + Arrays.asList("persistent://tenant/my-ns/name-0"), + MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX); + verify(mockListener).onTopicsAdded(Sets.newHashSet( + "persistent://tenant/my-ns/name-1", + "persistent://tenant/my-ns/name-2")); + verify(mockListener).onTopicsRemoved(Collections.emptySet()); + Assert.assertEquals("OTHER_HASH", newHash); + } + + @Test + public void testUnchangedResponse() { + String newHash = PatternMultiTopicsConsumerImpl.updateSubscriptions( + Pattern.compile("tenant/my-ns/name-.*"), + "TOPICS_HASH", + NamespaceName.get("tenant/my-ns"), + new GetTopicsResult(Arrays.asList( + "persistent://tenant/my-ns/name-0", + "persistent://tenant/my-ns/name-1", + "persistent://tenant/my-ns/name-2"), + "TOPICS_HASH", true, false), + mockListener, + Arrays.asList("persistent://tenant/my-ns/name-0"), + MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX); + verify(mockListener, never()).onTopicsAdded(any()); + verify(mockListener, never()).onTopicsRemoved(any()); + Assert.assertEquals("TOPICS_HASH", newHash); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 386294e24b784..4c174ff0ac3f1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; @@ -55,6 +56,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; +import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -110,8 +112,11 @@ public void testConsumerIsClosed() throws Exception { LookupService lookup = mock(LookupService.class); when(lookup.getTopicsUnderNamespace( any(NamespaceName.class), - any(CommandGetTopicsOfNamespace.Mode.class))) - .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); + any(CommandGetTopicsOfNamespace.Mode.class), + nullable(String.class), + nullable(String.class))) + .thenReturn(CompletableFuture.completedFuture( + new GetTopicsResult(Collections.emptyList(), null, false, true))); when(lookup.getPartitionedTopicMetadata(any(TopicName.class))) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())); when(lookup.getBroker(any(TopicName.class))) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java new file mode 100644 index 0000000000000..74e51b3ea4694 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/GetTopicsResult.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.lookup; + +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@ToString +public class GetTopicsResult { + private List topics; + private String topicsHash; + private boolean filtered; + private boolean changed; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/package-info.java new file mode 100644 index 0000000000000..988e5be90fa8d --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Lookup data. + */ +package org.apache.pulsar.common.lookup; \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 85843c800910c..d1cbe3ad96141 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1082,28 +1082,43 @@ public static BaseCommand newConsumerStatsResponseCommand(ServerError serverErro return cmd; } - public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId, Mode mode) { + public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId, Mode mode, + String topicsPattern, String topicsHash) { BaseCommand cmd = localCmd(Type.GET_TOPICS_OF_NAMESPACE); CommandGetTopicsOfNamespace topics = cmd.setGetTopicsOfNamespace(); topics.setNamespace(namespace); topics.setRequestId(requestId); topics.setMode(mode); + if (topicsPattern != null) { + topics.setTopicsPattern(topicsPattern); + } + if (topicsHash != null) { + topics.setTopicsHash(topicsHash); + } return serializeWithSize(cmd); } - public static BaseCommand newGetTopicsOfNamespaceResponseCommand(List topics, long requestId) { + public static BaseCommand newGetTopicsOfNamespaceResponseCommand(List topics, String topicsHash, + boolean filtered, boolean changed, + long requestId) { BaseCommand cmd = localCmd(Type.GET_TOPICS_OF_NAMESPACE_RESPONSE); CommandGetTopicsOfNamespaceResponse topicsResponse = cmd.setGetTopicsOfNamespaceResponse(); topicsResponse.setRequestId(requestId); for (int i = 0; i < topics.size(); i++) { topicsResponse.addTopic(topics.get(i)); } - + if (topicsHash != null) { + topicsResponse.setTopicsHash(topicsHash); + } + topicsResponse.setFiltered(filtered); + topicsResponse.setChanged(changed); return cmd; } - public static ByteBuf newGetTopicsOfNamespaceResponse(List topics, long requestId) { - return serializeWithSize(newGetTopicsOfNamespaceResponseCommand(topics, requestId)); + public static ByteBuf newGetTopicsOfNamespaceResponse(List topics, String topicsHash, + boolean filtered, boolean changed, long requestId) { + return serializeWithSize(newGetTopicsOfNamespaceResponseCommand( + topics, topicsHash, filtered, changed, requestId)); } private static final ByteBuf cmdPing; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java new file mode 100644 index 0000000000000..70680bba1a0ce --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.topics; + +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.pulsar.common.naming.TopicName; + +public class TopicList { + + // get topics that match 'topicsPattern' from original topics list + // return result should contain only topic names, without partition part + public static List filterTopics(List original, String regex) { + Pattern topicsPattern = Pattern.compile(regex); + return filterTopics(original, topicsPattern); + } + public static List filterTopics(List original, Pattern topicsPattern) { + + final Pattern shortenedTopicsPattern = topicsPattern.toString().contains("://") + ? Pattern.compile(topicsPattern.toString().split("\\:\\/\\/")[1]) : topicsPattern; + + return original.stream() + .map(TopicName::get) + .map(TopicName::toString) + .filter(topic -> shortenedTopicsPattern.matcher(topic.split("\\:\\/\\/")[1]).matches()) + .collect(Collectors.toList()); + } + + public static String calculateHash(List topics) { + return Hashing.crc32c().hashBytes(topics.stream() + .sorted() + .collect(Collectors.joining(",")) + .getBytes(StandardCharsets.UTF_8)).toString(); + } + + + + // get topics, which are contained in list1, and not in list2 + public static Set minus(Collection list1, Collection list2) { + HashSet s1 = new HashSet<>(list1); + s1.removeAll(list2); + return s1; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/package-info.java new file mode 100644 index 0000000000000..0d16376a432b6 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.topics; diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 337c6bb2cd41a..a5d97e51acfd7 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -738,11 +738,19 @@ message CommandGetTopicsOfNamespace { required uint64 request_id = 1; required string namespace = 2; optional Mode mode = 3 [default = PERSISTENT]; + optional string topics_pattern = 4; + optional string topics_hash = 5; } message CommandGetTopicsOfNamespaceResponse { required uint64 request_id = 1; repeated string topics = 2; + // true iff the topic list was filtered by the pattern supplied by the client + optional bool filtered = 3 [default = false]; + // hash computed from the names of matching topics + optional string topics_hash = 4; + // if false, topics is empty and the list of matching topics has not changed + optional bool changed = 5 [default = true]; } message CommandGetSchema { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java new file mode 100644 index 0000000000000..9c3b54a0f0d80 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/topics/TopicListTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.topics; + +import com.google.common.collect.Lists; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +public class TopicListTest { + + @Test + public void testFilterTopics() { + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; + String topicName3 = "persistent://my-property/my-ns/hello-3"; + String topicName4 = "non-persistent://my-property/my-ns/hello-4"; + + List topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); + + Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*"); + List result1 = TopicList.filterTopics(topicsNames, pattern1); + assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2)); + + Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*"); + List result2 = TopicList.filterTopics(topicsNames, pattern2); + assertTrue(result2.size() == 4 + && Stream.of(topicName1, topicName2, topicName3, topicName4).allMatch(result2::contains)); + } + + @Test + public void testMinus() { + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; + String topicName3 = "persistent://my-property/my-ns/pattern-topic-3"; + String topicName4 = "persistent://my-property/my-ns/pattern-topic-4"; + String topicName5 = "persistent://my-property/my-ns/pattern-topic-5"; + String topicName6 = "persistent://my-property/my-ns/pattern-topic-6"; + + List oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); + List newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6); + + Set addedNames = TopicList.minus(newNames, oldNames); + Set removedNames = TopicList.minus(oldNames, newNames); + + assertTrue(addedNames.size() == 2 && + addedNames.contains(topicName5) && + addedNames.contains(topicName6)); + assertTrue(removedNames.size() == 2 && + removedNames.contains(topicName1) && + removedNames.contains(topicName2)); + + // totally 2 different list, should return content of first lists. + Set addedNames2 = TopicList.minus(addedNames, removedNames); + assertTrue(addedNames2.size() == 2 && + addedNames2.contains(topicName5) && + addedNames2.contains(topicName6)); + + // 2 same list, should return empty list. + Set addedNames3 = TopicList.minus(addedNames, addedNames); + assertEquals(addedNames3.size(), 0); + + // empty list minus: addedNames2.size = 2, addedNames3.size = 0 + Set addedNames4 = TopicList.minus(addedNames2, addedNames3); + assertEquals(addedNames2.size(), addedNames4.size()); + addedNames4.forEach(name -> assertTrue(addedNames2.contains(name))); + + Set addedNames5 = TopicList.minus(addedNames3, addedNames2); + assertEquals(addedNames5.size(), 0); + } + + @Test + public void testCalculateHash() { + String topicName1 = "persistent://my-property/my-ns/pattern-topic-1"; + String topicName2 = "persistent://my-property/my-ns/pattern-topic-2"; + String topicName3 = "persistent://my-property/my-ns/pattern-topic-3"; + String hash1 = TopicList.calculateHash(Arrays.asList(topicName3, topicName2, topicName1)); + String hash2 = TopicList.calculateHash(Arrays.asList(topicName1, topicName3, topicName2)); + assertEquals(hash1, hash2, "Hash must not depend on order of topics in the list"); + + String hash3 = TopicList.calculateHash(Arrays.asList(topicName1, topicName2)); + assertNotEquals(hash1, hash3, "Different list must have different hashes"); + + } + + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 69d60fcabd071..068772958c89a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -292,14 +292,20 @@ private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTo if (!StringUtils.isNotBlank(serviceUrl)) { return; } + String topicsPattern = commandGetTopicsOfNamespace.hasTopicsPattern() + ? commandGetTopicsOfNamespace.getTopicsPattern() : null; + String topicsHash = commandGetTopicsOfNamespace.hasTopicsHash() + ? commandGetTopicsOfNamespace.getTopicsHash() : null; performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(), serviceUrl, - 10, commandGetTopicsOfNamespace.getMode()); + 10, topicsPattern, topicsHash, commandGetTopicsOfNamespace.getMode()); } private void performGetTopicsOfNamespace(long clientRequestId, String namespaceName, String brokerServiceUrl, int numberOfRetries, + String topicsPattern, + String topicsHash, CommandGetTopicsOfNamespace.Mode mode) { if (numberOfRetries == 0) { proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, @@ -321,7 +327,8 @@ private void performGetTopicsOfNamespace(long clientRequestId, // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; - command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode); + command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode, + topicsPattern, topicsHash); clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> { if (t != null) { log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", @@ -330,7 +337,8 @@ private void performGetTopicsOfNamespace(long clientRequestId, Commands.newError(clientRequestId, ServerError.ServiceNotReady, t.getMessage())); } else { proxyConnection.ctx().writeAndFlush( - Commands.newGetTopicsOfNamespaceResponse(r, clientRequestId)); + Commands.newGetTopicsOfNamespaceResponse(r.getTopics(), r.getTopicsHash(), r.isFiltered(), + r.isChanged(), clientRequestId)); } }); From fde4aa6a7246d2fde42a61b5a6d43043007acfcb Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Wed, 30 Mar 2022 11:02:18 +0200 Subject: [PATCH 2/4] TopicList class: make it utility, precompile pattern --- .../org/apache/pulsar/common/topics/TopicList.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index 70680bba1a0ce..a4c6f9b2bab96 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -26,10 +26,17 @@ import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; + +import lombok.experimental.UtilityClass; import org.apache.pulsar.common.naming.TopicName; +@UtilityClass public class TopicList { + private static final String SCHEME_SEPARATOR = "://"; + + private static final Pattern SCHEME_SEPARATOR_PATTERN = Pattern.compile(Pattern.quote(SCHEME_SEPARATOR)); + // get topics that match 'topicsPattern' from original topics list // return result should contain only topic names, without partition part public static List filterTopics(List original, String regex) { @@ -38,13 +45,13 @@ public static List filterTopics(List original, String regex) { } public static List filterTopics(List original, Pattern topicsPattern) { - final Pattern shortenedTopicsPattern = topicsPattern.toString().contains("://") - ? Pattern.compile(topicsPattern.toString().split("\\:\\/\\/")[1]) : topicsPattern; + final Pattern shortenedTopicsPattern = topicsPattern.toString().contains(SCHEME_SEPARATOR) + ? Pattern.compile(SCHEME_SEPARATOR_PATTERN.split(topicsPattern.toString())[1]) : topicsPattern; return original.stream() .map(TopicName::get) .map(TopicName::toString) - .filter(topic -> shortenedTopicsPattern.matcher(topic.split("\\:\\/\\/")[1]).matches()) + .filter(topic -> shortenedTopicsPattern.matcher(SCHEME_SEPARATOR_PATTERN.split(topic)[1]).matches()) .collect(Collectors.toList()); } From 0033f4eb9496a1b5030fc13ef8a4f4f487ad8770 Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Thu, 31 Mar 2022 17:06:40 +0200 Subject: [PATCH 3/4] Log when client provided pattern is too long --- .../pulsar/broker/service/ServerCnx.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f71ab3fb47cc1..0c77d886e0306 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1978,13 +1978,17 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet if (isAuthorized) { getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) .thenAccept(topics -> { - boolean filterTopics = enableSubscriptionPatternEvaluation && topicsPattern.isPresent() - && topicsPattern.get().length() <= maxSubscriptionPatternLength; - List filteredTopics; - if (filterTopics) { - filteredTopics = TopicList.filterTopics(topics, topicsPattern.get()); - } else { - filteredTopics = topics; + boolean filterTopics = false; + List filteredTopics = topics; + + if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) { + if (topicsPattern.get().length() <= maxSubscriptionPatternLength) { + filterTopics = true; + filteredTopics = TopicList.filterTopics(topics, topicsPattern.get()); + } else { + log.info("[{}] Subscription pattern provided was longer than maximum {}.", + remoteAddress, maxSubscriptionPatternLength); + } } String hash = TopicList.calculateHash(filteredTopics); boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash); From 0be7c514ef169bbfbf8dac5c98b9cf2acdff0118 Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Fri, 1 Apr 2022 15:17:14 +0200 Subject: [PATCH 4/4] Fix code style error --- .../src/main/java/org/apache/pulsar/common/topics/TopicList.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index a4c6f9b2bab96..ed37386789869 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; - import lombok.experimental.UtilityClass; import org.apache.pulsar.common.naming.TopicName;