Skip to content

Commit

Permalink
Optimize when to acquire ledgers from the network.
Browse files Browse the repository at this point in the history
Particularly avoid acquiring ledgers likely to be
produced locally very soon.

Derived from XRPLF#4764

Co-authored-by: Mark Travis <[email protected]>
  • Loading branch information
ximinez and mtrippled committed Jan 9, 2025
1 parent 3d28098 commit 6bf00f3
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 43 deletions.
8 changes: 5 additions & 3 deletions src/xrpld/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ class InboundLedger final : public TimeoutCounter,
~InboundLedger();

// Called when another attempt is made to fetch this same ledger
void
update(std::uint32_t seq);
//
// Returns true if this triggers requests to be sent
bool
update(std::uint32_t seq, bool broadcast);

/** Returns true if we got all the data. */
bool
Expand Down Expand Up @@ -89,7 +91,7 @@ class InboundLedger final : public TimeoutCounter,
bool
checkLocal();
void
init(ScopedLockType& collectionLock);
init(ScopedLockType& collectionLock, bool broadcast);

bool
gotData(
Expand Down
41 changes: 36 additions & 5 deletions src/xrpld/app/ledger/detail/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ InboundLedger::InboundLedger(
}

void
InboundLedger::init(ScopedLockType& collectionLock)
InboundLedger::init(ScopedLockType& collectionLock, bool broadcast)
{
ScopedLockType sl(mtx_);
collectionLock.unlock();
Expand All @@ -113,8 +113,18 @@ InboundLedger::init(ScopedLockType& collectionLock)

if (!complete_)
{
addPeers();
queueJob(sl);
if (broadcast)
{
addPeers();
queueJob(sl);
}
else
{
// Delay to give time to build the ledger before sending
JLOG(journal_.debug()) << "init: Deferring peer requests";
deferred_ = true;
setTimer(sl);
}
return;
}

Expand Down Expand Up @@ -145,8 +155,8 @@ InboundLedger::getPeerCount() const
});
}

void
InboundLedger::update(std::uint32_t seq)
bool
InboundLedger::update(std::uint32_t seq, bool broadcast)
{
ScopedLockType sl(mtx_);

Expand All @@ -156,6 +166,27 @@ InboundLedger::update(std::uint32_t seq)

// Prevent this from being swept
touch();

// If the signal is to broadcast, and this request has never tried to
// broadcast before, cancel any waiting timer, then fire off the job to
// broadcast. Note that this is calling mPeerSet->getPeerIds(), not
// getPeerCount(), because the latter will filter out peers that have been
// tried, but have since been lost. This wants to check if peers have _ever_
// been tried. If they have, stick with the normal timer flow.
if (broadcast && mPeerSet->getPeerIds().empty())
{
if (cancelTimer(sl))
{
JLOG(journal_.debug())
<< "update: cancelling timer to send peer requests";
deferred_ = false;
skipNext_ = true;
addPeers();
queueJob(sl);
return true;
}
}
return false;
}

