From 590d91f4a70a67415b0ccd232c3294fcec80af5a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 15 Sep 2022 14:38:02 +0800 Subject: [PATCH] [fix][cpp] Support retry and apply operation timeout for lookup requests (#17410) ### Motivation Currently the operation timeout only works for requests other than lookup, like SEND and FLOW. However, the lookup requests, which are sent by `LookupService`, should also apply the operation timeout, see https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L1019-L1025 In addition, no attempts would be retried if lookup failed even due to a retryable reason. For example, if some of all configured brokers were down, the C++ client would fail immediately. Therefore, this PR intends to retry lookup for some certain cases: - The connection cannot be established, except connection timeout, which is controlled by the connection timeout. - A `ServiceNotReady` error is received, except it's caused by `PulsarServerException`, e.g. the listener name is wrong. Then, apply the operation timeout to avoid infinite retries. ### Modifications - Add a `ResultRetryable` error code, which should only be used internally. Complete the futures with this error in the cases said previously. - Add a `RetryableLookupService` implementation to support retries based on the backoff policy. Replace the directly usages of `LookupService` implementations with this class in `ClientImpl`. The following tests are added: - `ClientTest.testMultiBrokerUrl`: verify when multiple brokers are configured, even if one of them is not available, the creation of producer or consumer could still succeed. - `LookupService.testRetry`: verify all lookup methods could be retried. - `LookupService.testTimeout`: verify all lookup methods could be completed with `ResultTimeout` if no brokers are available. ### TODO In future, we should add lookup timeout instead of operation timeout for lookup requests and separate lookup connection pool, see PIP-91. * Handle ResultRetryable in handleDisconnection (cherry picked from commit 86ea31b2409a7b62624278dc133d443ca62ffb76) --- pulsar-client-cpp/include/pulsar/Result.h | 3 +- .../lib/BinaryProtoLookupService.cc | 3 +- pulsar-client-cpp/lib/ClientConnection.cc | 54 ++++--- pulsar-client-cpp/lib/ClientImpl.cc | 14 +- pulsar-client-cpp/lib/ClientImpl.h | 2 +- pulsar-client-cpp/lib/HTTPLookupService.cc | 3 + pulsar-client-cpp/lib/HandlerBase.cc | 14 +- pulsar-client-cpp/lib/Result.cc | 3 + .../lib/RetryableLookupService.h | 151 ++++++++++++++++++ pulsar-client-cpp/lib/ServiceNameResolver.h | 2 + pulsar-client-cpp/lib/SynchronizedHashMap.h | 8 +- pulsar-client-cpp/lib/Utils.h | 2 + pulsar-client-cpp/tests/ClientTest.cc | 29 +++- pulsar-client-cpp/tests/LookupServiceTest.cc | 95 ++++++++++- pulsar-client-cpp/tests/PulsarFriend.h | 11 ++ 15 files changed, 348 insertions(+), 46 deletions(-) create mode 100644 pulsar-client-cpp/lib/RetryableLookupService.h diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h index a7fdd04f99aa3..cc7b457528e1f 100644 --- a/pulsar-client-cpp/include/pulsar/Result.h +++ b/pulsar-client-cpp/include/pulsar/Result.h @@ -29,7 +29,8 @@ namespace pulsar { */ enum Result { - ResultOk, /// Operation successful + ResultRetryable = -1, /// An internal error code used for retry + ResultOk = 0, /// Operation successful ResultUnknownError, /// Unknown error happened on broker diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index bff29237c7164..ff42b91b3f748 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -111,7 +111,6 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str LookupDataResultPromisePtr promise) { if (result != ResultOk) { promise->setFailed(result); - Future future = promise->getFuture(); return; } LookupDataResultPromisePtr lookupPromise = std::make_shared(); @@ -160,7 +159,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise) { if (result != ResultOk) { - promise->setFailed(ResultConnectError); + promise->setFailed(result); return; } diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index e6cab07cb9b08..45733248c1e76 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -51,7 +51,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024; static const int KeepAliveIntervalInSeconds = 30; // Convert error codes from protobuf to client API Result -static Result getResult(ServerError serverError) { +static Result getResult(ServerError serverError, const std::string& message) { switch (serverError) { case UnknownError: return ResultUnknownError; @@ -75,7 +75,9 @@ static Result getResult(ServerError serverError) { return ResultConsumerBusy; case ServiceNotReady: - return ResultServiceUnitNotReady; + // If the error is not caused by a PulsarServerException, treat it as retryable. + return (message.find("PulsarServerException") == std::string::npos) ? ResultRetryable + : ResultServiceUnitNotReady; case ProducerBlockedQuotaExceededError: return ResultProducerBlockedQuotaExceededError; @@ -138,7 +140,7 @@ static Result getResult(ServerError serverError) { } inline std::ostream& operator<<(std::ostream& os, ServerError error) { - os << getResult(error); + os << getResult(error, ""); return os; } @@ -182,7 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: consumerStatsRequestTimer_ = executor_->createDeadlineTimer(); } catch (const boost::system::system_error& e) { LOG_ERROR("Failed to initialize connection: " << e.what()); - close(); + close(ResultRetryable); return; } @@ -368,7 +370,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, cnxString_ = cnxStringStream.str(); } catch (const boost::system::system_error& e) { LOG_ERROR("Failed to get endpoint: " << e.what()); - close(); + close(ResultRetryable); return; } if (logicalAddress_ == physicalAddress_) { @@ -450,11 +452,16 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(), std::placeholders::_1, ++endpointIterator)); } else { - close(); + if (err == boost::asio::error::operation_aborted) { + // TCP connect timeout, which is not retryable + close(); + } else { + close(ResultRetryable); + } } } else { LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); - close(); + close(ResultRetryable); } } @@ -913,7 +920,8 @@ void ClientConnection::handleIncomingCommand() { << " error: " << partitionMetadataResponse.error() << " msg: " << partitionMetadataResponse.message()); checkServerError(partitionMetadataResponse.error()); - lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error())); + lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error(), + partitionMetadataResponse.message())); } else { LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() @@ -952,7 +960,8 @@ void ClientConnection::handleIncomingCommand() { LOG_ERROR(cnxString_ << " Failed to get consumer stats - " << consumerStatsResponse.error_message()); } - consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code())); + consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code(), + consumerStatsResponse.error_message())); } else { LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats " "response from server. req_id: " @@ -1000,7 +1009,8 @@ void ClientConnection::handleIncomingCommand() { << " error: " << lookupTopicResponse.error() << " msg: " << lookupTopicResponse.message()); checkServerError(lookupTopicResponse.error()); - lookupDataPromise->setFailed(getResult(lookupTopicResponse.error())); + lookupDataPromise->setFailed( + getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); } else { LOG_ERROR(cnxString_ << "Failed lookup req_id: " << lookupTopicResponse.request_id() @@ -1067,7 +1077,7 @@ void ClientConnection::handleIncomingCommand() { case BaseCommand::ERROR: { const CommandError& error = incomingCmd_.error(); - Result result = getResult(error.error()); + Result result = getResult(error.error(), error.message()); LOG_WARN(cnxString_ << "Received error response from server: " << result << (error.has_message() ? (" (" + error.message() + ")") : "") << " -- req_id: " << error.request_id()); @@ -1080,7 +1090,7 @@ void ClientConnection::handleIncomingCommand() { pendingRequests_.erase(it); lock.unlock(); - requestData.promise.setFailed(getResult(error.error())); + requestData.promise.setFailed(result); requestData.timer->cancel(); } else { PendingGetLastMessageIdRequestsMap::iterator it = @@ -1090,7 +1100,7 @@ void ClientConnection::handleIncomingCommand() { pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - getLastMessageIdPromise.setFailed(getResult(error.error())); + getLastMessageIdPromise.setFailed(result); } else { PendingGetNamespaceTopicsMap::iterator it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); @@ -1099,7 +1109,7 @@ void ClientConnection::handleIncomingCommand() { pendingGetNamespaceTopicsRequests_.erase(it); lock.unlock(); - getNamespaceTopicsPromise.setFailed(getResult(error.error())); + getNamespaceTopicsPromise.setFailed(result); } else { lock.unlock(); } @@ -1557,34 +1567,34 @@ void ClientConnection::close(Result result) { } lock.unlock(); - LOG_INFO(cnxString_ << "Connection closed"); + LOG_INFO(cnxString_ << "Connection closed with " << result); for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) { - HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second); + HandlerBase::handleDisconnection(result, shared_from_this(), it->second); } for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) { - HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second); + HandlerBase::handleDisconnection(result, shared_from_this(), it->second); } connectPromise_.setFailed(result); // Fail all pending requests, all these type are map whose value type contains the Promise object for (auto& kv : pendingRequests) { - kv.second.promise.setFailed(ResultConnectError); + kv.second.promise.setFailed(result); } for (auto& kv : pendingLookupRequests) { - kv.second.promise->setFailed(ResultConnectError); + kv.second.promise->setFailed(result); } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later"); - kv.second.setFailed(ResultConnectError); + kv.second.setFailed(result); } for (auto& kv : pendingGetLastMessageIdRequests) { - kv.second.setFailed(ResultConnectError); + kv.second.setFailed(result); } for (auto& kv : pendingGetNamespaceTopicsRequests) { - kv.second.setFailed(ResultConnectError); + kv.second.setFailed(result); } } diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 2c983421e67c0..08adb1d6423ea 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -30,7 +30,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -106,17 +108,21 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& } LogUtils::setLoggerFactory(std::move(loggerFactory)); + LookupServicePtr underlyingLookupServicePtr; if (serviceNameResolver_.useHttp()) { LOG_DEBUG("Using HTTP Lookup"); - lookupServicePtr_ = std::make_shared(std::ref(serviceNameResolver_), - std::cref(clientConfiguration_), - std::cref(clientConfiguration_.getAuthPtr())); + underlyingLookupServicePtr = std::make_shared( + std::ref(serviceNameResolver_), std::cref(clientConfiguration_), + std::cref(clientConfiguration_.getAuthPtr())); } else { LOG_DEBUG("Using Binary Lookup"); - lookupServicePtr_ = + underlyingLookupServicePtr = std::make_shared(std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_.getListenerName())); } + + lookupServicePtr_ = RetryableLookupService::create( + underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_); } ClientImpl::~ClientImpl() { shutdown(); } diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 1f046e2427c04..466461ae71ea0 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -21,7 +21,7 @@ #include #include "ExecutorService.h" -#include "BinaryProtoLookupService.h" +#include "LookupService.h" #include "MemoryLimitController.h" #include "ConnectionPool.h" #include "LookupDataResult.h" diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index 5b98663239201..61392666a93fa 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -265,6 +265,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & } break; case CURLE_COULDNT_CONNECT: + LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res); + retResult = ResultRetryable; + break; case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_RESOLVE_HOST: case CURLE_HTTP_RETURNED_ERROR: diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index 5d2244f7552df..506207ea13289 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -105,6 +105,11 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con handler->connection_.reset(); + if (result == ResultRetryable) { + scheduleReconnection(handler); + return; + } + switch (state) { case Pending: case Ready: @@ -121,14 +126,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con } } -bool HandlerBase::isRetriableError(Result result) { - switch (result) { - case ResultServiceUnitNotReady: - return true; - default: - return false; - } -} +bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; } void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { const auto state = handler->state_.load(); diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc index d3322aa380557..6682341b2a396 100644 --- a/pulsar-client-cpp/lib/Result.cc +++ b/pulsar-client-cpp/lib/Result.cc @@ -25,6 +25,9 @@ namespace pulsar { const char* strResult(Result result) { switch (result) { + case ResultRetryable: + return "Retryable"; + case ResultOk: return "Ok"; diff --git a/pulsar-client-cpp/lib/RetryableLookupService.h b/pulsar-client-cpp/lib/RetryableLookupService.h new file mode 100644 index 0000000000000..a8f7bfcec3b3c --- /dev/null +++ b/pulsar-client-cpp/lib/RetryableLookupService.h @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include +#include "lib/Backoff.h" +#include "lib/ExecutorService.h" +#include "lib/LookupService.h" +#include "lib/SynchronizedHashMap.h" +#include "lib/LogUtils.h" + +namespace pulsar { + +class RetryableLookupService : public LookupService, + public std::enable_shared_from_this { + private: + friend class PulsarFriend; + struct PassKey { + explicit PassKey() {} + }; + + public: + template + explicit RetryableLookupService(PassKey, Args&&... args) + : RetryableLookupService(std::forward(args)...) {} + + template + static std::shared_ptr create(Args&&... args) { + return std::make_shared(PassKey{}, std::forward(args)...); + } + + LookupResultFuture getBroker(const TopicName& topicName) override { + return executeAsync("get-broker-" + topicName.toString(), + [this, topicName] { return lookupService_->getBroker(topicName); }); + } + + Future getPartitionMetadataAsync(const TopicNamePtr& topicName) override { + return executeAsync( + "get-partition-metadata-" + topicName->toString(), + [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); }); + } + + Future getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override { + return executeAsync( + "get-topics-of-namespace-" + nsName->toString(), + [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); }); + } + + template + Future executeAsync(const std::string& key, std::function()> f) { + Promise promise; + executeAsyncImpl(key, f, promise, timeout_); + return promise.getFuture(); + } + + private: + const std::shared_ptr lookupService_; + const TimeDuration timeout_; + Backoff backoff_; + const ExecutorServiceProviderPtr executorProvider_; + + using Timer = boost::asio::deadline_timer; + using TimerPtr = std::unique_ptr; + SynchronizedHashMap backoffTimers_; + + RetryableLookupService(std::shared_ptr lookupService, int timeoutSeconds, + ExecutorServiceProviderPtr executorProvider) + : lookupService_(lookupService), + timeout_(boost::posix_time::seconds(timeoutSeconds)), + backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_, + boost::posix_time::milliseconds(0)), + executorProvider_(executorProvider) {} + + std::weak_ptr weak_from_this() noexcept { return shared_from_this(); } + + // NOTE: Set the visibility to fix compilation error in GCC 6 + template +#ifndef _WIN32 + __attribute__((visibility("hidden"))) +#endif + void + executeAsyncImpl(const std::string& key, std::function()> f, Promise promise, + TimeDuration remainingTime) { + auto weakSelf = weak_from_this(); + f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + + if (result == ResultOk) { + backoffTimers_.remove(key); + promise.setValue(value); + } else if (result == ResultRetryable) { + if (remainingTime.total_milliseconds() <= 0) { + backoffTimers_.remove(key); + promise.setFailed(ResultTimeout); + return; + } + + auto it = backoffTimers_.emplace( + key, TimerPtr{new Timer(executorProvider_->get()->getIOService())}); + auto& timer = *(it.first->second); + auto delay = std::min(backoff_.next(), remainingTime); + timer.expires_from_now(delay); + + auto nextRemainingTime = remainingTime - delay; + LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds() + << " ms, remaining time: " << nextRemainingTime.total_milliseconds() + << " ms"); + timer.async_wait([this, weakSelf, key, f, promise, + nextRemainingTime](const boost::system::error_code& ec) { + auto self = weakSelf.lock(); + if (!self || ec) { + if (self && ec != boost::asio::error::operation_aborted) { + LOG_ERROR("The timer for " << key << " failed: " << ec.message()); + } + // The lookup service has been destructed or the timer has been cancelled + promise.setFailed(ResultTimeout); + return; + } + executeAsyncImpl(key, f, promise, nextRemainingTime); + }); + } else { + backoffTimers_.remove(key); + promise.setFailed(result); + } + }); + } + + DECLARE_LOG_OBJECT() +}; + +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/ServiceNameResolver.h b/pulsar-client-cpp/lib/ServiceNameResolver.h index 60351d8037fa3..cf7a5832697cb 100644 --- a/pulsar-client-cpp/lib/ServiceNameResolver.h +++ b/pulsar-client-cpp/lib/ServiceNameResolver.h @@ -52,6 +52,8 @@ class ServiceNameResolver { const ServiceURI serviceUri_; const size_t numAddresses_; std::atomic_size_t index_{0}; + + friend class PulsarFriend; }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h index 184ca6a283623..831d1e83bbd6e 100644 --- a/pulsar-client-cpp/lib/SynchronizedHashMap.h +++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h @@ -36,6 +36,8 @@ class SynchronizedHashMap { public: using OptValue = Optional; using PairVector = std::vector>; + using MapType = std::unordered_map; + using Iterator = typename MapType::iterator; SynchronizedHashMap() = default; @@ -46,9 +48,9 @@ class SynchronizedHashMap { } template - void emplace(Args&&... args) { + std::pair emplace(Args&&... args) { Lock lock(mutex_); - data_.emplace(std::forward(args)...); + return data_.emplace(std::forward(args)...); } void forEach(std::function f) const { @@ -105,7 +107,7 @@ class SynchronizedHashMap { Lock lock(mutex_); auto it = data_.find(key); if (it != data_.end()) { - auto result = OptValue::of(it->second); + auto result = OptValue::of(std::move(it->second)); data_.erase(it); return result; } else { diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h index e662ecf08f4cc..b0f500ef0acab 100644 --- a/pulsar-client-cpp/lib/Utils.h +++ b/pulsar-client-cpp/lib/Utils.h @@ -85,6 +85,7 @@ class Optional { * Create an Optional with the bound value */ static Optional of(const T& value) { return Optional(value); } + static Optional of(T&& value) { return Optional(std::move(value)); } /** * Create an empty optional @@ -95,6 +96,7 @@ class Optional { private: Optional(const T& value) : value_(value), present_(true) {} + Optional(T&& value) : value_(std::move(value)), present_(true) {} T value_; bool present_; diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 58c889f074a92..9270a61afd42e 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -97,14 +97,14 @@ TEST(ClientTest, testSwHwChecksum) { TEST(ClientTest, testServerConnectError) { const std::string topic = "test-server-connect-error"; - Client client("pulsar://localhost:65535"); + Client client("pulsar://localhost:65535", ClientConfiguration().setOperationTimeoutSeconds(1)); Producer producer; - ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer)); + ASSERT_EQ(ResultTimeout, client.createProducer(topic, producer)); Consumer consumer; - ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer)); + ASSERT_EQ(ResultTimeout, client.subscribe(topic, "sub", consumer)); Reader reader; ReaderConfiguration readerConf; - ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(ResultTimeout, client.createReader(topic, MessageId::earliest(), readerConf, reader)); client.close(); } @@ -133,6 +133,9 @@ TEST(ClientTest, testConnectTimeout) { clientLow.close(); clientDefault.close(); + + ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), std::future_status::ready); + ASSERT_EQ(futureDefault.get(), ResultConnectError); } TEST(ClientTest, testGetNumberOfReferences) { @@ -281,3 +284,21 @@ TEST(ClientTest, testWrongListener) { ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); ASSERT_EQ(ResultOk, client.close()); } + +TEST(ClientTest, testMultiBrokerUrl) { + const std::string topic = "client-test-multi-broker-url-" + std::to_string(time(nullptr)); + Client client("pulsar://localhost:6000,localhost"); // the 1st address is not reachable + + Producer producer; + PulsarFriend::setServiceUrlIndex(client, 0); + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + + Consumer consumer; + PulsarFriend::setServiceUrlIndex(client, 0); + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Reader reader; + PulsarFriend::setServiceUrlIndex(client, 0); + ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + client.close(); +} diff --git a/pulsar-client-cpp/tests/LookupServiceTest.cc b/pulsar-client-cpp/tests/LookupServiceTest.cc index 14c5695a82866..77c1e1aaef234 100644 --- a/pulsar-client-cpp/tests/LookupServiceTest.cc +++ b/pulsar-client-cpp/tests/LookupServiceTest.cc @@ -28,6 +28,8 @@ #include #include #include "LogUtils.h" +#include "RetryableLookupService.h" +#include "PulsarFriend.h" #include @@ -135,7 +137,7 @@ static void testMultiAddresses(LookupService& lookupService) { auto verifySuccessCount = [&results] { // Only half of them succeeded ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2); - ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultConnectError), numRequests / 2); + ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultRetryable), numRequests / 2); }; for (int i = 0; i < numRequests; i++) { @@ -179,3 +181,94 @@ TEST(LookupServiceTest, testMultiAddresses) { std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled()); testMultiAddresses(*httpLookupServicePtr); } +TEST(LookupServiceTest, testRetry) { + auto executorProvider = std::make_shared(1); + ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true); + ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost"); + + auto lookupService = RetryableLookupService::create( + std::make_shared(serviceNameResolver, pool, ""), 30 /* seconds */, + executorProvider); + + PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); + auto topicNamePtr = TopicName::get("lookup-service-test-retry"); + auto future1 = lookupService->getBroker(*topicNamePtr); + LookupService::LookupResult lookupResult; + ASSERT_EQ(ResultOk, future1.get(lookupResult)); + LOG_INFO("getBroker returns logicalAddress: " << lookupResult.logicalAddress + << ", physicalAddress: " << lookupResult.physicalAddress); + + PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); + auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr); + LookupDataResultPtr lookupDataResultPtr; + ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr)); + LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions"); + + PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); + auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName()); + NamespaceTopicsPtr namespaceTopicsPtr; + ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr)); + LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics"); + + std::atomic_int retryCount{0}; + constexpr int totalRetryCount = 3; + auto future4 = lookupService->executeAsync("key", [&retryCount]() -> Future { + Promise promise; + if (++retryCount < totalRetryCount) { + LOG_INFO("Retry count: " << retryCount); + promise.setFailed(ResultRetryable); + } else { + LOG_INFO("Retry done with " << retryCount << " times"); + promise.setValue(100); + } + return promise.getFuture(); + }); + int customResult = 0; + ASSERT_EQ(ResultOk, future4.get(customResult)); + ASSERT_EQ(customResult, 100); + ASSERT_EQ(retryCount.load(), totalRetryCount); + + ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); +} + +TEST(LookupServiceTest, testTimeout) { + auto executorProvider = std::make_shared(1); + ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true); + ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904"); + + constexpr int timeoutInSeconds = 2; + auto lookupService = RetryableLookupService::create( + std::make_shared(serviceNameResolver, pool, ""), timeoutInSeconds, + executorProvider); + auto topicNamePtr = TopicName::get("lookup-service-test-retry"); + + decltype(std::chrono::high_resolution_clock::now()) startTime; + auto beforeMethod = [&startTime] { startTime = std::chrono::high_resolution_clock::now(); }; + auto afterMethod = [&startTime](const std::string& name) { + auto timeInterval = std::chrono::duration_cast( + std::chrono::high_resolution_clock::now() - startTime) + .count(); + LOG_INFO(name << " took " << timeInterval << " seconds"); + ASSERT_TRUE(timeInterval >= timeoutInSeconds * 1000L); + }; + + beforeMethod(); + auto future1 = lookupService->getBroker(*topicNamePtr); + LookupService::LookupResult lookupResult; + ASSERT_EQ(ResultTimeout, future1.get(lookupResult)); + afterMethod("getBroker"); + + beforeMethod(); + auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr); + LookupDataResultPtr lookupDataResultPtr; + ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr)); + afterMethod("getPartitionMetadataAsync"); + + beforeMethod(); + auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName()); + NamespaceTopicsPtr namespaceTopicsPtr; + ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr)); + afterMethod("getTopicsOfNamespaceAsync"); + + ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); +} diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index b6fb219eabb72..d9f9923c7ce2d 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -25,6 +25,7 @@ #include "lib/ConsumerImpl.h" #include "lib/MultiTopicsConsumerImpl.h" #include "lib/ReaderImpl.h" +#include "lib/RetryableLookupService.h" using std::string; @@ -118,5 +119,15 @@ class PulsarFriend { static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) { return backoff.firstBackoffTime_; } + + static void setServiceUrlIndex(ServiceNameResolver& resolver, size_t index) { resolver.index_ = index; } + + static void setServiceUrlIndex(const Client& client, size_t index) { + setServiceUrlIndex(client.impl_->serviceNameResolver_, index); + } + + static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) { + return lookupService.backoffTimers_.size(); + } }; } // namespace pulsar