Skip to content

Commit

Permalink
[fix][client] Fix for early hit beforeConsume for MultiTopicConsumer (
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc authored Aug 14, 2024
1 parent 3e461c0 commit c07b158
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -29,8 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -79,6 +78,12 @@ public Object[][] getTopicPartition() {
return new Object[][] {{ 0 }, { 3 }};
}

@DataProvider(name = "topics")
public Object[][] getTopics() {
return new Object[][] {{ List.of("persistent://my-property/my-ns/my-topic") },
{ List.of("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }};
}

@Test
public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
Expand Down Expand Up @@ -403,9 +408,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -449,13 +454,19 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId

int keyCount = 0;
for (int i = 0; i < 2; i++) {
Message<String> received = consumer.receive();
Message<String> received;
if (i % 2 == 0) {
received = consumer.receive();
} else {
received = consumer.receiveAsync().join();
}
MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
keyCount++;
}
}
Assert.assertEquals(keyCount, i + 1);
consumer.acknowledge(received);
}
Assert.assertEquals(2, keyCount);
Expand All @@ -475,9 +486,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -612,8 +623,8 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForNegativeAcksSend(List<String> topics) throws PulsarClientException, InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand All @@ -640,6 +651,7 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}

Expand All @@ -650,15 +662,15 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

for (int i = 0; i < totalNumOfMessages; i++) {
Expand All @@ -682,8 +694,9 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) throws PulsarClientException,
InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand Down Expand Up @@ -714,16 +727,17 @@ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> message

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionName("foo")
.intercept(interceptor)
.ackTimeout(2, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final MessageIdAdv startMessageId;
private volatile boolean duringSeek = false;
private final long startMessageRollbackDurationInSec;
private final ConsumerInterceptors<T> internalConsumerInterceptors;
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
Expand Down Expand Up @@ -137,6 +139,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
long startMessageRollbackDurationInSec) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
schema, interceptors);
if (interceptors != null) {
this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors);
} else {
this.internalConsumerInterceptors = null;
}

checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
Expand Down Expand Up @@ -316,7 +323,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
completePendingReceive(receivedFuture, topicMessage);
final Message<T> interceptMessage = beforeConsume(topicMessage);
completePendingReceive(receivedFuture, interceptMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
Expand Down Expand Up @@ -369,7 +377,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
return beforeConsume(message);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand All @@ -388,6 +396,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
message = beforeConsume(message);
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
Expand Down Expand Up @@ -447,7 +456,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
result.complete(beforeConsume(message));
}
});
return result;
Expand Down Expand Up @@ -1185,7 +1194,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, listener != null, subFuture,
startMessageId, schema, interceptors,
startMessageId, schema, this.internalConsumerInterceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);
}

Expand Down Expand Up @@ -1595,4 +1604,45 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
return list;
});
}

private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
multiTopicInterceptors.onAcknowledge(consumer, messageId, exception);
}

@Override
public void onAcknowledgeCumulative(Consumer<T> consumer,
MessageId messageId, Throwable exception) {
multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception);
}

@Override
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) {
multiTopicInterceptors.onNegativeAcksSend(consumer, set);
}

@Override
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) {
multiTopicInterceptors.onAckTimeoutSend(consumer, set);
}

@Override
public void onPartitionsChange(String topicName, int partitions) {
multiTopicInterceptors.onPartitionsChange(topicName, partitions);
}

@Override
public void close() throws IOException {
multiTopicInterceptors.close();
}
};
}
}

0 comments on commit c07b158

Please sign in to comment.