From d761f7b4e38a0ee8b1f4f383a89d1373cf6c937c Mon Sep 17 00:00:00 2001
From: Jack Vanlightly
Date: Tue, 13 Jul 2021 18:28:13 +0200
Subject: [PATCH] Allow partitioned producers to start lazily

Avoids performing lookups and producer registration on all partitions of a
partitioned topic when using SinglePartition routing without keyed messages.
---
 .../include/pulsar/ProducerConfiguration.h    |  19 +++
 .../include/pulsar/c/producer_configuration.h |   6 +
 pulsar-client-cpp/lib/HandlerBase.cc          |  14 ++-
 pulsar-client-cpp/lib/HandlerBase.h           |   1 +
 .../lib/PartitionedProducerImpl.cc            |  69 +++++++++--
 .../lib/PartitionedProducerImpl.h             |   5 +-
 .../lib/ProducerConfiguration.cc              |  10 ++
 .../lib/ProducerConfigurationImpl.h           |   1 +
 pulsar-client-cpp/lib/ProducerImpl.cc         |  48 ++++++--
 pulsar-client-cpp/lib/ProducerImpl.h          |   3 +
 .../lib/c/c_ProducerConfiguration.cc          |  10 ++
 pulsar-client-cpp/python/src/config.cc        |   2 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc  |  45 +++++--
 .../tests/PartitionsUpdateTest.cc             |  44 ++++---
 pulsar-client-cpp/tests/ProducerTest.cc       | 113 ++++++++++++++++++
 15 files changed, 343 insertions(+), 47 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 3306b271d8c21e..9148c2c8476b3c 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -262,6 +262,25 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     HashingScheme getHashingScheme() const;
 
+    /**
+     * This config affects producers of partitioned topics only. It controls whether
+     * producers register and connect immediately to the owner broker of each partition
+     * or start lazily on demand. Lazy starts occur when a message needs to be routed
+     * to a partition that the producer has not yet registered and connected to.
+     * Using this mode can reduce the strain on brokers for topics with large numbers of
+     * partitions, especially when the SinglePartition routing policy is used without
+     * keyed messages. Because producer registration and connection are performed on
+     * demand, the first message routed to each partition can see extra latency while
+     * the registration is carried out.
+     *
+     * @param useLazyStartPartitionedProducers whether to start partition producers lazily
+     * @return a reference to this ProducerConfiguration
+     */
+    ProducerConfiguration& setLazyStartPartitionedProducers(bool);
+
+    /**
+     * The getter associated with setLazyStartPartitionedProducers()
+     */
+    bool getLazyStartPartitionedProducers() const;
+
     /**
      * The setter associated with getBlockIfQueueFull()
      */
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 17653d42807d5d..cf62baafe1ff3e 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
 PULSAR_PUBLIC pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
     pulsar_producer_configuration_t *conf);
 
+PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);
+
+PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf);
+
 PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
     pulsar_producer_configuration_t *conf, int blockIfQueueFull);
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index de9929d6a73b56..d7025ad004b15f 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
       operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
-      state_(Pending),
+      state_(NotStarted),
       backoff_(backoff),
       epoch_(0),
       timer_(executor_->createDeadlineTimer()) {}
 
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
-void HandlerBase::start() { grabCnx(); }
+void HandlerBase::start() {
+    Lock lock(mutex_);
+    // guard against concurrent state changes such as closing
+    if (state_ == NotStarted) {
+        state_ = Pending;
+        lock.unlock();
+
+        grabCnx();
+    }
+}
 
 void HandlerBase::grabCnx() {
     Lock lock(mutex_);
@@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
             scheduleReconnection(handler);
             break;
 
+        case NotStarted:
         case Closing:
         case Closed:
         case Failed:
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index 93abba944eb888..eeb8ebe1c5e8de 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -97,6 +97,7 @@ class HandlerBase {
     enum State
     {
+        NotStarted,
         Pending,
         Ready,
         Closing,
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 4e01263d6aac15..b2d9ce6bd7b430 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
     return getNumPartitions();
 }
 
-ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) {
     using namespace std::placeholders;
     std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
     auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
-    producer->getProducerCreatedFuture().addListener(
-        std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
-                  const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+
+    if (conf_.getLazyStartPartitionedProducers()) {
+        createLazyPartitionProducer(partition);
+    } else {
+        producer->getProducerCreatedFuture().addListener(
+            std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
+                      const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+    }
 
     LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
     return producer;
@@ -108,8 +113,10 @@ void PartitionedProducerImpl::start() {
         producers_.push_back(newInternalProducer(i));
     }
 
-    for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
-        (*prod)->start();
+    if (!conf_.getLazyStartPartitionedProducers()) {
+        for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
+            (*prod)->start();
+        }
     }
 }
 
@@ -147,6 +154,20 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
     }
 }
 
+void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
+    const auto numPartitions = getNumPartitions();
+    assert(numProducersCreated_ <= numPartitions);
+    assert(partitionIndex <= numPartitions);
+    numProducersCreated_++;
+    if (numProducersCreated_ == numPartitions) {
+        state_ = Ready;
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
+        partitionedProducerCreatedPromise_.setValue(shared_from_this());
+    }
+}
+
 // override
 void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     // get partition for this message from router policy
@@ -161,7 +182,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallbac
     }
     // find a producer for that partition, index should start from 0
     ProducerImplPtr producer = producers_[partition];
+
+    // if the producer is not started (lazy producer), then kick off the start process,
+    // but only if we're still in the Ready state (i.e. not closing or closed)
+    if (!producer->isStarted()) {
+        if (assertState(Ready)) {
+            producer->start();
+        } else {
+            producersLock.unlock();
+            callback(ResultAlreadyClosed, msg.getMessageId());
+            return;
+        }
+    }
+
     producersLock.unlock();
+
     // send message on that partition
     producer->sendAsync(msg, callback);
 }
@@ -175,6 +208,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
     lock.unlock();
 }
 
+bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
+    Lock lock(mutex_);
+    return state_ == state;
+}
+
 const std::string& PartitionedProducerImpl::getProducerName() const {
     Lock producersLock(producersMutex_);
     return producers_[0]->getProducerName();
@@ -285,7 +323,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
 
 void PartitionedProducerImpl::triggerFlush() {
     Lock producersLock(producersMutex_);
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
-        (*prod)->triggerFlush();
+        if ((*prod)->isStarted()) {
+            (*prod)->triggerFlush();
+        }
     }
 }
 
@@ -322,7 +362,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
     };
 
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
-        (*prod)->flushAsync(subFlushCallback);
+        if ((*prod)->isStarted()) {
+            (*prod)->flushAsync(subFlushCallback);
+        } else {
+            subFlushCallback(ResultOk);
+        }
     }
 }
 
@@ -356,7 +400,10 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
         for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
             auto producer = newInternalProducer(i);
-            producer->start();
+
+            if (!conf_.getLazyStartPartitionedProducers()) {
+                producer->start();
+            }
             producers_.push_back(producer);
         }
         // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
@@ -379,8 +426,8 @@ bool PartitionedProducerImpl::isConnected() const {
     Lock producersLock(producersMutex_);
     const auto producers = producers_;
     producersLock.unlock();
-    for (const auto& producer : producers_) {
-        if (!producer->isConnected()) {
+    for (const auto& producer : producers) {
+        if (producer->isStarted() && !producer->isConnected()) {
             return false;
         }
     }
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index c097190fca3336..de8144b7220e86 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     uint64_t getNumberOfConnectedProducer() override;
     void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int partitionIndex);
-
+    void createLazyPartitionProducer(const unsigned int partitionIndex);
     void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
                                             CloseCallback callback);
 
@@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     unsigned int getNumPartitions() const;
     unsigned int getNumPartitionsWithLock() const;
 
-    ProducerImplPtr newInternalProducer(unsigned int partition) const;
+    ProducerImplPtr newInternalProducer(unsigned int partition);
 
     MessageRoutingPolicyPtr routerPolicy_;
 
@@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void runPartitionUpdateTask();
     void getPartitionMetadata();
     void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
+    bool assertState(const PartitionedProducerState state);
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 61217f5f47966f..3e027eeb1975fa 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
     return *this;
 }
 
+ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
+    bool useLazyStartPartitionedProducers) {
+    impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
+    return *this;
+}
+
+bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
+    return impl_->useLazyStartPartitionedProducers;
+}
+
 ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
     impl_->schemaInfo = schemaInfo;
     return *this;
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index fa6b755822c632..a41b2507ea43c5 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
     ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
     MessageRoutingPolicyPtr messageRouter;
     ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
+    bool useLazyStartPartitionedProducers{false};
    bool blockIfQueueFull{false};
    bool batchingEnabled{true};
    unsigned int batchingMaxMessages{1000};
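
For illustration, here is a minimal usage sketch of the new option (an editor's example, not part of the patch; the service URL and topic name are placeholders and error handling is elided). With lazy starts enabled and SinglePartition routing without keys, only the partition actually chosen by the router is looked up and connected to:

    #include <pulsar/Client.h>

    int main() {
        pulsar::Client client("pulsar://localhost:6650");

        pulsar::ProducerConfiguration conf;
        conf.setPartitionsRoutingMode(pulsar::ProducerConfiguration::UseSinglePartition);
        conf.setLazyStartPartitionedProducers(true);  // partition producers start on demand

        pulsar::Producer producer;
        if (client.createProducer("persistent://public/default/my-partitioned-topic", conf, producer) ==
            pulsar::ResultOk) {
            // the first send routed to a partition triggers that partition producer's
            // registration, so it may see extra latency compared to eager producers
            producer.send(pulsar::MessageBuilder().setContent("hello").build());
            producer.close();
        }
        client.close();
        return 0;
    }
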
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index c7a45513ab2f7a..e070a881fcea23 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -109,7 +109,7 @@ ProducerImpl::~ProducerImpl() {
     LOG_DEBUG(getName() << "~ProducerImpl");
     cancelTimers();
     printStats();
-    if (state_ == Ready) {
+    if (state_ == Ready || state_ == Pending) {
         LOG_WARN(getName() << "Destroyed producer which was not properly closed");
     }
 }
@@ -169,6 +169,14 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                                         const ResponseData& responseData) {
     LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
 
+    // make sure we're still in the Pending state; closeAsync could have been invoked
+    // while waiting for this response if using lazy producers
+    if (!assertState(Pending)) {
+        LOG_DEBUG("Producer created response received but producer already closed");
+        failPendingMessages(ResultAlreadyClosed);
+        return;
+    }
+
     if (result == ResultOk) {
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
@@ -389,7 +397,8 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     }
 
     Lock lock(mutex_);
-    if (state_ != Ready) {
+    // producers may be lazily starting and be in the Pending state
+    if (state_ != Ready && state_ != Pending) {
         lock.unlock();
         releaseSemaphore(payloadSize);
         cb(ResultAlreadyClosed, msg.getMessageId());
@@ -550,10 +559,14 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
         return;
     }
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
-    Lock lock(mutex_);
-    auto failures = batchMessageAndSend();
-    lock.unlock();
-    failures.complete();
+
+    // ignore if the producer is already closing/closed
+    if (assertState(Ready)) {
+        Lock lock(mutex_);
+        auto failures = batchMessageAndSend();
+        lock.unlock();
+        failures.complete();
+    }
 }
 
 void ProducerImpl::printStats() {
@@ -568,12 +581,19 @@ void ProducerImpl::printStats() {
 void ProducerImpl::closeAsync(CloseCallback callback) {
     Lock lock(mutex_);
 
+    // if the producer was never started then there is nothing to clean up
+    if (state_ == NotStarted) {
+        state_ = Closed;
+        lock.unlock();
+        if (callback) {
+            callback(ResultOk);
+        }
+        return;
+    }
+
     // Keep a reference to ensure object is kept alive
     ProducerImplPtr ptr = shared_from_this();
 
     cancelTimers();
 
-    if (state_ != Ready) {
+    if (state_ != Ready && state_ != Pending) {
         lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
@@ -631,6 +651,10 @@ void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerI
     } else {
         LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
     }
+
+    // ensure any remaining send callbacks are called before calling the close callback
+    failPendingMessages(ResultAlreadyClosed);
+
     if (callback) {
         callback(result);
     }
@@ -828,5 +852,15 @@ bool ProducerImpl::isConnected() const {
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
 
+bool ProducerImpl::isStarted() const {
+    Lock lock(mutex_);
+    return state_ != NotStarted;
+}
+
+bool ProducerImpl::assertState(const State state) {
+    Lock lock(mutex_);
+    return state_ == state;
+}
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 2c51d41f12788a..4667cdeb88792a 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -71,6 +71,7 @@ class ProducerImpl : public HandlerBase,
     void flushAsync(FlushCallback callback) override;
     bool isConnected() const override;
     uint64_t getNumberOfConnectedProducer() override;
+    bool isStarted() const;
 
     bool removeCorruptMessage(uint64_t sequenceId);
 
@@ -166,6 +167,8 @@ class ProducerImpl : public HandlerBase,
     uint32_t dataKeyGenIntervalSec_;
 
     MemoryLimitController& memoryLimitController_;
+
+    bool assertState(const State state);
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 7bc7915c035e04..f26f63a593b089 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -135,6 +135,16 @@ void pulsar_producer_configuration_set_message_router(pulsar_producer_configurat
     conf->conf.setMessageRouter(std::make_shared(router, ctx));
 }
 
+void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers) {
+    conf->conf.setLazyStartPartitionedProducers(useLazyStartPartitionedProducers);
+}
+
+int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getLazyStartPartitionedProducers();
+}
+
 void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
                                                            int blockIfQueueFull) {
     conf->conf.setBlockIfQueueFull(blockIfQueueFull);
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 0b30713f648832..ec945590301d9a 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -233,6 +233,8 @@ void export_config() {
         .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
         .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
         .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
+        .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
+        .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>())
         .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy<copy_const_reference>())
         .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
         .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy<copy_const_reference>())
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 2f8524e4bfd81f..5dc46e9afbd0e1 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -518,20 +518,21 @@ TEST(BasicEndToEndTest, testInvalidUrlPassed) {
     ASSERT_EQ(ResultConnectError, result);
 }
 
-TEST(BasicEndToEndTest, testPartitionedProducerConsumer) {
+void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::string topicName) {
     Client client(lookupUrl);
 
-    std::string topicName = "testPartitionedProducerConsumer";
     // call admin api to make it partitioned
-    std::string url =
-        adminUrl + "admin/v2/persistent/public/default/testPartitionedProducerConsumer/partitions";
+    std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+    makeDeleteRequest(url);
     int res = makePutRequest(url, "3");
 
     LOG_INFO("res = " << res);
     ASSERT_FALSE(res != 204 && res != 409);
 
+    ProducerConfiguration conf;
+    conf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
     Producer producer;
-    Result result = client.createProducer(topicName, producer);
+    Result result = client.createProducer(topicName, conf, producer);
     ASSERT_EQ(ResultOk, result);
 
     Consumer consumer;
@@ -558,6 +559,14 @@
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testPartitionedProducerConsumer) {
+    testPartitionedProducerConsumer(false, "testPartitionedProducerConsumer");
+}
+
+TEST(BasicEndToEndTest, testPartitionedLazyProducerConsumer) {
+    testPartitionedProducerConsumer(true, "testPartitionedProducerConsumerLazy");
+}
+
 TEST(BasicEndToEndTest, testPartitionedProducerConsumerSubscriptionName) {
     Client client(lookupUrl);
     std::string topicName = "testPartitionedProducerConsumerSubscriptionName" + unique_str();
@@ -2024,12 +2033,16 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
     std::string url4 =
         adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsNotMatchPubSub4/partitions";
 
+    makeDeleteRequest(url1);
     int res = makePutRequest(url1, "2");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url2);
     res = makePutRequest(url2, "3");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url3);
     res = makePutRequest(url3, "4");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url4);
     res = makePutRequest(url4, "4");
     ASSERT_FALSE(res != 204 && res != 409);
 
@@ -2133,10 +2146,13 @@ TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) {
     std::string url3 =
         adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions";
 
+    makeDeleteRequest(url1);
     int res = makePutRequest(url1, "2");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url2);
     res = makePutRequest(url2, "3");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url3);
     res = makePutRequest(url3, "4");
     ASSERT_FALSE(res != 204 && res != 409);
 
@@ -2259,6 +2275,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
     auto createProducer = [&client](Producer &producer, const std::string &topic, int numPartitions) {
         if (numPartitions > 0) {
             const std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions";
+            makeDeleteRequest(url);
             int res = makePutRequest(url, std::to_string(numPartitions));
             ASSERT_TRUE(res == 204 || res == 409);
         }
@@ -2443,7 +2460,7 @@ static void simpleCallback(Result code, const MessageId &msgId) {
     LOG_INFO("Received code: " << code << " -- MsgID: " << msgId);
 }
 
-TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
+void testSyncFlushBatchMessagesPartitionedTopic(bool lazyStartPartitionedProducers) {
     Client client(lookupUrl);
     std::string uniqueChunk = unique_str();
     std::string topicName = "persistent://public/default/partition-testSyncFlushBatchMessages" + uniqueChunk;
@@ -2465,6 +2482,7 @@
     // set batch message number numOfMessages, and max delay 60s
     producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions);
     producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+    producerConfiguration.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
 
     Result result = client.createProducer(topicName, producerConfiguration, producer);
     ASSERT_EQ(ResultOk, result);
@@ -2540,6 +2558,14 @@
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
+    testSyncFlushBatchMessagesPartitionedTopic(false);
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopicLazyProducers) {
+    testSyncFlushBatchMessagesPartitionedTopic(true);
+}
+
 TEST(BasicEndToEndTest, testGetTopicPartitions) {
     Client client(lookupUrl);
     std::string topicName = "persistent://public/default/testGetPartitions";
@@ -2657,7 +2683,7 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
     client.shutdown();
 }
 
-TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
+void testFlushInPartitionedProducer(bool lazyStartPartitionedProducers) {
     Client client(lookupUrl);
     std::string uniqueChunk = unique_str();
     std::string topicName =
@@ -2682,6 +2708,7 @@
     producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions);
     producerConfiguration.setBatchingMaxPublishDelayMs(60000);
     producerConfiguration.setMessageRouter(std::make_shared());
+    producerConfiguration.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
 
     Result result = client.createProducer(topicName, producerConfiguration, producer);
     ASSERT_EQ(ResultOk, result);
@@ -2760,6 +2787,10 @@
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { testFlushInPartitionedProducer(false); }
+
+TEST(BasicEndToEndTest, testFlushInLazyPartitionedProducer) { testFlushInPartitionedProducer(true); }
+
 TEST(BasicEndToEndTest, testReceiveAsync) {
     ClientConfiguration config;
     Client client(lookupUrl);
diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
index af473a2c36cd2a..845e44771bd719 100644
--- a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
+++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
@@ -32,11 +32,6 @@ using namespace pulsar;
 static const std::string serviceUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";
 
-static const std::string topicNameSuffix = "public/default/partitions-update-test-topic";
-static const std::string topicName = "persistent://" + topicNameSuffix;
-static const std::string topicOperateUrl =
-    adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
-
 static ClientConfiguration newClientConfig(bool enablePartitionsUpdate) {
     ClientConfiguration clientConfig;
     if (enablePartitionsUpdate) {
 class PartitionsSet {
    public:
     size_t size() const { return names_.size(); }
 
-    Result initProducer(bool enablePartitionsUpdate) {
+    Result initProducer(std::string topicName, bool enablePartitionsUpdate,
+                        bool lazyStartPartitionedProducers) {
         clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
-        const auto producerConfig =
-            ProducerConfiguration().setMessageRouter(std::make_shared());
+        const auto producerConfig = ProducerConfiguration()
+                                        .setMessageRouter(std::make_shared())
+                                        .setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
         return clientForProducer_->createProducer(topicName, producerConfig, producer_);
     }
 
-    Result initConsumer(bool enablePartitionsUpdate) {
+    Result initConsumer(std::string topicName, bool enablePartitionsUpdate) {
         clientForConsumer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
         return clientForConsumer_->subscribe(topicName, "SubscriptionName", consumer_);
     }
 
@@ -118,7 +115,10 @@ TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
     ASSERT_EQ(static_cast<unsigned int>(-1), clientConfig.getPartitionsUpdateInterval());
 }
 
-TEST(PartitionsUpdateTest, testPartitionsUpdate) {
+void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicNameSuffix) {
+    std::string topicName = "persistent://" + topicNameSuffix;
+    std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
+
     // Ensure `topicName` doesn't exist before it is created
     makeDeleteRequest(topicOperateUrl);
     // Create a topic with 2 partitions
@@ -128,8 +128,8 @@
     PartitionsSet partitionsSet;
 
     // 1. Both producer and consumer enable partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
     res = makePostRequest(topicOperateUrl, "3");  // update partitions to 3
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -140,8 +140,8 @@
     partitionsSet.close();
 
     // 2. Only producer enables partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
     res = makePostRequest(topicOperateUrl, "5");  // update partitions to 5
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -152,8 +152,8 @@
     partitionsSet.close();
 
     // 3. Only consumer enables partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
     res = makePostRequest(topicOperateUrl, "7");  // update partitions to 7
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -164,8 +164,8 @@
     partitionsSet.close();
 
    // 4. Both producer and consumer disable partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
     res = makePostRequest(topicOperateUrl, "10");  // update partitions to 10
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
 
     ASSERT_EQ(7, partitionsSet.size());
     partitionsSet.close();
 }
+
+TEST(PartitionsUpdateTest, testPartitionsUpdate) {
+    testPartitionsUpdate(false, "public/default/partitions-update-test-topic");
+}
+
+TEST(PartitionsUpdateTest, testPartitionsUpdateWithLazyProducers) {
+    testPartitionsUpdate(true, "public/default/partitions-update-test-topic-lazy");
+}
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 61cac57fa59b14..0a51213d1755ea 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -18,11 +18,13 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
+#include <thread>
 
 #include "HttpHelper.h"
 
 #include "lib/Future.h"
 #include "lib/Utils.h"
+#include "lib/Latch.h"
 #include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -126,3 +128,114 @@ TEST(ProducerTest, testIsConnected) {
 
     client.close();
 }
+
+TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) {
+    Client client(serviceUrl);
+    const std::string partitionedTopic =
+        "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
+
+    int res = makePutRequest(
+        adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setLazyStartPartitionedProducers(true);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
+
+    Message msg = MessageBuilder().setContent("test").build();
+
+    Promise<bool, Result> promiseClose;
+    producer.closeAsync(WaitForCallback(promiseClose));
+
+    Promise<Result, MessageId> promise;
+    producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promise));
+
+    MessageId mi;
+    ASSERT_EQ(ResultAlreadyClosed, promise.getFuture().get(mi));
+
+    Result result;
+    promiseClose.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+}
+
+TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
+    // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called,
+    // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed
+    for (int run = 0; run < 20; run++) {
+        Client client(serviceUrl);
+        const std::string partitionedTopic =
+            "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
+
+        int res = makePutRequest(
+            adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
+        ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+        ProducerConfiguration producerConfiguration;
+        producerConfiguration.setLazyStartPartitionedProducers(true);
+        producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+        producerConfiguration.setBatchingEnabled(true);
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
+
+        int sendCount = 100;
+        std::vector<Promise<Result, MessageId>> promises(sendCount);
+        Promise<bool, Result> promiseClose;
+
+        // only call closeAsync once at least 10 messages have been sent
+        Latch sendStartLatch(10);
+        Latch closeLatch(1);
+        int closedAt = 0;
+
+        std::thread t1([&]() {
+            for (int i = 0; i < sendCount; i++) {
+                sendStartLatch.countdown();
+                Message msg = MessageBuilder().setContent("test").build();
+
+                if (closeLatch.getCount() == 0 && closedAt == 0) {
+                    closedAt = i;
+                    LOG_INFO("closedAt set to " << closedAt);
+                }
+
+                producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i]));
+                std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            }
+        });
+
+        std::thread t2([&]() {
+            sendStartLatch.wait(std::chrono::milliseconds(1000));
+            LOG_INFO("Closing");
+            producer.closeAsync(WaitForCallback(promiseClose));
+            LOG_INFO("Close called");
+            closeLatch.countdown();
+            Result result;
+            promiseClose.getFuture().get(result);
+            ASSERT_EQ(ResultOk, result);
+            LOG_INFO("Closed");
+        });
+
+        t1.join();
+        t2.join();
+
+        // make sure that all messages sent after the moment when closeAsync was invoked
+        // return ResultAlreadyClosed
+        for (int i = 0; i < sendCount; i++) {
+            LOG_DEBUG("Checking " << i);
+
+            // whether a message was sent successfully or not, its callback
+            // must have been invoked
+            ASSERT_EQ(true, promises[i].isComplete());
+            MessageId mi;
+            Result res = promises[i].getFuture().get(mi);
+            LOG_DEBUG("Result is " << res);
+
+            // messages sent after closeAsync was invoked should
+            // all return ResultAlreadyClosed
+            if (i >= closedAt) {
+                ASSERT_EQ(ResultAlreadyClosed, res);
+            }
+        }
+
+        client.close();
+    }
+}
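
As a companion sketch (again an editor's illustration, not part of the patch; the service URL and topic name are placeholders), the semantics the tests above verify look like this from an application's point of view: once closeAsync() has been called, a send to a not-yet-started lazy partition producer should fail with ResultAlreadyClosed rather than starting the producer, and the send callback is still invoked exactly once:

    #include <pulsar/Client.h>
    #include <iostream>

    int main() {
        pulsar::Client client("pulsar://localhost:6650");

        pulsar::ProducerConfiguration conf;
        conf.setLazyStartPartitionedProducers(true);

        pulsar::Producer producer;
        if (client.createProducer("persistent://public/default/lazy-demo", conf, producer) !=
            pulsar::ResultOk) {
            return 1;
        }

        // close first, then send: the send callback still fires, but with ResultAlreadyClosed
        producer.closeAsync([](pulsar::Result r) { std::cout << "close: " << r << std::endl; });
        producer.sendAsync(pulsar::MessageBuilder().setContent("late").build(),
                           [](pulsar::Result r, const pulsar::MessageId&) {
                               std::cout << "send: " << r << std::endl;  // expect ResultAlreadyClosed
                           });

        client.close();
        return 0;
    }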