Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 14505] PIP-145: Enable evaluating subscription pattern on broker side #14804

Merged
merged 6 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int resourceUsageTransportPublishIntervalInSecs = 60;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Enables evaluating subscription pattern on broker side."
)
private boolean enableBrokerSideSubscriptionPatternEvaluation = true;
andrasbeni marked this conversation as resolved.
Show resolved Hide resolved

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Max length of subscription pattern"
)
private int subscriptionPatternMaxLength = 50;

// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -1242,7 +1243,8 @@ public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceNam
private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterDataImpl peerClusterData,
NamespaceName namespace) {
PulsarClientImpl client = getNamespaceClient(peerClusterData);
return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT);
return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT, null, null)
.thenApply(GetTopicsResult::getTopics);
}

public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, l

void sendSendError(long producerId, long sequenceId, ServerError error, String errorMsg);

void sendGetTopicsOfNamespaceResponse(List<String> topics, long requestId);
void sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash, boolean filtered,
boolean changed, long requestId);

void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ public void sendSendError(long producerId, long sequenceId, ServerError error, S
}

@Override
public void sendGetTopicsOfNamespaceResponse(List<String> topics, long requestId) {
BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, requestId);
public void sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash,
boolean filtered, boolean changed, long requestId) {
BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
filtered, changed, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.net.SocketAddress;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand Down Expand Up @@ -144,6 +145,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.functions.utils.Exceptions;
Expand All @@ -159,6 +161,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final String listenerName;
private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private final boolean enableSubscriptionPatternEvaluation;
private final int maxSubscriptionPatternLength;
private State state;
private volatile boolean isActive = true;
String authRole = null;
Expand Down Expand Up @@ -267,6 +271,8 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
/ conf.getNumIOThreads();
this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2;
this.connectionController = new ConnectionController.DefaultConnectionController(conf);
this.enableSubscriptionPatternEvaluation = conf.isEnableBrokerSideSubscriptionPatternEvaluation();
this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength();
}

@Override
Expand Down Expand Up @@ -1952,6 +1958,10 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String namespace = commandGetTopicsOfNamespace.getNamespace();
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
final Optional<String> topicsPattern = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsPattern()
? commandGetTopicsOfNamespace.getTopicsPattern() : null);
final Optional<String> topicsHash = Optional.ofNullable(commandGetTopicsOfNamespace.hasTopicsHash()
? commandGetTopicsOfNamespace.getTopicsHash() : null);
final NamespaceName namespaceName = NamespaceName.get(namespace);

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
Expand All @@ -1968,12 +1978,26 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
if (isAuthorized) {
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
.thenAccept(topics -> {
boolean filterTopics = enableSubscriptionPatternEvaluation && topicsPattern.isPresent()
&& topicsPattern.get().length() <= maxSubscriptionPatternLength;
merlimat marked this conversation as resolved.
Show resolved Hide resolved
List<String> filteredTopics;
if (filterTopics) {
filteredTopics = TopicList.filterTopics(topics, topicsPattern.get());
} else {
filteredTopics = topics;
}
String hash = TopicList.calculateHash(filteredTopics);
boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash);
if (hashUnchanged) {
filteredTopics = Collections.emptyList();
}
if (log.isDebugEnabled()) {
log.debug(
"[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
remoteAddress, namespace, requestId, topics.size());
}
commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, filterTopics,
!hashUnchanged, requestId);
lookupSemaphore.release();
})
.exceptionally(ex -> {
Expand Down Expand Up @@ -2010,6 +2034,8 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
}
}



@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -89,6 +91,8 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
Expand All @@ -99,13 +103,15 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand Down Expand Up @@ -142,6 +148,14 @@ public class ServerCnxTest {
private final String successSubName = "successSub";
private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic";
private final List<String> matchingTopics = Arrays.asList(
"persistent://use/ns-abc/topic-1",
"persistent://use/ns-abc/topic-2");

private final List<String> topics = Arrays.asList(
"persistent://use/ns-abc/topic-1",
"persistent://use/ns-abc/topic-2",
"persistent://use/ns-abc/topic");

private final ManagedLedger ledgerMock = mock(ManagedLedger.class);
private final ManagedCursor cursorMock = mock(ManagedCursor.class);
Expand Down Expand Up @@ -194,6 +208,8 @@ public void setup() throws Exception {
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
doReturn(true).when(namespaceService).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);

setupMLAsyncCallbackMocks();

Expand Down Expand Up @@ -1836,4 +1852,99 @@ public void testTopicIsNotReady() throws Exception {

channel.finish();
}

@Test
public void testGetTopicsOfNamespace() throws Exception {
svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true);
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest(
"use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL, null, null);
channel.writeInbound(clientCommand);
CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse();

assertEquals(response.getTopicsList(), topics);
assertEquals(response.getTopicsHash(), TopicList.calculateHash(topics));
assertTrue(response.isChanged());
assertFalse(response.isFiltered());

channel.finish();
}

@Test
public void testGetTopicsOfNamespaceDisabledFiltering() throws Exception {
svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(false);
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest(
"use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL,
"use/ns-abc/topic-.*", null);
channel.writeInbound(clientCommand);
CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse();

assertEquals(response.getTopicsList(), topics);
assertEquals(response.getTopicsHash(), TopicList.calculateHash(topics));
assertTrue(response.isChanged());
assertFalse(response.isFiltered());

channel.finish();
}

@Test
public void testGetTopicsOfNamespaceLongPattern() throws Exception {
svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true);
svcConfig.setSubscriptionPatternMaxLength(10);
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest(
"use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL,
"use/ns-abc/(t|o|to|p|i|c)+-?)+!", null);
channel.writeInbound(clientCommand);
CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse();

assertEquals(response.getTopicsList(), topics);
assertEquals(response.getTopicsHash(), TopicList.calculateHash(topics));
assertTrue(response.isChanged());
assertFalse(response.isFiltered());

channel.finish();
}

