Skip to content

Commit

Permalink
[cherry-pick][branch-2.10] Fix issue where unexpected ack timeout (ap…
Browse files Browse the repository at this point in the history
…ache#18906)

### Motivation
Cherry-pick apache#17503 to release 2.10.3 and run tests.

### Modifications

Cherry-pick apache#17503 to release 2.10.3.

(cherry picked from commit b5cfde1)
  • Loading branch information
liangyepianzhou authored and nicoloboschi committed Jan 10, 2023
1 parent cb97b51 commit 720b427
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
Expand Down
18 changes: 13 additions & 5 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,14 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
shared_from_this(), ResultOk, msg, callback));
} else {
if (messages_.full()) {
lock.unlock();
}
messages_.push(msg);
if (messageListener_) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
}
Expand All @@ -469,7 +468,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
messages_.pop(m);

unAckedMessageTrackerPtr_->add(m.getMessageId());
try {
messageListener_(Consumer(shared_from_this()), m);
} catch (const std::exception& e) {
Expand Down Expand Up @@ -535,11 +534,20 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
shared_from_this(), ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}

void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
const ReceiveCallback& callback) {
if (result == ResultOk) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
}
callback(result, msg);
}

void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);

void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate);
Expand Down
71 changes: 71 additions & 0 deletions pulsar-client-cpp/tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,77 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
client.close();
}

TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
ClientConfiguration clientConfig;
clientConfig.setMessageListenerThreads(1);
Client client(lookupUrl, clientConfig);

const std::string partitionedTopic =
"testPartitionedConsumerUnexpectedAckTimeout" + std::to_string(time(nullptr));
std::string subName = "sub";
constexpr int numPartitions = 2;
constexpr int numOfMessages = 3;
constexpr int unAckedMessagesTimeoutMs = 10000;
constexpr int tickDurationInMs = 1000;
pulsar::Latch latch(numOfMessages);
std::vector<Message> messages;
std::mutex mtx;

int res =
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setConsumerType(ConsumerShared);
consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
consumerConfig.setTickDurationInMs(tickDurationInMs);
consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
// acknowledge received messages immediately, so no ack timeout is expected
ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
ASSERT_EQ(0, msg.getRedeliveryCount());

{
std::lock_guard<std::mutex> lock(mtx);
messages.emplace_back(msg);
}

if (latch.getCount() > 0) {
std::this_thread::sleep_for(
std::chrono::milliseconds(unAckedMessagesTimeoutMs + tickDurationInMs * 2));
latch.countdown();
}
});
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, consumerConfig, consumer));

// send messages
ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
producerConfig.setBlockIfQueueFull(true);
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
std::string prefix = "message-";
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg = MessageBuilder().setContent(messageContent).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
producer.close();

bool wasUnblocked = latch.wait(
std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs * 2) * numOfMessages + 5000));
ASSERT_TRUE(wasUnblocked);

std::this_thread::sleep_for(std::chrono::milliseconds(5000));
// messages are expected not to be redelivered
ASSERT_EQ(numOfMessages, messages.size());

consumer.close();
client.close();
}

TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
Client client(lookupUrl);
const std::string nonPartitionedTopic =
Expand Down

0 comments on commit 720b427

Please sign in to comment.