bool
Expand Down
45 changes: 31 additions & 14 deletions src/xrpld/app/ledger/detail/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,29 @@ class InboundLedgersImp : public InboundLedgers
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// fallingBehind means the last closed ledger is at least 2
// behind the validated ledger. If the node is falling
// behind the network, it probably needs information from
// the network to catch up.
//
// The threshold is 2 rather than 1 because a lag of a single
// ledger is the normal case: some nodes get there slightly
// later than others. A difference of 2 means that at least a
// full ledger interval has passed, so the node is beginning
// to fall behind.
bool const fallingBehind = app_.getOPs().isFallingBehind();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
// 1 and 19 inclusive ledgers ahead of the valid ledger this
// node has not built it yet, but it's possible/likely it
// has the tx's necessary to build it and get caught up.
// Plus it might not become validated. On the other hand, if
// it's more than 20 in the future, this node should request
// it so that it can jump ahead and get caught up.
// the validated ledger. Because validations lag behind
// consensus, if we get any further behind than this, we
// risk losing sync, because we don't have the preferred
// ledger available.
LedgerIndex const validSeq =
app_.getLedgerMaster().getValidLedgerIndex();
constexpr std::size_t lagLeeway = 20;
bool const nearFuture =
(seq > validSeq) && (seq < validSeq + lagLeeway);
constexpr std::size_t lagLeeway = 2;
bool const nearFuture = (validSeq > 0) && (seq > validSeq) &&
(seq < validSeq + lagLeeway);
// If everything else is ok, don't try to acquire the ledger
// if the request is related to consensus. (Note that
// consensus calls usually pass a seq of 0, so nearFuture
Expand All @@ -129,6 +138,7 @@ class InboundLedgersImp : public InboundLedgers
reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false")
<< ". falling behind: " << (fallingBehind ? "true" : "false")
<< ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq
<< ". Lag leeway: " << lagLeeway
Expand All @@ -139,6 +149,9 @@ class InboundLedgersImp : public InboundLedgers
// If the node is not synced, send requests.
if (!isFull)
return true;
// If the node is falling behind, send requests.
if (fallingBehind)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
Expand All @@ -149,7 +162,7 @@ class InboundLedgersImp : public InboundLedgers
return false;
return true;
}();
ss << ". Would broadcast to peers? "
ss << ". Broadcast to peers? "
<< (shouldBroadcast ? "true." : "false.");

if (!shouldAcquire)
Expand Down Expand Up @@ -184,7 +197,7 @@ class InboundLedgersImp : public InboundLedgers
std::ref(m_clock),
mPeerSetBuilder->build());
mLedgers.emplace(hash, inbound);
inbound->init(sl);
inbound->init(sl, shouldBroadcast);
++mCounter;
}
}
Expand All @@ -196,8 +209,12 @@ class InboundLedgersImp : public InboundLedgers
return {};
}

if (!isNew)
inbound->update(seq);
bool const didBroadcast = [&]() {
if (!isNew)
return inbound->update(seq, shouldBroadcast);
return shouldBroadcast;
}();
ss << " First broadcast: " << (didBroadcast ? "true" : "false");

if (!inbound->isComplete())
{
Expand Down
45 changes: 28 additions & 17 deletions src/xrpld/app/ledger/detail/TimeoutCounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,31 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
<< "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
return;

if (auto ptr = wptr.lock())
timer_.async_wait([wptr =
pmDowncast()](boost::system::error_code const& ec) {
if (auto ptr = wptr.lock())
{
ScopedLockType sl(ptr->mtx_);
if (ec == boost::asio::error::operation_aborted || ptr->skipNext_)
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec << " (operation_aborted: "
<< boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted"
: "other")
<< ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
<< "Aborting setTimer: " << ec
<< ", skip: " << (ptr->skipNext_ ? "true" : "false");
ptr->skipNext_ = false;
return;
}
});

ptr->queueJob(sl);
}
});
}

/** Cancel any wait currently pending on timer_.
 *
 * Forwards to timer_.cancel(), logs the resulting count at debug level,
 * and hands that same count back to the caller. Callers can treat a
 * non-zero result as "a waiting timer was actually cancelled".
 *
 * @param sl Lock handle supplied by the caller. It is not used in the
 *           body; presumably it attests that mtx_ is already held --
 *           TODO confirm against the other TimeoutCounter helpers.
 * @return The value reported by timer_.cancel().
 */
std::size_t
TimeoutCounter::cancelTimer(ScopedLockType& sl)
{
    auto const numCancelled = timer_.cancel();

    JLOG(journal_.debug()) << "Cancelled " << numCancelled << " timer(s)";

    return numCancelled;
}

void
Expand Down Expand Up @@ -110,9 +118,12 @@ TimeoutCounter::invokeOnTimer()

if (!progress_)
{
++timeouts_;
JLOG(journal_.debug())
<< "Timeout(" << timeouts_ << ") " << " acquiring " << hash_;
if (deferred_)
deferred_ = false;
else
++timeouts_;
JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
<< " acquiring " << hash_;
onTimer(false, sl);
}
else
Expand Down
9 changes: 9 additions & 0 deletions src/xrpld/app/ledger/detail/TimeoutCounter.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ class TimeoutCounter
void
setTimer(ScopedLockType&);

