Skip to content

Commit

Permalink
Refrain from sending duplicate TMGetLedger messages to the same peer
Browse files Browse the repository at this point in the history
* Allow a retry after 5s in case of peer or network congestion.
* Addresses RIPD-1870
* (Changes levelization. That is not desirable, and will need to be
  fixed.)
  • Loading branch information
ximinez committed Aug 13, 2024
1 parent e6ef0fc commit 326da5b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Builds/levelization/results/loops.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Loop: ripple.app ripple.nodestore
ripple.app > ripple.nodestore

Loop: ripple.app ripple.overlay
ripple.overlay ~= ripple.app
ripple.overlay == ripple.app

Loop: ripple.app ripple.peerfinder
ripple.app > ripple.peerfinder
Expand Down
14 changes: 14 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ HashRouter::shouldProcess(
return s.shouldProcess(suppressionMap_.clock().now(), tx_interval);
}

bool
HashRouter::shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval)
{
std::lock_guard lock(mutex_);

auto& entry = emplace(key).first;

return entry.shouldProcessForPeer(
peer, suppressionMap_.clock().now(), interval);
}

int
HashRouter::getFlags(uint256 const& key)
{
Expand Down
28 changes: 28 additions & 0 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,29 @@ class HashRouter
return true;
}

bool
shouldProcessForPeer(
PeerShortID peer,
Stopwatch::time_point now,
std::chrono::seconds interval)
{
if (peerProcessed_.contains(peer) &&
((peerProcessed_[peer] + interval) > now))
return false;
// Peer may already be in the list, but adding it again doesn't hurt
addPeer(peer);
peerProcessed_[peer] = now;
return true;
}

private:
int flags_ = 0;
std::set<PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
};

public:
Expand Down Expand Up @@ -180,6 +196,18 @@ class HashRouter
int& flags,
std::chrono::seconds tx_interval);

/** Determines whether the hashed item should be process for the given peer.
Could be an incoming or outgoing message.
Items filtered with this function should only be processed for the given
peer once. Unlike shouldProcess, it can be processed for other peers.
*/
bool
shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval);

/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
Expand Down
33 changes: 31 additions & 2 deletions src/ripple/overlay/impl/PeerSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
//==============================================================================

#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/core/JobQueue.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/PeerSet.h>
#include <ripple/protocol/digest.h>

namespace ripple {

Expand Down Expand Up @@ -104,16 +106,43 @@ PeerSetImpl::sendRequest(
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(message, type);

auto const messageHash = [&]() {
auto const packetBuffer =
packet->getBuffer(compression::Compressed::Off);
return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
}();

// Allow messages to be re-sent to the same peer after a delay
using namespace std::chrono_literals;
constexpr std::chrono::seconds interval = 5s;

if (peer)
{
peer->send(packet);
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, peer->id(), interval))
peer->send(packet);
else
// Don't merge this as warning. Change it to debug.
JLOG(journal_.warn())
<< "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << peer->id() << "]";
return;
}

for (auto id : peers_)
{
if (auto p = app_.overlay().findPeerByShortID(id))
p->send(packet);
{
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, p->id(), interval))
p->send(packet);
else
// Don't merge this as warning. Change it to debug.
JLOG(journal_.warn()) << "Suppressing sending duplicate "
<< protocolMessageName(type)
<< " message to [" << p->id() << "]";
}
}
}

Expand Down

0 comments on commit 326da5b

Please sign in to comment.