@Test
public void testGetTopicsOfNamespaceFiltering() throws Exception {
svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true);
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest(
"use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL,
"use/ns-abc/topic-.*", "SOME_HASH");
channel.writeInbound(clientCommand);
CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse();

assertEquals(response.getTopicsList(), matchingTopics);
assertEquals(response.getTopicsHash(), TopicList.calculateHash(matchingTopics));
assertTrue(response.isChanged());
assertTrue(response.isFiltered());

channel.finish();
}

@Test
public void testGetTopicsOfNamespaceNoChange() throws Exception {
svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true);
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.newGetTopicsOfNamespaceRequest(
"use/ns-abc", 1, CommandGetTopicsOfNamespace.Mode.ALL,
"use/ns-abc/topic-.*", TopicList.calculateHash(matchingTopics));
channel.writeInbound(clientCommand);
CommandGetTopicsOfNamespaceResponse response = (CommandGetTopicsOfNamespaceResponse) getResponse();

assertEquals(response.getTopicsList(), Collections.emptyList());
assertEquals(response.getTopicsHash(), TopicList.calculateHash(matchingTopics));
assertFalse(response.isChanged());
assertTrue(response.isFiltered());

channel.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Queue;

import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
Expand Down Expand Up @@ -150,6 +151,11 @@ protected void handleProducerSuccess(CommandProducerSuccess success) {
protected void handleLookupResponse(CommandLookupTopicResponse connection) {
queue.offer(new CommandLookupTopicResponse().copyFrom(connection));
}

@Override
protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse response) {
queue.offer(new CommandGetTopicsOfNamespaceResponse().copyFrom(response));
}
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -420,66 +419,6 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
producer4.close();
}

andrasbeni marked this conversation as resolved.
Show resolved Hide resolved
@Test(timeOut = testTimeout)
public void testTopicsPatternFilter() {
String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
String topicName3 = "persistent://my-property/my-ns/hello-3";
String topicName4 = "non-persistent://my-property/my-ns/hello-4";

List<String> topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);

Pattern pattern1 = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
List<String> result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1);
assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2));

Pattern pattern2 = Pattern.compile("persistent://my-property/my-ns/.*");
List<String> result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2);
assertTrue(result2.size() == 4
&& Stream.of(topicName1, topicName2, topicName3, topicName4).allMatch(result2::contains));
}

@Test(timeOut = testTimeout)
public void testTopicsListMinus() {
String topicName1 = "persistent://my-property/my-ns/pattern-topic-1";
String topicName2 = "persistent://my-property/my-ns/pattern-topic-2";
String topicName3 = "persistent://my-property/my-ns/pattern-topic-3";
String topicName4 = "persistent://my-property/my-ns/pattern-topic-4";
String topicName5 = "persistent://my-property/my-ns/pattern-topic-5";
String topicName6 = "persistent://my-property/my-ns/pattern-topic-6";

List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);

List<String> addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
List<String> removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);

assertTrue(addedNames.size() == 2 &&
addedNames.contains(topicName5) &&
addedNames.contains(topicName6));
assertTrue(removedNames.size() == 2 &&
removedNames.contains(topicName1) &&
removedNames.contains(topicName2));

// totally 2 different list, should return content of first lists.
List<String> addedNames2 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
assertTrue(addedNames2.size() == 2 &&
addedNames2.contains(topicName5) &&
addedNames2.contains(topicName6));

// 2 same list, should return empty list.
List<String> addedNames3 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
assertEquals(addedNames3.size(), 0);

// empty list minus: addedNames2.size = 2, addedNames3.size = 0
List<String> addedNames4 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
assertEquals(addedNames2.size(), addedNames4.size());
addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));

List<String> addedNames5 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
assertEquals(addedNames5.size(), 0);
}

// simulate subscribe a pattern which has no topics, but then matched topics added in.
@Test(timeOut = testTimeout)
public void testStartEmptyPatternConsumer() throws Exception {
Expand Down
Loading