/** Cancel any waiting timer */
std::size_t
cancelTimer(ScopedLockType&);

/** Queue a job to call invokeOnTimer(). */
void
queueJob(ScopedLockType&);
Expand Down Expand Up @@ -133,6 +137,11 @@ class TimeoutCounter
int timeouts_;
bool complete_;
bool failed_;
/** Whether the initialization deferred doing any work until the first
* timeout. */
bool deferred_ = false;
/** Skip the next timeout, regardless of ec */
bool skipNext_ = false;
/** Whether forward progress has been made. */
bool progress_;
/** The minimum time to wait between calls to execute(). */
Expand Down
29 changes: 25 additions & 4 deletions src/xrpld/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ class NetworkOPsImp final : public NetworkOPs
clearLedgerFetch() override;
Json::Value
getLedgerFetchInfo() override;
bool
isFallingBehind() const override;
std::uint32_t
acceptLedger(
std::optional<std::chrono::milliseconds> consensusDelay) override;
Expand Down Expand Up @@ -724,6 +726,7 @@ class NetworkOPsImp final : public NetworkOPs
std::atomic<bool> amendmentBlocked_{false};
std::atomic<bool> amendmentWarned_{false};
std::atomic<bool> unlBlocked_{false};
std::atomic<bool> fallingBehind_{false};

ClosureCounter<void, boost::system::error_code const&> waitHandlerCounter_;
boost::asio::steady_timer heartbeatTimer_;
Expand Down Expand Up @@ -1828,13 +1831,25 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)

auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();

JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
JLOG(m_journal.info()) << "beginConsensus time for #" << closingInfo.seq
<< " with LCL " << closingInfo.parentHash;

auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
fallingBehind_ = false;
if (closingInfo.seq < m_ledgerMaster.getValidLedgerIndex() - 1)
{
fallingBehind_ = true;
JLOG(m_journal.warn())
<< "beginConsensus Current ledger " << closingInfo.seq
<< " is at least 2 behind validated "
<< m_ledgerMaster.getValidLedgerIndex();
}

auto const prevLedger =
m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);

if (!prevLedger)
{
fallingBehind_ = true;
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
Expand Down Expand Up @@ -1886,7 +1901,7 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
mLastConsensusPhase = currPhase;
}

JLOG(m_journal.debug()) << "Initiating consensus engine";
JLOG(m_journal.debug()) << "beginConsensus Initiating consensus engine";
return true;
}

Expand Down Expand Up @@ -1959,7 +1974,7 @@ NetworkOPsImp::endConsensus()
{
// check if the ledger is good enough to go to FULL
// Note: Do not go to FULL if we don't have the previous ledger
// check if the ledger is bad enough to go to CONNECTE D -- TODO
// check if the ledger is bad enough to go to CONNECTED -- TODO
auto current = m_ledgerMaster.getCurrentLedger();
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2 * current->info().closeTimeResolution))
Expand Down Expand Up @@ -2772,6 +2787,12 @@ NetworkOPsImp::getLedgerFetchInfo()
return app_.getInboundLedgers().getInfo();
}

/** Report the current value of the fallingBehind_ flag.
 *
 * The flag is an atomic bool, so it is read here without taking any
 * lock. This accessor only exposes the stored value; it does not
 * recompute it.
 *
 * @return true if fallingBehind_ is currently set.
 */
bool
NetworkOPsImp::isFallingBehind() const
{
    // Explicit load; equivalent to the implicit atomic-to-bool conversion.
    return fallingBehind_.load();
}

void
NetworkOPsImp::pubProposedTransaction(
std::shared_ptr<ReadView const> const& ledger,
Expand Down
2 changes: 2 additions & 0 deletions src/xrpld/app/misc/NetworkOPs.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ class NetworkOPs : public InfoSub::Source
clearLedgerFetch() = 0;
virtual Json::Value
getLedgerFetchInfo() = 0;
virtual bool
isFallingBehind() const = 0;

/** Accepts the current transaction tree, return the new ledger's sequence
Expand Down

0 comments on commit 6bf00f3

Please sign in to comment.