diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index f23d82b32cd43..afb17a186477c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -29,8 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import com.google.common.collect.Sets; import lombok.Cleanup; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; @@ -79,6 +78,12 @@ public Object[][] getTopicPartition() { return new Object[][] {{ 0 }, { 3 }}; } + @DataProvider(name = "topics") + public Object[][] getTopics() { + return new Object[][] {{ List.of("persistent://my-property/my-ns/my-topic") }, + { List.of("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }}; + } + @Test public void testProducerInterceptor() throws Exception { Map> ackCallback = new HashMap<>(); @@ -403,9 +408,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -449,13 +454,19 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId int keyCount = 0; for (int i = 0; i < 2; i++) { - Message received = consumer.receive(); + Message received; + if (i % 2 == 0) { + received = consumer.receive(); + } else { + received = consumer.receiveAsync().join(); + } MessageImpl msg = (MessageImpl) ((TopicMessageImpl) received).getMessage(); for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { if ("beforeConsumer".equals(keyValue.getKey())) { keyCount++; } } + Assert.assertEquals(keyCount, i + 1); consumer.acknowledge(received); } Assert.assertEquals(2, keyCount); @@ -475,9 +486,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -612,8 +623,8 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } - @Test - public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForNegativeAcksSend(List topics) throws PulsarClientException, InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -640,6 +651,7 @@ public void onAcknowledgeCumulative(Consumer consumer, MessageId message @Override public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } @@ -650,7 +662,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId }; Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionType(SubscriptionType.Failover) .intercept(interceptor) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -658,7 +670,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId .subscribe(); Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); for (int i = 0; i < totalNumOfMessages; i++) { @@ -682,8 +694,9 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } - @Test - public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForAckTimeoutSend(List topics) throws PulsarClientException, + InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -714,16 +727,17 @@ public void onNegativeAcksSend(Consumer consumer, Set message @Override public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } }; Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionName("foo") .intercept(interceptor) .ackTimeout(2, TimeUnit.SECONDS) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 3f5e501b28130..bf8bd6cc95117 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -108,6 +109,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private final MessageIdAdv startMessageId; private volatile boolean duringSeek = false; private final long startMessageRollbackDurationInSec; + private final ConsumerInterceptors internalConsumerInterceptors; MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { @@ -137,6 +139,11 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { long startMessageRollbackDurationInSec) { super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors); + if (interceptors != null) { + this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors); + } else { + this.internalConsumerInterceptors = null; + } checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); @@ -316,7 +323,8 @@ private void messageReceived(ConsumerImpl consumer, Message message) { CompletableFuture> receivedFuture = nextPendingReceive(); if (receivedFuture != null) { unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount()); - completePendingReceive(receivedFuture, topicMessage); + final Message interceptMessage = beforeConsume(topicMessage); + completePendingReceive(receivedFuture, interceptMessage); } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } @@ -369,7 +377,7 @@ protected Message internalReceive() throws PulsarClientException { checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - return message; + return beforeConsume(message); } catch (Exception e) { ExceptionHandler.handleInterruptedException(e); throw PulsarClientException.unwrap(e); @@ -388,6 +396,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount()); + message = beforeConsume(message); } resumeReceivingFromPausedConsumersIfNeeded(); return message; @@ -447,7 +456,7 @@ protected CompletableFuture> internalReceiveAsync() { checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); + result.complete(beforeConsume(message)); } }); return result; @@ -1185,7 +1194,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf return ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, + startMessageId, schema, this.internalConsumerInterceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); } @@ -1595,4 +1604,45 @@ private CompletableFuture> getExistsPartitions(String topic) { return list; }); } + + private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { + return new ConsumerInterceptors(new ArrayList<>()) { + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledge(consumer, messageId, exception); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, + MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception); + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set set) { + multiTopicInterceptors.onNegativeAcksSend(consumer, set); + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set set) { + multiTopicInterceptors.onAckTimeoutSend(consumer, set); + } + + @Override + public void onPartitionsChange(String topicName, int partitions) { + multiTopicInterceptors.onPartitionsChange(topicName, partitions); + } + + @Override + public void close() throws IOException { + multiTopicInterceptors.close(); + } + }; + } }