From 34ca45713244d0defc39549dd43821784b2a5c1d Mon Sep 17 00:00:00 2001
From: Nik Bougalis
Date: Wed, 16 Feb 2022 22:09:56 -0800
Subject: [PATCH] Improve stop signaling for Application

---
Review notes (this section is not part of the commit message):

* signalStop() now acquires stoppingMutex_ before calling notify_all().
  Without the mutex, the store to isTimeToStop and the notification can
  both happen after run() has evaluated the wait predicate but before it
  blocks inside stoppingCondition_.wait(), so run() would never wake up.
  The diffstat and hunk headers account for the seven extra lines.
* The added isTimeToStop member is declared as std::atomic<bool>; the
  template argument was missing from the declaration.
* NOTE(review): `std::atomic checkSigs_;`, `dynamic_cast(` and the
  address-less From: header look like they also lost text that was in
  angle brackets. They are left as found - confirm against the tree.
* NOTE(review): the `index` blob ids below are those of the original
  patch and were not recomputed.

 src/ripple/app/main/Application.cpp | 213 +++++++++++++---------------
 src/ripple/rpc/handlers/Stop.cpp    |   2 -
 2 files changed, 101 insertions(+), 114 deletions(-)

diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index 933c4939118..be64a35cd85 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -225,9 +225,11 @@ class ApplicationImp : public Application, public BasicApp
 
     boost::asio::signal_set m_signals;
 
-    std::condition_variable cv_;
-    mutable std::mutex mut_;
-    bool isTimeToStop = false;
+    // Once we get C++20, we could use `std::atomic_flag` for `isTimeToStop`
+    // and eliminate the need for the condition variable and the mutex.
+    std::condition_variable stoppingCondition_;
+    mutable std::mutex stoppingMutex_;
+    std::atomic<bool> isTimeToStop = false;
 
     std::atomic checkSigs_;
 
@@ -970,100 +972,6 @@ class ApplicationImp : public Application, public BasicApp
         return true;
     }
 
-    //--------------------------------------------------------------------------
-
-    // Called to indicate shutdown.
-    void
-    stop()
-    {
-        JLOG(m_journal.debug()) << "Application stopping";
-
-        m_io_latency_sampler.cancel_async();
-
-        // VFALCO Enormous hack, we have to force the probe to cancel
-        //        before we stop the io_service queue or else it never
-        //        unblocks in its destructor. The fix is to make all
-        //        io_objects gracefully handle exit so that we can
-        //        naturally return from io_service::run() instead of
-        //        forcing a call to io_service::stop()
-        m_io_latency_sampler.cancel();
-
-        m_resolver->stop_async();
-
-        // NIKB This is a hack - we need to wait for the resolver to
-        //      stop. before we stop the io_server_queue or weird
-        //      things will happen.
-        m_resolver->stop();
-
-        {
-            boost::system::error_code ec;
-            sweepTimer_.cancel(ec);
-            if (ec)
-            {
-                JLOG(m_journal.error())
-                    << "Application: sweepTimer cancel error: " << ec.message();
-            }
-
-            ec.clear();
-            entropyTimer_.cancel(ec);
-            if (ec)
-            {
-                JLOG(m_journal.error())
-                    << "Application: entropyTimer cancel error: "
-                    << ec.message();
-            }
-        }
-        // Make sure that any waitHandlers pending in our timers are done
-        // before we declare ourselves stopped.
-        using namespace std::chrono_literals;
-        waitHandlerCounter_.join("Application", 1s, m_journal);
-
-        mValidations.flush();
-
-        validatorSites_->stop();
-
-        // TODO Store manifests in manifests.sqlite instead of wallet.db
-        validatorManifests_->save(
-            getWalletDB(),
-            "ValidatorManifests",
-            [this](PublicKey const& pubKey) {
-                return validators().listed(pubKey);
-            });
-
-        publisherManifests_->save(
-            getWalletDB(),
-            "PublisherManifests",
-            [this](PublicKey const& pubKey) {
-                return validators().trustedPublisher(pubKey);
-            });
-
-        // The order of these stop calls is delicate.
-        // Re-ordering them risks undefined behavior.
-        m_loadManager->stop();
-        m_shaMapStore->stop();
-        m_jobQueue->stop();
-        if (shardArchiveHandler_)
-            shardArchiveHandler_->stop();
-        if (overlay_)
-            overlay_->stop();
-        if (shardStore_)
-            shardStore_->stop();
-        grpcServer_->stop();
-        m_networkOPs->stop();
-        serverHandler_->stop();
-        m_ledgerReplayer->stop();
-        m_inboundTransactions->stop();
-        m_inboundLedgers->stop();
-        ledgerCleaner_->stop();
-        if (reportingETL_)
-            reportingETL_->stop();
-        if (auto pg = dynamic_cast(
-                &*mRelationalDBInterface))
-            pg->stop();
-        m_nodeStore->stop();
-        perfLog_->stop();
-    }
-
     //--------------------------------------------------------------------------
     //
     // PropertyStream
