From 1d404fef67197a60c8759a12d4236b3819860d1b Mon Sep 17 00:00:00 2001
From: Ed Hennis
Date: Mon, 12 Aug 2024 20:15:44 -0400
Subject: [PATCH] Refrain from sending duplicate TMGetLedger messages to the
 same peer

* Allow a retry after 5s in case of peer or network congestion.
* Addresses RIPD-1870
* (Changes levelization. That is not desirable, and will need to be fixed.)
---
 Builds/levelization/results/loops.txt |  2 +-
 src/ripple/app/misc/HashRouter.cpp    | 14 ++++++++
 src/ripple/app/misc/HashRouter.h      | 28 ++++++++++++++++
 src/ripple/overlay/impl/PeerSet.cpp   | 47 +++++++++++++++++++++++++--
 4 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/Builds/levelization/results/loops.txt b/Builds/levelization/results/loops.txt
index cb137f497cb..da141e6a628 100644
--- a/Builds/levelization/results/loops.txt
+++ b/Builds/levelization/results/loops.txt
@@ -11,7 +11,7 @@ Loop: ripple.app ripple.nodestore
     ripple.app > ripple.nodestore
 
 Loop: ripple.app ripple.overlay
-    ripple.overlay ~= ripple.app
+    ripple.overlay == ripple.app
 
 Loop: ripple.app ripple.peerfinder
     ripple.app > ripple.peerfinder
diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp
index 8085d6892ab..bd286d186b7 100644
--- a/src/ripple/app/misc/HashRouter.cpp
+++ b/src/ripple/app/misc/HashRouter.cpp
@@ -90,6 +90,20 @@ HashRouter::shouldProcess(
     return s.shouldProcess(suppressionMap_.clock().now(), tx_interval);
 }
 
+bool
+HashRouter::shouldProcessForPeer(
+    uint256 const& key,
+    PeerShortID peer,
+    std::chrono::seconds interval)
+{
+    std::lock_guard lock(mutex_);
+
+    auto& entry = emplace(key).first;
+
+    return entry.shouldProcessForPeer(
+        peer, suppressionMap_.clock().now(), interval);
+}
+
 int
 HashRouter::getFlags(uint256 const& key)
 {
diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h
index 8c546b2c51d..889f0458522 100644
--- a/src/ripple/app/misc/HashRouter.h
+++ b/src/ripple/app/misc/HashRouter.h
@@ -125,6 +125,21 @@ class HashRouter
             return true;
         }
 
+        bool
+        shouldProcessForPeer(
+            PeerShortID peer,
+            Stopwatch::time_point now,
+            std::chrono::seconds interval)
+        {
+            if (peerProcessed_.contains(peer) &&
+                ((peerProcessed_[peer] + interval) > now))
+                return false;
+            // Peer may already be in the list, but adding it again doesn't hurt
+            addPeer(peer);
+            peerProcessed_[peer] = now;
+            return true;
+        }
+
     private:
         int flags_ = 0;
         std::set<PeerShortID> peers_;
@@ -132,6 +147,7 @@ class HashRouter
         // than one flag needs to expire independently.
         std::optional<Stopwatch::time_point> relayed_;
        std::optional<Stopwatch::time_point> processed_;
+        std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
     };
 
 public:
@@ -180,6 +196,18 @@ class HashRouter
         int& flags,
         std::chrono::seconds tx_interval);
 
+    /** Determines whether the hashed item should be processed for the given
+        peer. Could be an incoming or outgoing message.
+
+        Items filtered with this function are processed at most once per peer.
+        Unlike shouldProcess, an item may still be processed for other peers.
+    */
+    bool
+    shouldProcessForPeer(
+        uint256 const& key,
+        PeerShortID peer,
+        std::chrono::seconds interval);
+
     /** Set the flags on a hash.
 
         @return `true` if the flags were changed. `false` if unchanged.
diff --git a/src/ripple/overlay/impl/PeerSet.cpp b/src/ripple/overlay/impl/PeerSet.cpp
index de5c3cd9f93..34992203327 100644
--- a/src/ripple/overlay/impl/PeerSet.cpp
+++ b/src/ripple/overlay/impl/PeerSet.cpp
@@ -18,9 +18,11 @@
 //==============================================================================
 
 #include
+#include
 #include
 #include
 #include
+#include
 
 namespace ripple {
 
@@ -104,16 +106,57 @@ PeerSetImpl::sendRequest(
     std::shared_ptr<Peer> const& peer)
 {
     auto packet = std::make_shared<Message>(message, type);
+
+    auto const messageHash = [&]() {
+        auto const packetBuffer =
+            packet->getBuffer(compression::Compressed::Off);
+        return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
+    }();
+    auto const shortHash = to_string(messageHash).substr(0, 6);
+
+    // Allow messages to be re-sent to the same peer after a delay
+    using namespace std::chrono_literals;
+    constexpr std::chrono::seconds interval = 5s;
+
     if (peer)
     {
-        peer->send(packet);
+        if (app_.getHashRouter().shouldProcessForPeer(
+                messageHash, peer->id(), interval))
+        {
+            // Don't merge this as a warning. Change it to trace or remove.
+            JLOG(journal_.warn())
+                << "Sending " << protocolMessageName(type) << " message "
+                << shortHash << " to [" << peer->id() << "]";
+            peer->send(packet);
+        }
+        else
+            // Don't merge this as a warning. Change it to debug.
+            JLOG(journal_.warn())
+                << "Suppressing sending duplicate " << protocolMessageName(type)
+                << " message " << shortHash << " to [" << peer->id() << "]";
         return;
    }
 
     for (auto id : peers_)
     {
         if (auto p = app_.overlay().findPeerByShortID(id))
-            p->send(packet);
+        {
+            if (app_.getHashRouter().shouldProcessForPeer(
+                    messageHash, p->id(), interval))
+            {
+                // Don't merge this as a warning. Change it to trace or remove.
+                JLOG(journal_.warn())
+                    << "Sending " << protocolMessageName(type) << " message "
+                    << shortHash << " to [" << p->id() << "]";
+                p->send(packet);
+            }
+            else
+                // Don't merge this as a warning. Change it to debug.
+                JLOG(journal_.warn())
+                    << "Suppressing sending duplicate "
+                    << protocolMessageName(type) << " message " << shortHash
+                    << " to [" << p->id() << "]";
+        }
     }
 }
 
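
For reviewers, here is a minimal, self-contained sketch of the per-peer suppression rule this patch adds. It mirrors the logic of HashRouter::Entry::shouldProcessForPeer, but substitutes std::chrono::steady_clock and a plain integer peer id for rippled's Stopwatch and PeerShortID; the SuppressionEntry name and the example values are illustrative only and are not part of the patch.

#include <chrono>
#include <cstdint>
#include <iostream>
#include <map>

using Clock = std::chrono::steady_clock;
using PeerID = std::uint32_t;  // stand-in for rippled's PeerShortID

// Stand-in for HashRouter::Entry: remembers when a given message hash was
// last processed for each peer, and allows a retry only after `interval`.
class SuppressionEntry
{
public:
    bool
    shouldProcessForPeer(
        PeerID peer,
        Clock::time_point now,
        std::chrono::seconds interval)
    {
        auto const it = lastProcessed_.find(peer);
        if (it != lastProcessed_.end() && it->second + interval > now)
            return false;  // duplicate within the window: suppress
        lastProcessed_[peer] = now;
        return true;  // first send, or window expired: allow
    }

private:
    std::map<PeerID, Clock::time_point> lastProcessed_;
};

int
main()
{
    using namespace std::chrono_literals;
    SuppressionEntry entry;
    auto const t0 = Clock::now();

    std::cout << entry.shouldProcessForPeer(1, t0, 5s) << '\n';       // 1: send
    std::cout << entry.shouldProcessForPeer(1, t0 + 2s, 5s) << '\n';  // 0: suppressed
    std::cout << entry.shouldProcessForPeer(2, t0 + 2s, 5s) << '\n';  // 1: different peer
    std::cout << entry.shouldProcessForPeer(1, t0 + 6s, 5s) << '\n';  // 1: retry after 5s
}

The retry window matches the intent stated in the commit message: a request lost to peer or network congestion can still be re-sent after 5s, while tight duplicate-request loops to the same peer are suppressed.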