Skip to content

Commit

Permalink
Allow partitioned producers to start lazily
Browse files Browse the repository at this point in the history
Avoids performing lookups and producer registration on all partitions of a partitioned
topic when using SinglePartition routing without keyed messages
  • Loading branch information
Jack Vanlightly committed Aug 6, 2021
1 parent ce9a6d2 commit d761f7b
Show file tree
Hide file tree
Showing 15 changed files with 343 additions and 47 deletions.
19 changes: 19 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,25 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
HashingScheme getHashingScheme() const;

/**
* This config affects producers of partitioned topics only. It controls whether
* producers register and connect immediately to the owner broker of each partition
* or start lazily on demand. Lazy starts occur when a message needs to be routed
* to a partition that the producer has not yet registered and connected to.
* Using this mode can reduce the strain on brokers for topics with large numbers of
* partitions and when the SinglePartition routing policy is used without keyed messages.
* Because producer registration and connection is on demand, this can produce extra
* latency while the registration is being carried out.
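* @param useLazyStartPartitionedProducers true to start partition producers lazily on demand,
* false to register and connect all of them immediately
* @return the ProducerConfiguration instance, so that calls can be chained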
*/
ProducerConfiguration& setLazyStartPartitionedProducers(bool);

/**
* The getter associated with setLazyStartPartitionedProducers()
*/
bool getLazyStartPartitionedProducers() const;

/**
* The setter associated with getBlockIfQueueFull()
*/
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
PULSAR_PUBLIC pulsar_hashing_scheme
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);

/**
 * Set whether the producers of a partitioned topic are started lazily.
 * The flag is passed as an int, the same convention used by the neighbouring
 * pulsar_producer_configuration_set_block_if_queue_full() setter.
 * NOTE(review): presumably any non-zero value enables lazy start - confirm against the implementation.
 */
PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);

/**
 * The getter associated with pulsar_producer_configuration_set_lazy_start_partitioned_producers().
 * NOTE(review): presumably returns non-zero when lazy start is enabled - confirm against the implementation.
 */
PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
pulsar_producer_configuration_t *conf, int blockIfQueueFull);

Expand Down
14 changes: 12 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
mutex_(),
creationTimestamp_(TimeUtils::now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
state_(NotStarted),
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()) {}

// Cancels the handler's deadline timer when the handler is destroyed.
HandlerBase::~HandlerBase() {
    timer_->cancel();
}

void HandlerBase::start() { grabCnx(); }
// Starts the handler at most once: only a handler that is still NotStarted is
// moved to Pending and asked to grab a connection. Any other state (for example
// a handler that was closed before it was ever started) makes this a no-op.
void HandlerBase::start() {
    Lock lock(mutex_);

    // guard against concurrent state changes such as closing
    if (state_ != NotStarted) {
        return;
    }

    state_ = Pending;
    // release the mutex before grabCnx(), which locks mutex_ itself
    lock.unlock();

    grabCnx();
}

void HandlerBase::grabCnx() {
Lock lock(mutex_);
Expand Down Expand Up @@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
scheduleReconnection(handler);
break;

case NotStarted:
case Closing:
case Closed:
case Failed:
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HandlerBase {

enum State
{
NotStarted,
Pending,
Ready,
Closing,
Expand Down
69 changes: 58 additions & 11 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}

ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));

if (conf_.getLazyStartPartitionedProducers()) {
createLazyPartitionProducer(partition);
} else {
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
}

LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
return producer;
Expand All @@ -108,8 +113,10 @@ void PartitionedProducerImpl::start() {
producers_.push_back(newInternalProducer(i));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
if (!conf_.getLazyStartPartitionedProducers()) {
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
}
}
}

Expand Down Expand Up @@ -147,6 +154,20 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
}
}

/**
 * Accounts for a partition producer that has been created but intentionally not
 * started (lazy start). The partition is only counted as created here. Once every
 * partition has been counted, the partitioned producer is marked Ready, the
 * partition update task is run if its timer exists, and the creation promise is
 * completed with this producer.
 *
 * @param partitionIndex zero-based index of the partition whose producer was created
 */
void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
    const auto numPartitions = getNumPartitions();
    assert(numProducersCreated_ <= numPartitions);
    // partition indexes are zero-based, so numPartitions itself is out of range
    assert(partitionIndex < numPartitions);
    numProducersCreated_++;
    if (numProducersCreated_ == numPartitions) {
        state_ = Ready;
        if (partitionsUpdateTimer_) {
            runPartitionUpdateTask();
        }
        // NOTE(review): callers invoke this from newInternalProducer() before the new
        // producer is appended to producers_ - confirm that completing the promise
        // here cannot expose an incomplete producer list to a listener.
        partitionedProducerCreatedPromise_.setValue(shared_from_this());
    }
}

// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
// get partition for this message from router policy
Expand All @@ -161,7 +182,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];

// if the producer is not started (lazy producer), then kick-off the start process
// but only if we're still in the Ready state (i.e. not closing or closed)
if (!producer->isStarted()) {
if (assertState(Ready)) {
producer->start();
} else {
callback(ResultAlreadyClosed, msg.getMessageId());
}
}

producersLock.unlock();

// send message on that partition
producer->sendAsync(msg, callback);
}
Expand All @@ -175,6 +208,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
lock.unlock();
}

// Despite its name this does not assert anything: it reports, while holding
// mutex_, whether the partitioned producer is currently in the given state.
bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
    Lock lock(mutex_);
    const bool inRequestedState = (state_ == state);
    return inRequestedState;
}

const std::string& PartitionedProducerImpl::getProducerName() const {
Lock producersLock(producersMutex_);
return producers_[0]->getProducerName();
Expand Down Expand Up @@ -285,7 +323,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
void PartitionedProducerImpl::triggerFlush() {
Lock producersLock(producersMutex_);
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->triggerFlush();
if ((*prod)->isStarted()) {
(*prod)->triggerFlush();
}
}
}

Expand Down Expand Up @@ -322,7 +362,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
};

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->flushAsync(subFlushCallback);
if ((*prod)->isStarted()) {
(*prod)->flushAsync(subFlushCallback);
} else {
subFlushCallback(ResultOk);
}
}
}

Expand Down Expand Up @@ -356,7 +400,10 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,

for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
auto producer = newInternalProducer(i);
producer->start();

if (!conf_.getLazyStartPartitionedProducers()) {
producer->start();
}
producers_.push_back(producer);
}
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`, or in `createLazyPartitionProducer()` when lazy start is enabled
Expand All @@ -379,8 +426,8 @@ bool PartitionedProducerImpl::isConnected() const {
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers_) {
if (!producer->isConnected()) {
for (const auto& producer : producers) {
if (producer->isStarted() && !producer->isConnected()) {
return false;
}
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

void createLazyPartitionProducer(const unsigned int partitionIndex);
void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
CloseCallback callback);

Expand Down Expand Up @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;

ProducerImplPtr newInternalProducer(unsigned int partition) const;
ProducerImplPtr newInternalProducer(unsigned int partition);

MessageRoutingPolicyPtr routerPolicy_;

Expand All @@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
bool assertState(const PartitionedProducerState state);
};

} // namespace pulsar
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}

// Records whether partition producers should be started lazily and returns
// this configuration so that setter calls can be chained.
ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
    bool useLazyStartPartitionedProducers) {
    auto& settings = *impl_;
    settings.useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
    return *this;
}

// Returns the lazy-start flag stored by setLazyStartPartitionedProducers().
bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
    const auto& settings = *impl_;
    return settings.useLazyStartPartitionedProducers;
}

ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
bool blockIfQueueFull{false};
bool batchingEnabled{true};
unsigned int batchingMaxMessages{1000};
Expand Down
48 changes: 41 additions & 7 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ ProducerImpl::~ProducerImpl() {
LOG_DEBUG(getName() << "~ProducerImpl");
cancelTimers();
printStats();
if (state_ == Ready) {
if (state_ == Ready || state_ == Pending) {
LOG_WARN(getName() << "Destroyed producer which was not properly closed");
}
}
Expand Down Expand Up @@ -169,6 +169,14 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
const ResponseData& responseData) {
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));

// make sure we're still in the Pending state, closeAsync could have been invoked
// while waiting for this response if using lazy producers
if (!assertState(Pending)) {
LOG_DEBUG("Producer created response received but producer already closed");
failPendingMessages(ResultAlreadyClosed);
return;
}

if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
Expand Down Expand Up @@ -389,7 +397,8 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
}

Lock lock(mutex_);
if (state_ != Ready) {
// producers may be lazily starting and be in the pending state
if (state_ != Ready && state_ != Pending) {
lock.unlock();
releaseSemaphore(payloadSize);
cb(ResultAlreadyClosed, msg.getMessageId());
Expand Down Expand Up @@ -550,10 +559,14 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
return;
}
LOG_DEBUG(getName() << " - Batch Message Timer expired");
Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();

// ignore if the producer is already closing/closed
if (assertState(Ready)) {
Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
}
}

void ProducerImpl::printStats() {
Expand All @@ -568,12 +581,19 @@ void ProducerImpl::printStats() {
void ProducerImpl::closeAsync(CloseCallback callback) {
Lock lock(mutex_);

// if the producer was never started then there is nothing to clean up
if (state_ == NotStarted) {
state_ = Closed;
callback(ResultOk);
return;
}

// Keep a reference to ensure object is kept alive
ProducerImplPtr ptr = shared_from_this();

cancelTimers();

if (state_ != Ready) {
if (state_ != Ready && state_ != Pending) {
lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
Expand Down Expand Up @@ -631,6 +651,10 @@ void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerI
} else {
LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
}

// ensure any remaining send callbacks are called before calling the close callback
failPendingMessages(ResultAlreadyClosed);

if (callback) {
callback(result);
}
Expand Down Expand Up @@ -828,5 +852,15 @@ bool ProducerImpl::isConnected() const {

// A single (non-partitioned) producer counts as one connected producer when it
// is connected, and as zero otherwise.
uint64_t ProducerImpl::getNumberOfConnectedProducer() {
    if (isConnected()) {
        return 1;
    }
    return 0;
}

// Reports, while holding mutex_, whether this producer has left the NotStarted
// state. Every other state counts as started.
bool ProducerImpl::isStarted() const {
    Lock lock(mutex_);
    const bool neverStarted = (state_ == NotStarted);
    return !neverStarted;
}

// Despite its name this does not assert anything: it reports, while holding
// mutex_, whether the producer is currently in the given state.
bool ProducerImpl::assertState(const State state) {
    Lock lock(mutex_);
    const bool inRequestedState = (state_ == state);
    return inRequestedState;
}

} // namespace pulsar
/* namespace pulsar */
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ProducerImpl : public HandlerBase,
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedProducer() override;
bool isStarted() const;

bool removeCorruptMessage(uint64_t sequenceId);

Expand Down Expand Up @@ -166,6 +167,8 @@ class ProducerImpl : public HandlerBase,
uint32_t dataKeyGenIntervalSec_;

MemoryLimitController& memoryLimitController_;

bool assertState(const State state);
};

struct ProducerImplCmp {
Expand Down
Loading

0 comments on commit d761f7b

Please sign in to comment.