Skip to content

Commit

Permalink
Improve ledger-fetching logic:
Browse files Browse the repository at this point in the history
When fetching ledgers, the existing code would isolate the peer
that sent the most useful responses and issue follow-up queries
only to that peer.

This commit increases the query aggressiveness, and changes the
mechanism used to select which peers to issue follow-up queries
to so as to more evenly spread the load among those peers which
provided useful responses.
  • Loading branch information
nbougalis committed Mar 29, 2022
1 parent 1b9387e commit 48803a4
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 119 deletions.
7 changes: 3 additions & 4 deletions src/ripple/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ class InboundLedger final : public TimeoutCounter,
public:
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;

using PeerDataPairType =
std::pair<std::weak_ptr<Peer>, std::shared_ptr<protocol::TMLedgerData>>;

// These are the reasons we might acquire a ledger
enum class Reason {
HISTORY, // Acquiring past ledger
Expand Down Expand Up @@ -193,7 +190,9 @@ class InboundLedger final : public TimeoutCounter,

// Data we have received from peers
std::mutex mReceivedDataLock;
std::vector<PeerDataPairType> mReceivedData;
std::vector<
std::pair<std::weak_ptr<Peer>, std::shared_ptr<protocol::TMLedgerData>>>
mReceivedData;
bool mReceiveDispatched;
std::unique_ptr<PeerSet> mPeerSet;
};
Expand Down
156 changes: 114 additions & 42 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
#include <ripple/resource/Fees.h>
#include <ripple/shamap/SHAMapNodeID.h>

#include <boost/iterator/function_output_iterator.hpp>

#include <algorithm>
#include <random>

