From 48803a48afc3bede55d71618c2ee38fd9dbfd3b0 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Sun, 19 Dec 2021 23:47:33 -0800 Subject: [PATCH] Improve ledger-fetching logic: When fetching ledgers, the existing code would isolate the peer that sent the most useful responses and issue follow-up queries only to that peer. This commit increases the query aggressiveness, and changes the mechanism used to select which peers to issue follow-up queries to so as to more evenly spread the load among those peers which provided useful responses. --- src/ripple/app/ledger/InboundLedger.h | 7 +- src/ripple/app/ledger/impl/InboundLedger.cpp | 156 +++++++++++++----- src/ripple/app/ledger/impl/InboundLedgers.cpp | 7 + src/ripple/core/Job.h | 2 +- src/ripple/overlay/impl/PeerImp.cpp | 34 ++-- src/ripple/shamap/SHAMap.h | 5 +- src/ripple/shamap/impl/SHAMapSync.cpp | 38 ++--- src/test/shamap/SHAMapSync_test.cpp | 42 ++--- 8 files changed, 172 insertions(+), 119 deletions(-) diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index 25f64447649..287dbaf7f16 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -39,9 +39,6 @@ class InboundLedger final : public TimeoutCounter, public: using clock_type = beast::abstract_clock; - using PeerDataPairType = - std::pair, std::shared_ptr>; - // These are the reasons we might acquire a ledger enum class Reason { HISTORY, // Acquiring past ledger @@ -193,7 +190,9 @@ class InboundLedger final : public TimeoutCounter, // Data we have received from peers std::mutex mReceivedDataLock; - std::vector mReceivedData; + std::vector< + std::pair, std::shared_ptr>> + mReceivedData; bool mReceiveDispatched; std::unique_ptr mPeerSet; }; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 979c1454410..6609759d00e 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp 
@@ -33,7 +33,10 @@ #include #include +#include + #include +#include namespace ripple { @@ -57,15 +60,15 @@ enum { // Number of nodes to find initially , - missingNodesFind = 256 + missingNodesFind = 512 // Number of nodes to request for a reply , - reqNodesReply = 128 + reqNodesReply = 256 // Number of nodes to request blindly , - reqNodes = 8 + reqNodes = 12 }; // millisecond for each ledger timeout @@ -601,7 +604,7 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) tmBH.set_ledgerhash(hash_.begin(), hash_.size()); for (auto const& p : need) { - JLOG(journal_.warn()) << "Want: " << p.second; + JLOG(journal_.debug()) << "Want: " << p.second; if (!typeSet) { @@ -661,15 +664,15 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) if (reason != TriggerReason::reply) { // If we're querying blind, don't query deep - tmGL.set_querydepth(0); + tmGL.set_querydepth(1); } else if (peer && peer->isHighLatency()) { // If the peer has high latency, query extra deep - tmGL.set_querydepth(2); + tmGL.set_querydepth(3); } else - tmGL.set_querydepth(1); + tmGL.set_querydepth(2); // Get the state data first because it's the most likely to be useful // if we wind up abandoning this fetch. 
@@ -952,22 +955,23 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san) try { + auto const f = filter.get(); + for (auto const& node : packet.nodes()) { auto const nodeID = deserializeSHAMapNodeID(node.nodeid()); if (!nodeID) - { - san.incInvalid(); - return; - } + throw std::runtime_error("data does not properly deserialize"); if (nodeID->isRoot()) - san += map.addRootNode( - rootHash, makeSlice(node.nodedata()), filter.get()); + { + san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f); + } else - san += map.addKnownNode( - *nodeID, makeSlice(node.nodedata()), filter.get()); + { + san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f); + } if (!san.isGood()) { @@ -1120,19 +1124,19 @@ InboundLedger::processData( std::shared_ptr peer, protocol::TMLedgerData& packet) { - ScopedLockType sl(mtx_); - if (packet.type() == protocol::liBASE) { - if (packet.nodes_size() < 1) + if (packet.nodes().empty()) { - JLOG(journal_.warn()) << "Got empty header data"; + JLOG(journal_.warn()) << peer->id() << ": empty header data"; peer->charge(Resource::feeInvalidRequest); return -1; } SHAMapAddNode san; + ScopedLockType sl(mtx_); + try { if (!mHaveHeader) @@ -1177,13 +1181,18 @@ InboundLedger::processData( if ((packet.type() == protocol::liTX_NODE) || (packet.type() == protocol::liAS_NODE)) { - if (packet.nodes().size() == 0) + std::string type = packet.type() == protocol::liTX_NODE ? 
"liTX_NODE: " + : "liAS_NODE: "; + + if (packet.nodes().empty()) { - JLOG(journal_.info()) << "Got response with no nodes"; + JLOG(journal_.info()) << peer->id() << ": response with no nodes"; peer->charge(Resource::feeInvalidRequest); return -1; } + ScopedLockType sl(mtx_); + // Verify node IDs and data are complete for (auto const& node : packet.nodes()) { @@ -1198,14 +1207,10 @@ InboundLedger::processData( SHAMapAddNode san; receiveNode(packet, san); - if (packet.type() == protocol::liTX_NODE) - { - JLOG(journal_.debug()) << "Ledger TX node stats: " << san.get(); - } - else - { - JLOG(journal_.debug()) << "Ledger AS node stats: " << san.get(); - } + JLOG(journal_.debug()) + << "Ledger " + << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS") + << " node stats: " << san.get(); if (san.isUseful()) progress_ = true; @@ -1217,20 +1222,89 @@ InboundLedger::processData( return -1; } +namespace detail { +// Track the amount of useful data that each peer returns +struct PeerDataCounts +{ + // Map from peer to amount of useful the peer returned + std::unordered_map, int> counts; + // The largest amount of useful data that any peer returned + int maxCount = 0; + + // Update the data count for a peer + void + update(std::shared_ptr&& peer, int dataCount) + { + if (dataCount <= 0) + return; + maxCount = std::max(maxCount, dataCount); + auto i = counts.find(peer); + if (i == counts.end()) + { + counts.emplace(std::move(peer), dataCount); + return; + } + i->second = std::max(i->second, dataCount); + } + + // Prune all the peers that didn't return enough data. + void + prune() + { + // Remove all the peers that didn't return at least half as much data as + // the best peer + auto const thresh = maxCount / 2; + auto i = counts.begin(); + while (i != counts.end()) + { + if (i->second < thresh) + i = counts.erase(i); + else + ++i; + } + } + + // call F with the `peer` parameter with a random sample of at most n values + // of the counts vector. 
+ template + void + sampleN(std::size_t n, F&& f) + { + if (counts.empty()) + return; + + std::minstd_rand rng{std::random_device{}()}; + std::sample( + counts.begin(), + counts.end(), + boost::make_function_output_iterator( + [&f](auto&& v) { f(v.first); }), + n, + rng); + } +}; +} // namespace detail + /** Process pending TMLedgerData - Query the 'best' peer + Query a random sample of the 'best' peers */ void InboundLedger::runData() { - std::shared_ptr chosenPeer; - int chosenPeerCount = -1; + // Maximum number of peers to request data from + constexpr std::size_t maxUsefulPeers = 6; - std::vector data; + decltype(mReceivedData) data; + + // Reserve some memory so the first couple iterations don't reallocate + data.reserve(8); + + detail::PeerDataCounts dataCounts; for (;;) { data.clear(); + { std::lock_guard sl(mReceivedDataLock); @@ -1243,24 +1317,22 @@ InboundLedger::runData() data.swap(mReceivedData); } - // Select the peer that gives us the most nodes that are useful, - // breaking ties in favor of the peer that responded first. 
for (auto& entry : data) { if (auto peer = entry.first.lock()) { int count = processData(peer, *(entry.second)); - if (count > chosenPeerCount) - { - chosenPeerCount = count; - chosenPeer = std::move(peer); - } + dataCounts.update(std::move(peer), count); } } } - if (chosenPeer) - trigger(chosenPeer, TriggerReason::reply); + // Select a random sample of the peers that give us the most nodes that are + // useful + dataCounts.prune(); + dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr const& peer) { + trigger(peer, TriggerReason::reply); + }); } Json::Value diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 76681ea0a9d..7ee49b4547a 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -74,6 +74,12 @@ class InboundLedgersImp : public InboundLedgers reason != InboundLedger::Reason::SHARD || (seq != 0 && app_.getShardStore())); + // probably not the right rule + if (app_.getOPs().isNeedNetworkLedger() && + (reason != InboundLedger::Reason::GENERIC) && + (reason != InboundLedger::Reason::CONSENSUS)) + return {}; + bool isNew = true; std::shared_ptr inbound; { @@ -82,6 +88,7 @@ class InboundLedgersImp : public InboundLedgers { return {}; } + auto it = mLedgers.find(hash); if (it != mLedgers.end()) { diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 43514806280..c4f2eddf35a 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -60,11 +60,11 @@ enum JobType { jtLEDGER_REQ, // Peer request ledger/txnset data jtPROPOSAL_ut, // A proposal from an untrusted source jtREPLAY_TASK, // A Ledger replay task/subtask - jtLEDGER_DATA, // Received data for a ledger we're acquiring jtTRANSACTION, // A transaction received from the network jtMISSING_TXN, // Request missing transactions jtREQUESTED_TXN, // Reply with requested transactions jtBATCH, // Apply batched transactions + jtLEDGER_DATA, // Received data for a ledger we're 
acquiring jtADVANCE, // Advance validated/acquired ledgers jtPUBLEDGER, // Publish a fully-accepted ledger jtTXN_DATA, // Fetch a proposed set diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index b6e07a0f1b6..5bdaa01213d 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1339,7 +1339,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // case ShardState::finalized: default: return badData("Invalid incomplete shard state"); - }; + } s.add32(incomplete.state()); // Verify progress @@ -3523,8 +3523,8 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) { auto const queryDepth{ m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)}; - std::vector nodeIds; - std::vector rawNodes; + + std::vector> data; for (int i = 0; i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::softMaxReplyNodes; @@ -3532,30 +3532,22 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) { auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))}; - nodeIds.clear(); - rawNodes.clear(); + data.clear(); + data.reserve(Tuning::softMaxReplyNodes); + try { - if (map->getNodeFat( - *shaMapNodeId, - nodeIds, - rawNodes, - fatLeaves, - queryDepth)) + if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth)) { - assert(nodeIds.size() == rawNodes.size()); JLOG(p_journal_.trace()) << "processLedgerRequest: getNodeFat got " - << rawNodes.size() << " nodes"; + << data.size() << " nodes"; - auto rawNodeIter{rawNodes.begin()}; - for (auto const& nodeId : nodeIds) + for (auto const& d : data) { protocol::TMLedgerNode* node{ledgerData.add_nodes()}; - node->set_nodeid(nodeId.getRawString()); - node->set_nodedata( - &rawNodeIter->front(), rawNodeIter->size()); - ++rawNodeIter; + node->set_nodeid(d.first.getRawString()); + node->set_nodedata(d.second.data(), d.second.size()); } } else @@ -3607,9 +3599,7 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) << ledgerData.nodes_size() << " 
nodes"; } - auto message{ - std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; - send(message); + send(std::make_shared(ledgerData, protocol::mtLEDGER_DATA)); } int diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index b913bd5b1d9..1d221179c16 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -238,7 +238,7 @@ class SHAMap void visitDifferences( SHAMap const* have, - std::function) const; + std::function const&) const; /** Visit every leaf node in this SHAMap @@ -267,8 +267,7 @@ class SHAMap bool getNodeFat( SHAMapNodeID const& wanted, - std::vector& nodeIDs, - std::vector& rawNodes, + std::vector>& data, bool fatLeaves, std::uint32_t depth) const; diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index 1e233d55f23..1bada85133d 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -52,7 +52,7 @@ SHAMap::visitNodes(std::function const& function) const auto node = std::static_pointer_cast(root_); int pos = 0; - while (1) + while (true) { while (pos < 16) { @@ -99,7 +99,7 @@ SHAMap::visitNodes(std::function const& function) const void SHAMap::visitDifferences( SHAMap const* have, - std::function function) const + std::function const& function) const { // Visit every node in this SHAMap that is not present // in the specified SHAMap @@ -426,8 +426,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) bool SHAMap::getNodeFat( SHAMapNodeID const& wanted, - std::vector& nodeIDs, - std::vector& rawNodes, + std::vector>& data, bool fatLeaves, std::uint32_t depth) const { @@ -443,16 +442,15 @@ SHAMap::getNodeFat( auto inner = static_cast(node); if (inner->isEmptyBranch(branch)) return false; - node = descendThrow(inner, branch); nodeID = nodeID.getChildNodeID(branch); } if (node == nullptr || wanted != nodeID) { - JLOG(journal_.warn()) << "peer requested node that is not in the map:\n" - << wanted << " but found\n" - << nodeID; + 
JLOG(journal_.info()) + << "peer requested node that is not in the map: " << wanted + << " but found " << nodeID; return false; } @@ -465,18 +463,17 @@ SHAMap::getNodeFat( std::stack> stack; stack.emplace(node, nodeID, depth); + Serializer s(8192); + while (!stack.empty()) { std::tie(node, nodeID, depth) = stack.top(); stack.pop(); - { - // Add this node to the reply - Serializer s; - node->serializeForWire(s); - nodeIDs.push_back(nodeID); - rawNodes.push_back(std::move(s.modData())); - } + // Add this node to the reply + s.erase(); + node->serializeForWire(s); + data.emplace_back(std::make_pair(nodeID, s.getData())); if (node->isInner()) { @@ -484,6 +481,7 @@ SHAMap::getNodeFat( // without decrementing the depth auto inner = static_cast(node); int bc = inner->getBranchCount(); + if ((depth > 0) || (bc == 1)) { // We need to process this node's children @@ -492,7 +490,7 @@ SHAMap::getNodeFat( if (!inner->isEmptyBranch(i)) { auto const childNode = descendThrow(inner, i); - SHAMapNodeID const childID = nodeID.getChildNodeID(i); + auto const childID = nodeID.getChildNodeID(i); if (childNode->isInner() && ((depth > 1) || (bc == 1))) { @@ -506,10 +504,10 @@ SHAMap::getNodeFat( else if (childNode->isInner() || fatLeaves) { // Just include this node - Serializer ns; - childNode->serializeForWire(ns); - nodeIDs.push_back(childID); - rawNodes.push_back(std::move(ns.modData())); + s.erase(); + childNode->serializeForWire(s); + data.emplace_back( + std::make_pair(childID, s.getData())); } } } diff --git a/src/test/shamap/SHAMapSync_test.cpp b/src/test/shamap/SHAMapSync_test.cpp index f262f5f8bff..ba32f6e80dc 100644 --- a/src/test/shamap/SHAMapSync_test.cpp +++ b/src/test/shamap/SHAMapSync_test.cpp @@ -124,24 +124,18 @@ class SHAMapSync_test : public beast::unit_test::suite destination.setSynching(); { - std::vector gotNodeIDs_a; - std::vector gotNodes_a; + std::vector> a; BEAST_EXPECT(source.getNodeFat( - SHAMapNodeID(), - gotNodeIDs_a, - gotNodes_a, - rand_bool(eng_), - 
rand_int(eng_, 2))); - - unexpected(gotNodes_a.size() < 1, "NodeSize"); - - BEAST_EXPECT(destination - .addRootNode( - source.getHash(), - makeSlice(*gotNodes_a.begin()), - nullptr) - .isGood()); + SHAMapNodeID(), a, rand_bool(eng_), rand_int(eng_, 2))); + + unexpected(a.size() < 1, "NodeSize"); + + BEAST_EXPECT( + destination + .addRootNode( + source.getHash(), makeSlice(a[0].second), nullptr) + .isGood()); } do @@ -155,8 +149,7 @@ class SHAMapSync_test : public beast::unit_test::suite break; // get as many nodes as possible based on this information - std::vector gotNodeIDs_b; - std::vector gotNodes_b; + std::vector> b; for (auto& it : nodesMissing) { @@ -164,29 +157,24 @@ class SHAMapSync_test : public beast::unit_test::suite // non-deterministic number of times and the number of tests run // should be deterministic if (!source.getNodeFat( - it.first, - gotNodeIDs_b, - gotNodes_b, - rand_bool(eng_), - rand_int(eng_, 2))) + it.first, b, rand_bool(eng_), rand_int(eng_, 2))) fail("", __FILE__, __LINE__); } // Don't use BEAST_EXPECT here b/c it will be called a // non-deterministic number of times and the number of tests run // should be deterministic - if (gotNodeIDs_b.size() != gotNodes_b.size() || - gotNodeIDs_b.empty()) + if (b.empty()) fail("", __FILE__, __LINE__); - for (std::size_t i = 0; i < gotNodeIDs_b.size(); ++i) + for (std::size_t i = 0; i < b.size(); ++i) { // Don't use BEAST_EXPECT here b/c it will be called a // non-deterministic number of times and the number of tests run // should be deterministic if (!destination .addKnownNode( - gotNodeIDs_b[i], makeSlice(gotNodes_b[i]), nullptr) + b[i].first, makeSlice(b[i].second), nullptr) .isUseful()) fail("", __FILE__, __LINE__); }