From 774148389467781aca7c01bac90af2fba870570c Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Sun, 25 Aug 2024 19:10:07 -0700 Subject: [PATCH] Allow only 1 job queue slot for acquiring inbound ledger. * Log when duplicate concurrent inbound ledger are filtered. * RAII for containers that track concurrent inbound ledger. * Comment on when to asynchronously acquire inbound ledgers, which is possible to be always OK, but should have further review. * Other small logging changes Co-authored-by: Ed Hennis --- src/ripple/app/consensus/RCLConsensus.cpp | 8 +++-- src/ripple/app/consensus/RCLValidations.cpp | 6 ++-- src/ripple/app/ledger/InboundLedgers.h | 14 ++++++-- src/ripple/app/ledger/impl/InboundLedger.cpp | 2 +- src/ripple/app/ledger/impl/InboundLedgers.cpp | 35 +++++++++++++++++++ src/ripple/app/misc/NetworkOPs.cpp | 3 +- src/test/app/LedgerReplay_test.cpp | 8 +++++ 7 files changed, 68 insertions(+), 8 deletions(-) diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index c0ebe06dd7e..0335e9979d2 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -135,8 +135,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash) acquiringLedger_ = hash; app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() { - app.getInboundLedgers().acquire( + jtADVANCE, + "getConsensusLedger1", + [id = hash, &app = app_, this]() { + JLOG(j_.debug()) + << "JOB advanceLedger getConsensusLedger1 started"; + app.getInboundLedgers().acquireAsync( id, 0, InboundLedger::Reason::CONSENSUS); }); } diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp index 6b626569e69..9ffd8829ccc 100644 --- a/src/ripple/app/consensus/RCLValidations.cpp +++ b/src/ripple/app/consensus/RCLValidations.cpp @@ -142,8 +142,10 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash) Application* pApp = &app_; app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [pApp, hash]() { - pApp->getInboundLedgers().acquire( + jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() { + JLOG(j_.debug()) + << "JOB advanceLedger getConsensusLedger2 started"; + pApp->getInboundLedgers().acquireAsync( hash, 0, InboundLedger::Reason::CONSENSUS); }); return std::nullopt; diff --git a/src/ripple/app/ledger/InboundLedgers.h b/src/ripple/app/ledger/InboundLedgers.h index b12760153e2..aa173925893 100644 --- a/src/ripple/app/ledger/InboundLedgers.h +++ b/src/ripple/app/ledger/InboundLedgers.h @@ -23,6 +23,7 @@ #include #include #include +#include namespace ripple { @@ -37,11 +38,20 @@ class InboundLedgers virtual ~InboundLedgers() = default; - // VFALCO TODO Should this be called findOrAdd ? - // + // Callers should use this if they possibly need an authoritative + // response immediately. virtual std::shared_ptr acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason) = 0; + // Callers should use this if they are known to be executing on the Job + // Queue. TODO review whether all callers of acquire() can use this + // instead. Inbound ledger acquisition is asynchronous anyway. + virtual void + acquireAsync( + uint256 const& hash, + std::uint32_t seq, + InboundLedger::Reason reason) = 0; + virtual std::shared_ptr find(LedgerHash const& hash) = 0; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 53475988cbf..c8dc005097b 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -536,7 +536,7 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) return; } - if (auto stream = journal_.trace()) + if (auto stream = journal_.debug()) { stream << "Trigger acquiring ledger " << hash_; if (peer) diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index b9b8b9fcfd2..f88137e8501 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -149,6 +150,37 @@ class InboundLedgersImp : public InboundLedgers return ledger; } + void + acquireAsync( + uint256 const& hash, + std::uint32_t seq, + InboundLedger::Reason reason) override + { + std::unique_lock lock(acquiresMutex_); + try + { + if (pendingAcquires_.contains(hash)) + return; + pendingAcquires_.insert(hash); + lock.unlock(); + acquire(hash, seq, reason); + } + catch (std::exception const& e) + { + JLOG(j_.warn()) + << "Exception thrown for acquiring new inbound ledger " << hash + << ": " << e.what(); + } + catch (...) + { + JLOG(j_.warn()) + << "Unknown exception thrown for acquiring new inbound ledger " + << hash; + } + lock.lock(); + pendingAcquires_.erase(hash); + } + std::shared_ptr find(uint256 const& hash) override { @@ -441,6 +473,9 @@ class InboundLedgersImp : public InboundLedgers beast::insight::Counter mCounter; std::unique_ptr mPeerSetBuilder; + + std::set pendingAcquires_; + std::mutex acquiresMutex_; }; //------------------------------------------------------------------------------ diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index cd85bc9e4e1..5f72ce9cfa0 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1730,7 +1730,8 @@ NetworkOPsImp::checkLastClosedLedger( } JLOG(m_journal.warn()) << "We are not running on the consensus ledger"; - JLOG(m_journal.info()) << "Our LCL: " << getJson({*ourClosed, {}}); + JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash + << getJson({*ourClosed, {}}); JLOG(m_journal.info()) << "Net LCL " << closedLedger; if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index aee24cd7d57..b312fb69fe0 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -106,6 +106,14 @@ class MagicInboundLedgers : public InboundLedgers return {}; } + virtual void + acquireAsync( + uint256 const& hash, + std::uint32_t seq, + InboundLedger::Reason reason) override + { + } + virtual std::shared_ptr find(LedgerHash const& hash) override {