namespace ripple {

Expand All @@ -57,15 +60,15 @@ enum {

// Number of nodes to find initially
,
missingNodesFind = 256
missingNodesFind = 512

// Number of nodes to request for a reply
,
reqNodesReply = 128
reqNodesReply = 256

// Number of nodes to request blindly
,
reqNodes = 8
reqNodes = 12
};

// millisecond for each ledger timeout
Expand Down Expand Up @@ -601,7 +604,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
for (auto const& p : need)
{
JLOG(journal_.warn()) << "Want: " << p.second;
JLOG(journal_.debug()) << "Want: " << p.second;

if (!typeSet)
{
Expand Down Expand Up @@ -661,15 +664,15 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (reason != TriggerReason::reply)
{
// If we're querying blind, don't query deep
tmGL.set_querydepth(0);
tmGL.set_querydepth(1);
}
else if (peer && peer->isHighLatency())
{
// If the peer has high latency, query extra deep
tmGL.set_querydepth(2);
tmGL.set_querydepth(3);
}
else
tmGL.set_querydepth(1);
tmGL.set_querydepth(2);

// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
Expand Down Expand Up @@ -952,22 +955,23 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)

try
{
auto const f = filter.get();

for (auto const& node : packet.nodes())
{
auto const nodeID = deserializeSHAMapNodeID(node.nodeid());

if (!nodeID)
{
san.incInvalid();
return;
}
throw std::runtime_error("data does not properly deserialize");

if (nodeID->isRoot())
san += map.addRootNode(
rootHash, makeSlice(node.nodedata()), filter.get());
{
san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
}
else
san += map.addKnownNode(
*nodeID, makeSlice(node.nodedata()), filter.get());
{
san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f);
}

if (!san.isGood())
{
Expand Down Expand Up @@ -1120,19 +1124,19 @@ InboundLedger::processData(
std::shared_ptr<Peer> peer,
protocol::TMLedgerData& packet)
{
ScopedLockType sl(mtx_);

if (packet.type() == protocol::liBASE)
{
if (packet.nodes_size() < 1)
if (packet.nodes().empty())
{
JLOG(journal_.warn()) << "Got empty header data";
JLOG(journal_.warn()) << peer->id() << ": empty header data";
peer->charge(Resource::feeInvalidRequest);
return -1;
}

SHAMapAddNode san;

ScopedLockType sl(mtx_);

try
{
if (!mHaveHeader)
Expand Down Expand Up @@ -1177,13 +1181,18 @@ InboundLedger::processData(
if ((packet.type() == protocol::liTX_NODE) ||
(packet.type() == protocol::liAS_NODE))
{
if (packet.nodes().size() == 0)
std::string type = packet.type() == protocol::liTX_NODE ? "liTX_NODE: "
: "liAS_NODE: ";

if (packet.nodes().empty())
{
JLOG(journal_.info()) << "Got response with no nodes";
JLOG(journal_.info()) << peer->id() << ": response with no nodes";
peer->charge(Resource::feeInvalidRequest);
return -1;
}

ScopedLockType sl(mtx_);

// Verify node IDs and data are complete
for (auto const& node : packet.nodes())
{
Expand All @@ -1198,14 +1207,10 @@ InboundLedger::processData(
SHAMapAddNode san;
receiveNode(packet, san);

if (packet.type() == protocol::liTX_NODE)
{
JLOG(journal_.debug()) << "Ledger TX node stats: " << san.get();
}
else
{
JLOG(journal_.debug()) << "Ledger AS node stats: " << san.get();
}
JLOG(journal_.debug())
<< "Ledger "
<< ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS")
<< " node stats: " << san.get();

if (san.isUseful())
progress_ = true;
Expand All @@ -1217,20 +1222,89 @@ InboundLedger::processData(
return -1;
}

namespace detail {
// Tracks, per peer, the amount of useful data that peer has returned,
// so follow-up queries can be directed at the most productive peers.
struct PeerDataCounts
{
    // Map from peer to the amount of useful data that peer returned
    std::unordered_map<std::shared_ptr<Peer>, int> counts;
    // The largest amount of useful data that any single peer returned
    int maxCount = 0;

    // Record that `peer` returned `dataCount` useful items. Non-positive
    // counts are ignored; if the peer was already recorded, its best
    // (maximum) count is kept rather than accumulated.
    void
    update(std::shared_ptr<Peer>&& peer, int dataCount)
    {
        if (dataCount <= 0)
            return;
        maxCount = std::max(maxCount, dataCount);
        auto i = counts.find(peer);
        if (i == counts.end())
        {
            counts.emplace(std::move(peer), dataCount);
            return;
        }
        i->second = std::max(i->second, dataCount);
    }

    // Prune all the peers that didn't return enough data.
    void
    prune()
    {
        // Remove all the peers that didn't return at least half as much data as
        // the best peer. Note: integer division, so with maxCount == 1 the
        // threshold is 0 and no peer is pruned.
        auto const thresh = maxCount / 2;
        auto i = counts.begin();
        while (i != counts.end())
        {
            if (i->second < thresh)
                i = counts.erase(i);
            else
                ++i;
        }
    }

    // Call `f` with the peer of each entry in a uniformly random sample of
    // at most `n` entries of the `counts` map. No-op when the map is empty.
    template <class F>
    void
    sampleN(std::size_t n, F&& f)
    {
        if (counts.empty())
            return;

        // A freshly seeded lightweight engine per call; std::sample picks
        // the entries, and the output iterator forwards each chosen peer
        // (the map key) to `f`.
        std::minstd_rand rng{std::random_device{}()};
        std::sample(
            counts.begin(),
            counts.end(),
            boost::make_function_output_iterator(
                [&f](auto&& v) { f(v.first); }),
            n,
            rng);
    }
};
} // namespace detail

/** Process pending TMLedgerData
Query the 'best' peer
Query a random sample of the 'best' peers
*/
void
InboundLedger::runData()
{
std::shared_ptr<Peer> chosenPeer;
int chosenPeerCount = -1;
// Maximum number of peers to request data from
constexpr std::size_t maxUsefulPeers = 6;

std::vector<PeerDataPairType> data;
decltype(mReceivedData) data;

// Reserve some memory so the first couple iterations don't reallocate
data.reserve(8);

detail::PeerDataCounts dataCounts;

for (;;)
{
data.clear();

{
std::lock_guard sl(mReceivedDataLock);

Expand All @@ -1243,24 +1317,22 @@ InboundLedger::runData()
data.swap(mReceivedData);
}

// Select the peer that gives us the most nodes that are useful,
// breaking ties in favor of the peer that responded first.
for (auto& entry : data)
{
if (auto peer = entry.first.lock())
{
int count = processData(peer, *(entry.second));
if (count > chosenPeerCount)
{
chosenPeerCount = count;
chosenPeer = std::move(peer);
}
dataCounts.update(std::move(peer), count);
}
}
}

if (chosenPeer)
trigger(chosenPeer, TriggerReason::reply);
// Select a random sample of the peers that gives us the most nodes that are
// useful
dataCounts.prune();
dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr<Peer> const& peer) {
trigger(peer, TriggerReason::reply);
});
}

Json::Value
Expand Down
7 changes: 7 additions & 0 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ class InboundLedgersImp : public InboundLedgers
reason != InboundLedger::Reason::SHARD ||
(seq != 0 && app_.getShardStore()));

// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
return {};

bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
Expand All @@ -82,6 +88,7 @@ class InboundLedgersImp : public InboundLedgers
{
return {};
}

auto it = mLedgers.find(hash);
if (it != mLedgers.end())
{
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/core/Job.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ enum JobType {
jtLEDGER_REQ, // Peer request ledger/txnset data
jtPROPOSAL_ut, // A proposal from an untrusted source
jtREPLAY_TASK, // A Ledger replay task/subtask
jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtTRANSACTION, // A transaction received from the network
jtMISSING_TXN, // Request missing transactions
jtREQUESTED_TXN, // Reply with requested transactions
jtBATCH, // Apply batched transactions
jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtADVANCE, // Advance validated/acquired ledgers
jtPUBLEDGER, // Publish a fully-accepted ledger
jtTXN_DATA, // Fetch a proposed set
Expand Down
Loading

0 comments on commit 48803a4

Please sign in to comment.