Skip to content

Commit

Permalink
[fix][cpp] Support retry and apply operation timeout for lookup reque…
Browse files Browse the repository at this point in the history
…sts (#17410)

### Motivation

Currently the operation timeout only works for requests other than
lookup, like SEND and FLOW. However, the lookup requests, which are sent
by `LookupService`, should also apply the operation timeout, see

https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L1019-L1025

In addition, no attempts would be retried if lookup failed even due to a
retryable reason. For example, if some of all configured brokers were
down, the C++ client would fail immediately.

Therefore, this PR intends to retry lookup for some certain cases:
- The connection cannot be established, except connection timeout, which
  is controlled by the connection timeout.
- A `ServiceNotReady` error is received, except it's caused by
  `PulsarServerException`, e.g. the listener name is wrong.

Then, apply the operation timeout to avoid infinite retries.

### Modifications

- Add a `ResultRetryable` error code, which should only be used
  internally. Complete the futures with this error in the cases said
  previously.
- Add a `RetryableLookupService` implementation to support retries based
  on the backoff policy. Replace the directly usages of `LookupService`
  implementations with this class in `ClientImpl`.

The following tests are added:
- `ClientTest.testMultiBrokerUrl`: verify when multiple brokers are
  configured, even if one of them is not available, the creation of
  producer or consumer could still succeed.
- `LookupService.testRetry`: verify all lookup methods could be retried.
- `LookupService.testTimeout`: verify all lookup methods could be
  completed with `ResultTimeout` if no brokers are available.

### TODO

In future, we should add lookup timeout instead of operation timeout for
lookup requests and separate lookup connection pool, see PIP-91.

* Handle ResultRetryable in handleDisconnection

(cherry picked from commit 86ea31b)
  • Loading branch information
BewareMyPower committed Sep 15, 2022
1 parent d0fcade commit 590d91f
Show file tree
Hide file tree
Showing 15 changed files with 348 additions and 46 deletions.
3 changes: 2 additions & 1 deletion pulsar-client-cpp/include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace pulsar {
*/
enum Result
{
ResultOk, /// Operation successful
ResultRetryable = -1, /// An internal error code used for retry
ResultOk = 0, /// Operation successful

ResultUnknownError, /// Unknown error happened on broker

Expand Down
3 changes: 1 addition & 2 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
Future<Result, LookupDataResultPtr> future = promise->getFuture();
return;
}
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
Expand Down Expand Up @@ -160,7 +159,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultConnectError);
promise->setFailed(result);
return;
}

Expand Down
54 changes: 32 additions & 22 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024;
static const int KeepAliveIntervalInSeconds = 30;

