Skip to content

Commit

Permalink
Allow partitioned producers to start lazily
Browse files Browse the repository at this point in the history
Avoids performing lookups and producer registration on all partitions of a partitioned
topic when using SinglePartition routing without keyed messages
  • Loading branch information
Jack Vanlightly committed Aug 5, 2021
1 parent ce9a6d2 commit 8938ec2
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 45 deletions.
19 changes: 19 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,25 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
HashingScheme getHashingScheme() const;

/**
* This config affects producers of partitioned topics only. It controls whether
* producers register and connect immediately to the owner broker of each partition
* or start lazily on demand. Lazy starts occur when a message needs to be routed
* to a partition that the producer has not yet registered and connected to.
* Using this mode can reduce the strain on brokers for topics with large numbers of
* partitions and when the SinglePartition routing policy is used without keyed messages.
* Because producer registration and connection is on demand, this can produce extra
* latency while the registration is being carried out.
* @param true/false as to whether to start partition producers lazily
* @return
*/
ProducerConfiguration& setLazyStartPartitionedProducers(bool);

/**
* The getter associated with setLazyStartPartitionedProducers()
*/
bool getLazyStartPartitionedProducers() const;

/**
* The setter associated with getBlockIfQueueFull()
*/
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
PULSAR_PUBLIC pulsar_hashing_scheme
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);

PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
pulsar_producer_configuration_t *conf, int blockIfQueueFull);

Expand Down
8 changes: 6 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
mutex_(),
creationTimestamp_(TimeUtils::now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
state_(NotStarted),
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }

void HandlerBase::start() { grabCnx(); }
void HandlerBase::start() {
state_ = Pending;
grabCnx();
}

void HandlerBase::grabCnx() {
Lock lock(mutex_);
Expand Down Expand Up @@ -106,6 +109,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
scheduleReconnection(handler);
break;

case NotStarted:
case Closing:
case Closed:
case Failed:
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HandlerBase {

enum State
{
NotStarted,
Pending,
Ready,
Closing,
Expand Down
63 changes: 51 additions & 12 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}

ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));

if (conf_.getLazyStartPartitionedProducers()) {
createLazyPartitionProducer(partition);
} else {
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
}

LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
return producer;
Expand All @@ -108,8 +113,10 @@ void PartitionedProducerImpl::start() {
producers_.push_back(newInternalProducer(i));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
if (!conf_.getLazyStartPartitionedProducers()) {
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
}
}
}

Expand Down Expand Up @@ -147,6 +154,20 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
}
}

void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
const auto numPartitions = getNumPartitions();
assert(numProducersCreated_ <= numPartitions);
assert(partitionIndex <= numPartitions);
numProducersCreated_++;
if (numProducersCreated_ == numPartitions) {
state_ = Ready;
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
partitionedProducerCreatedPromise_.setValue(shared_from_this());
}
}

// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
// get partition for this message from router policy
Expand All @@ -161,7 +182,13 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];

if (conf_.getLazyStartPartitionedProducers() && !producer->isStarted()) {
producer->start();
}

producersLock.unlock();

// send message on that partition
producer->sendAsync(msg, callback);
}
Expand Down Expand Up @@ -209,7 +236,10 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
// when `state_` is Ready
for (auto& producer : producers_) {
if (!producer->isClosed()) {
if (!producer->isStarted()) {
const auto partition = static_cast<unsigned int>(producer->partition());
handleSinglePartitionProducerClose(ResultOk, partition, closeCallback);
} else if (!producer->isClosed()) {
auto self = shared_from_this();
const auto partition = static_cast<unsigned int>(producer->partition());
producer->closeAsync([this, self, partition, closeCallback](Result result) {
Expand Down Expand Up @@ -285,7 +315,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
void PartitionedProducerImpl::triggerFlush() {
Lock producersLock(producersMutex_);
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->triggerFlush();
if ((*prod)->isStarted()) {
(*prod)->triggerFlush();
}
}
}

Expand Down Expand Up @@ -322,7 +354,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
};

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->flushAsync(subFlushCallback);
if ((*prod)->isStarted()) {
(*prod)->flushAsync(subFlushCallback);
} else {
subFlushCallback(ResultOk);
}
}
}

Expand Down Expand Up @@ -356,7 +392,10 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,

for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
auto producer = newInternalProducer(i);
producer->start();

if (!conf_.getLazyStartPartitionedProducers()) {
producer->start();
}
producers_.push_back(producer);
}
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
Expand All @@ -379,8 +418,8 @@ bool PartitionedProducerImpl::isConnected() const {
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers_) {
if (!producer->isConnected()) {
for (const auto& producer : producers) {
if (producer->isStarted() && !producer->isConnected()) {
return false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

void createLazyPartitionProducer(const unsigned int partitionIndex);
void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
CloseCallback callback);

Expand Down Expand Up @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;

ProducerImplPtr newInternalProducer(unsigned int partition) const;
ProducerImplPtr newInternalProducer(unsigned int partition);

MessageRoutingPolicyPtr routerPolicy_;

Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}

ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
bool useLazyStartPartitionedProducers) {
impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
return *this;
}

bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
return impl_->useLazyStartPartitionedProducers;
}

ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
bool blockIfQueueFull{false};
bool batchingEnabled{true};
unsigned int batchingMaxMessages{1000};
Expand Down
12 changes: 9 additions & 3 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ ProducerImpl::~ProducerImpl() {
LOG_DEBUG(getName() << "~ProducerImpl");
cancelTimers();
printStats();
if (state_ == Ready) {
if (state_ == Ready || state_ == Pending) {
LOG_WARN(getName() << "Destroyed producer which was not properly closed");
}
}
Expand Down Expand Up @@ -389,7 +389,8 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
}

Lock lock(mutex_);
if (state_ != Ready) {
// producers may be lazily starting and be in the pending state
if (state_ != Ready && state_ != Pending) {
lock.unlock();
releaseSemaphore(payloadSize);
cb(ResultAlreadyClosed, msg.getMessageId());
Expand Down Expand Up @@ -573,7 +574,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {

cancelTimers();

if (state_ != Ready) {
if (state_ != Ready && state_ != Pending) {
lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
Expand Down Expand Up @@ -828,5 +829,10 @@ bool ProducerImpl::isConnected() const {

uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }

bool ProducerImpl::isStarted() const {
Lock lock(mutex_);
return state_ != NotStarted;
}

} // namespace pulsar
/* namespace pulsar */
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ class ProducerImpl : public HandlerBase,
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedProducer() override;
bool isStarted() const;

bool removeCorruptMessage(uint64_t sequenceId);

bool ackReceived(uint64_t sequenceId, MessageId& messageId);

virtual void disconnectProducer();
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ void pulsar_producer_configuration_set_message_router(pulsar_producer_configurat
conf->conf.setMessageRouter(std::make_shared<MessageRoutingPolicy>(router, ctx));
}

void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers) {
conf->conf.setLazyStartPartitionedProducers(useLazyStartPartitionedProducers);
}

int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf) {
return conf->conf.getLazyStartPartitionedProducers();
}

void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
int blockIfQueueFull) {
conf->conf.setBlockIfQueueFull(blockIfQueueFull);
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ void export_config() {
.def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
.def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
.def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
.def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
.def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>())
.def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy<copy_const_reference>())
.def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
.def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy<copy_const_reference>())
Expand Down
Loading

0 comments on commit 8938ec2

Please sign in to comment.