Skip to content

Commit

Permalink
Allow partitioned producers to start lazily
Browse files Browse the repository at this point in the history
Avoids performing lookups and producer registration on all partitions of a partitioned
topic when using SinglePartition routing without keyed messages
  • Loading branch information
Jack Vanlightly authored and Vanlightly committed Aug 10, 2021
1 parent 9311f7f commit 0fa7215
Show file tree
Hide file tree
Showing 17 changed files with 431 additions and 53 deletions.
19 changes: 19 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,25 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
HashingScheme getHashingScheme() const;

/**
* This config affects producers of partitioned topics only. It controls whether
* producers register and connect immediately to the owner broker of each partition
* or start lazily on demand. Lazy starts occur when a message needs to be routed
* to a partition that the producer has not yet registered and connected to.
* Using this mode can reduce the strain on brokers for topics with large numbers of
* partitions and when the SinglePartition routing policy is used without keyed messages.
* Because producer registration and connection is on demand, this can produce extra
* latency while the registration is being carried out.
* @param true/false as to whether to start partition producers lazily
* @return
*/
ProducerConfiguration& setLazyStartPartitionedProducers(bool);

/**
* The getter associated with setLazyStartPartitionedProducers()
*/
bool getLazyStartPartitionedProducers() const;

/**
* The setter associated with getBlockIfQueueFull()
*/
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
PULSAR_PUBLIC pulsar_hashing_scheme
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);

PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
pulsar_producer_configuration_t *conf, int blockIfQueueFull);

Expand Down
14 changes: 12 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
mutex_(),
creationTimestamp_(TimeUtils::now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
state_(NotStarted),
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }

void HandlerBase::start() { grabCnx(); }
void HandlerBase::start() {
Lock lock(mutex_);
// guard against concurrent state changes such as closing
if (state_ == NotStarted) {
state_ = Pending;
lock.unlock();

grabCnx();
}
}

void HandlerBase::grabCnx() {
Lock lock(mutex_);
Expand Down Expand Up @@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
scheduleReconnection(handler);
break;

case NotStarted:
case Closing:
case Closed:
case Failed:
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HandlerBase {

enum State
{
NotStarted,
Pending,
Ready,
Closing,
Expand Down
69 changes: 58 additions & 11 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}

ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));

if (conf_.getLazyStartPartitionedProducers()) {
createLazyPartitionProducer(partition);
} else {
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
}

LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
return producer;
Expand All @@ -108,8 +113,10 @@ void PartitionedProducerImpl::start() {
producers_.push_back(newInternalProducer(i));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
if (!conf_.getLazyStartPartitionedProducers()) {
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
}
}
}

Expand Down Expand Up @@ -147,6 +154,20 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
}
}

void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
const auto numPartitions = getNumPartitions();
assert(numProducersCreated_ <= numPartitions);
assert(partitionIndex <= numPartitions);
numProducersCreated_++;
if (numProducersCreated_ == numPartitions) {
state_ = Ready;
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
partitionedProducerCreatedPromise_.setValue(shared_from_this());
}
}

// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
// get partition for this message from router policy
Expand All @@ -161,7 +182,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];

// if the producer is not started (lazy producer), then kick-off the start process
// but only if we're still in the Ready state (i.e not closing or closed)
if (!producer->isStarted()) {
if (assertState(Ready)) {
producer->start();
} else {
callback(ResultAlreadyClosed, msg.getMessageId());
}
}

producersLock.unlock();

// send message on that partition
producer->sendAsync(msg, callback);
}
Expand All @@ -175,6 +208,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
lock.unlock();
}

bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
Lock lock(mutex_);
return state_ == state;
}

const std::string& PartitionedProducerImpl::getProducerName() const {
Lock producersLock(producersMutex_);
return producers_[0]->getProducerName();
Expand Down Expand Up @@ -285,7 +323,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
void PartitionedProducerImpl::triggerFlush() {
Lock producersLock(producersMutex_);
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->triggerFlush();
if ((*prod)->isStarted()) {
(*prod)->triggerFlush();
}
}
}

Expand Down Expand Up @@ -322,7 +362,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
};

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->flushAsync(subFlushCallback);
if ((*prod)->isStarted()) {
(*prod)->flushAsync(subFlushCallback);
} else {
subFlushCallback(ResultOk);
}
}
}

Expand Down Expand Up @@ -356,7 +400,10 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,

for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
auto producer = newInternalProducer(i);
producer->start();

if (!conf_.getLazyStartPartitionedProducers()) {
producer->start();
}
producers_.push_back(producer);
}
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
Expand All @@ -379,8 +426,8 @@ bool PartitionedProducerImpl::isConnected() const {
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers_) {
if (!producer->isConnected()) {
for (const auto& producer : producers) {
if (producer->isStarted() && !producer->isConnected()) {
return false;
}
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

void createLazyPartitionProducer(const unsigned int partitionIndex);
void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
CloseCallback callback);

Expand Down Expand Up @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;

ProducerImplPtr newInternalProducer(unsigned int partition) const;
ProducerImplPtr newInternalProducer(unsigned int partition);

MessageRoutingPolicyPtr routerPolicy_;

Expand All @@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
bool assertState(const PartitionedProducerState state);
};

} // namespace pulsar
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}

ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
bool useLazyStartPartitionedProducers) {
impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
return *this;
}

bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
return impl_->useLazyStartPartitionedProducers;
}

ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
bool blockIfQueueFull{false};
bool batchingEnabled{true};
unsigned int batchingMaxMessages{1000};
Expand Down
Loading

0 comments on commit 0fa7215

Please sign in to comment.