@@ -1636,27 +1544,109 @@ ApplicationImp::run()
     }
 
     {
-        std::unique_lock lk{mut_};
-        cv_.wait(lk, [this] { return isTimeToStop; });
+        std::unique_lock lk{stoppingMutex_};
+        stoppingCondition_.wait(lk, [this] { return isTimeToStop.load(); });
+    }
+
+    JLOG(m_journal.debug()) << "Application stopping";
+
+    m_io_latency_sampler.cancel_async();
+
+    // VFALCO Enormous hack, we have to force the probe to cancel
+    //        before we stop the io_service queue or else it never
+    //        unblocks in its destructor. The fix is to make all
+    //        io_objects gracefully handle exit so that we can
+    //        naturally return from io_service::run() instead of
+    //        forcing a call to io_service::stop()
+    m_io_latency_sampler.cancel();
+
+    m_resolver->stop_async();
+
+    // NIKB This is a hack - we need to wait for the resolver to
+    //      stop. before we stop the io_server_queue or weird
+    //      things will happen.
+    m_resolver->stop();
+
+    {
+        boost::system::error_code ec;
+        sweepTimer_.cancel(ec);
+        if (ec)
+        {
+            JLOG(m_journal.error())
+                << "Application: sweepTimer cancel error: " << ec.message();
+        }
+
+        ec.clear();
+        entropyTimer_.cancel(ec);
+        if (ec)
+        {
+            JLOG(m_journal.error())
+                << "Application: entropyTimer cancel error: " << ec.message();
+        }
     }
 
-    JLOG(m_journal.info()) << "Received shutdown request";
-    stop();
+    // Make sure that any waitHandlers pending in our timers are done
+    // before we declare ourselves stopped.
+    using namespace std::chrono_literals;
+
+    waitHandlerCounter_.join("Application", 1s, m_journal);
+
+    mValidations.flush();
+
+    validatorSites_->stop();
+
+    // TODO Store manifests in manifests.sqlite instead of wallet.db
+    validatorManifests_->save(
+        getWalletDB(), "ValidatorManifests", [this](PublicKey const& pubKey) {
+            return validators().listed(pubKey);
+        });
+
+    publisherManifests_->save(
+        getWalletDB(), "PublisherManifests", [this](PublicKey const& pubKey) {
+            return validators().trustedPublisher(pubKey);
+        });
+
+    // The order of these stop calls is delicate.
+    // Re-ordering them risks undefined behavior.
+    m_loadManager->stop();
+    m_shaMapStore->stop();
+    m_jobQueue->stop();
+    if (shardArchiveHandler_)
+        shardArchiveHandler_->stop();
+    if (overlay_)
+        overlay_->stop();
+    if (shardStore_)
+        shardStore_->stop();
+    grpcServer_->stop();
+    m_networkOPs->stop();
+    serverHandler_->stop();
+    m_ledgerReplayer->stop();
+    m_inboundTransactions->stop();
+    m_inboundLedgers->stop();
+    ledgerCleaner_->stop();
+    if (reportingETL_)
+        reportingETL_->stop();
+    if (auto pg = dynamic_cast(
+            &*mRelationalDBInterface))
+        pg->stop();
+    m_nodeStore->stop();
+    perfLog_->stop();
+
     JLOG(m_journal.info()) << "Done.";
 }
 
 void
 ApplicationImp::signalStop()
 {
-    // Unblock the main thread (which is sitting in run()).
-    // When we get C++20 this can use std::latch.
-    std::lock_guard lk{mut_};
-
-    if (!isTimeToStop)
-    {
-        isTimeToStop = true;
-        cv_.notify_all();
-    }
+    if (!isTimeToStop.exchange(true))
+    {
+        // The flag is atomic, but the wakeup must still be serialized with
+        // the waiter in run(). Taking the mutex here ensures the waiter is
+        // either not yet evaluating its predicate or already blocked in
+        // wait(), so the notification cannot fall in between and be lost.
+        std::lock_guard lk{stoppingMutex_};
+        stoppingCondition_.notify_all();
+    }
 }
 
 bool
@@ -1674,8 +1664,7 @@ ApplicationImp::checkSigs(bool check)
 bool
 ApplicationImp::isStopping() const
 {
-    std::lock_guard lk{mut_};
-    return isTimeToStop;
+    return isTimeToStop.load();
 }
 
 int
diff --git a/src/ripple/rpc/handlers/Stop.cpp b/src/ripple/rpc/handlers/Stop.cpp
index dc62ff2ebea..9467556969d 100644
--- a/src/ripple/rpc/handlers/Stop.cpp
+++ b/src/ripple/rpc/handlers/Stop.cpp
@@ -32,9 +32,7 @@ struct JsonContext;
 Json::Value
 doStop(RPC::JsonContext& context)
 {
-    std::unique_lock lock{context.app.getMasterMutex()};
     context.app.signalStop();
-
     return RPC::makeObjectValue(systemName() + " server stopping");
 }
 