Skip to content

Commit

Permalink
Address reviewer comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Sep 8, 2022
1 parent 91e7dd2 commit 0ba3d01
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
17 changes: 9 additions & 8 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,24 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// sending the subscribe request.
cnx->registerConsumer(consumerId_, shared_from_this());

if (duringSeek_) {
ackGroupingTrackerPtr_->flushAndClean();
}

Lock lockForMessageId(mutexForMessageId_);
// Update startMessageId so that we can discard messages after delivery restarts
startMessageId_ = clearReceiveQueue();
const auto startMessageId = clearReceiveQueue();
const auto subscribeMessageId = (subscriptionMode_ == Commands::SubscriptionModeNonDurable)
? startMessageId_.get()
? startMessageId
: Optional<MessageId>::empty();
startMessageId_ = startMessageId;
lockForMessageId.unlock();

unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();

ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
if (duringSeek_) {
ackGroupingTrackerPtr_->flushAndClean();
}

SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
Expand Down Expand Up @@ -1403,12 +1404,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
});
}

bool ConsumerImpl::isPriorBatchIndex(long idx) {
bool ConsumerImpl::isPriorBatchIndex(int32_t idx) {
return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().batchIndex()
: idx <= startMessageId_.get().value().batchIndex();
}

bool ConsumerImpl::isPriorEntryIndex(long idx) {
bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().entryId()
: idx <= startMessageId_.get().value().entryId();
}
Expand Down
5 changes: 2 additions & 3 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
#include <lib/stats/ConsumerStatsDisabled.h>
#include <queue>
#include <atomic>
#include "SharedBuffer.h"
#include "Synchronized.h"

using namespace pulsar;
Expand Down Expand Up @@ -170,8 +169,8 @@ class ConsumerImpl : public ConsumerImplBase,
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
int redeliveryCount);
bool isPriorBatchIndex(long idx);
bool isPriorEntryIndex(long idx);
bool isPriorBatchIndex(int32_t idx);
bool isPriorEntryIndex(int64_t idx);
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);

bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
Expand Down
5 changes: 2 additions & 3 deletions pulsar-client-cpp/tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,10 +810,9 @@ class ConsumerSeekTest : public ::testing::TestWithParam<bool> {

TEST_P(ConsumerSeekTest, testSeekForMessageId) {
Client client(lookupUrl);
auto n = std::chrono::system_clock::now();
auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(n.time_since_epoch());

const std::string topic = "test-seek-for-message-id-" + std::to_string(now.count());
const std::string topic = "test-seek-for-message-id-" + std::string((GetParam() ? "batch-" : "")) +
std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer));
Expand Down

0 comments on commit 0ba3d01

Please sign in to comment.