// Convert error codes from protobuf to client API Result
static Result getResult(ServerError serverError) {
static Result getResult(ServerError serverError, const std::string& message) {
switch (serverError) {
case UnknownError:
return ResultUnknownError;
Expand All @@ -75,7 +75,9 @@ static Result getResult(ServerError serverError) {
return ResultConsumerBusy;

case ServiceNotReady:
return ResultServiceUnitNotReady;
// If the error is not caused by a PulsarServerException, treat it as retryable.
return (message.find("PulsarServerException") == std::string::npos) ? ResultRetryable
: ResultServiceUnitNotReady;

case ProducerBlockedQuotaExceededError:
return ResultProducerBlockedQuotaExceededError;
Expand Down Expand Up @@ -138,7 +140,7 @@ static Result getResult(ServerError serverError) {
}

inline std::ostream& operator<<(std::ostream& os, ServerError error) {
os << getResult(error);
os << getResult(error, "");
return os;
}

Expand Down Expand Up @@ -182,7 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
} catch (const boost::system::system_error& e) {
LOG_ERROR("Failed to initialize connection: " << e.what());
close();
close(ResultRetryable);
return;
}

Expand Down Expand Up @@ -368,7 +370,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
cnxString_ = cnxStringStream.str();
} catch (const boost::system::system_error& e) {
LOG_ERROR("Failed to get endpoint: " << e.what());
close();
close(ResultRetryable);
return;
}
if (logicalAddress_ == physicalAddress_) {
Expand Down Expand Up @@ -450,11 +452,16 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
} else {
close();
if (err == boost::asio::error::operation_aborted) {
// TCP connect timeout, which is not retryable
close();
} else {
close(ResultRetryable);
}
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close();
close(ResultRetryable);
}
}

Expand Down Expand Up @@ -913,7 +920,8 @@ void ClientConnection::handleIncomingCommand() {
<< " error: " << partitionMetadataResponse.error()
<< " msg: " << partitionMetadataResponse.message());
checkServerError(partitionMetadataResponse.error());
lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error()));
lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error(),
partitionMetadataResponse.message()));
} else {
LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
<< partitionMetadataResponse.request_id()
Expand Down Expand Up @@ -952,7 +960,8 @@ void ClientConnection::handleIncomingCommand() {
LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
<< consumerStatsResponse.error_message());
}
consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code()));
consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code(),
consumerStatsResponse.error_message()));
} else {
LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats "
"response from server. req_id: "
Expand Down Expand Up @@ -1000,7 +1009,8 @@ void ClientConnection::handleIncomingCommand() {
<< " error: " << lookupTopicResponse.error()
<< " msg: " << lookupTopicResponse.message());
checkServerError(lookupTopicResponse.error());
lookupDataPromise->setFailed(getResult(lookupTopicResponse.error()));
lookupDataPromise->setFailed(
getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
} else {
LOG_ERROR(cnxString_
<< "Failed lookup req_id: " << lookupTopicResponse.request_id()
Expand Down Expand Up @@ -1067,7 +1077,7 @@ void ClientConnection::handleIncomingCommand() {

case BaseCommand::ERROR: {
const CommandError& error = incomingCmd_.error();
Result result = getResult(error.error());
Result result = getResult(error.error(), error.message());
LOG_WARN(cnxString_ << "Received error response from server: " << result
<< (error.has_message() ? (" (" + error.message() + ")") : "")
<< " -- req_id: " << error.request_id());
Expand All @@ -1080,7 +1090,7 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();

requestData.promise.setFailed(getResult(error.error()));
requestData.promise.setFailed(result);
requestData.timer->cancel();
} else {
PendingGetLastMessageIdRequestsMap::iterator it =
Expand All @@ -1090,7 +1100,7 @@ void ClientConnection::handleIncomingCommand() {
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();

getLastMessageIdPromise.setFailed(getResult(error.error()));
getLastMessageIdPromise.setFailed(result);
} else {
PendingGetNamespaceTopicsMap::iterator it =
pendingGetNamespaceTopicsRequests_.find(error.request_id());
Expand All @@ -1099,7 +1109,7 @@ void ClientConnection::handleIncomingCommand() {
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();

getNamespaceTopicsPromise.setFailed(getResult(error.error()));
getNamespaceTopicsPromise.setFailed(result);
} else {
lock.unlock();
}
Expand Down Expand Up @@ -1557,34 +1567,34 @@ void ClientConnection::close(Result result) {
}

lock.unlock();
LOG_INFO(cnxString_ << "Connection closed");
LOG_INFO(cnxString_ << "Connection closed with " << result);

for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
}

for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
}

connectPromise_.setFailed(result);

// Fail all pending requests, all these type are map whose value type contains the Promise object
for (auto& kv : pendingRequests) {
kv.second.promise.setFailed(ResultConnectError);
kv.second.promise.setFailed(result);
}
for (auto& kv : pendingLookupRequests) {
kv.second.promise->setFailed(ResultConnectError);
kv.second.promise->setFailed(result);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
kv.second.setFailed(ResultConnectError);
kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
kv.second.setFailed(ResultConnectError);
kv.second.setFailed(result);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
kv.second.setFailed(ResultConnectError);
kv.second.setFailed(result);
}
}

Expand Down
14 changes: 10 additions & 4 deletions pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
#include <boost/algorithm/string/predicate.hpp>
#include <sstream>
#include <stdexcept>
#include <lib/BinaryProtoLookupService.h>
#include <lib/HTTPLookupService.h>
#include <lib/RetryableLookupService.h>
#include <lib/TopicName.h>
#include <algorithm>
#include <random>
Expand Down Expand Up @@ -106,17 +108,21 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
}
LogUtils::setLoggerFactory(std::move(loggerFactory));

LookupServicePtr underlyingLookupServicePtr;
if (serviceNameResolver_.useHttp()) {
LOG_DEBUG("Using HTTP Lookup");
lookupServicePtr_ = std::make_shared<HTTPLookupService>(std::ref(serviceNameResolver_),
std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
lookupServicePtr_ =
underlyingLookupServicePtr =
std::make_shared<BinaryProtoLookupService>(std::ref(serviceNameResolver_), std::ref(pool_),
std::cref(clientConfiguration_.getListenerName()));
}

lookupServicePtr_ = RetryableLookupService::create(
underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
}

ClientImpl::~ClientImpl() { shutdown(); }
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <pulsar/Client.h>
#include "ExecutorService.h"
#include "BinaryProtoLookupService.h"
#include "LookupService.h"
#include "MemoryLimitController.h"
#include "ConnectionPool.h"
#include "LookupDataResult.h"
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
}
break;
case CURLE_COULDNT_CONNECT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultRetryable;
break;
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
Expand Down
14 changes: 6 additions & 8 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con

handler->connection_.reset();

if (result == ResultRetryable) {
scheduleReconnection(handler);
return;
}

switch (state) {
case Pending:
case Ready:
Expand All @@ -121,14 +126,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
}
}

bool HandlerBase::isRetriableError(Result result) {
switch (result) {
case ResultServiceUnitNotReady:
return true;
default:
return false;
}
}
bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; }

void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
const auto state = handler->state_.load();
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/Result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ namespace pulsar {

const char* strResult(Result result) {
switch (result) {
case ResultRetryable:
return "Retryable";

case ResultOk:
return "Ok";

Expand Down
Loading

0 comments on commit 590d91f

Please sign in to comment.