Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/ci/getledger' into ci/mt/noacquire
Browse files Browse the repository at this point in the history
* origin/ci/getledger:
  Refrain from sending duplicate TMGetLedger messages to the same peer
  • Loading branch information
ximinez committed Aug 13, 2024
2 parents 853c01c + 1d404fe commit 2a389d3
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Builds/levelization/results/loops.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Loop: ripple.app ripple.nodestore
ripple.app > ripple.nodestore

Loop: ripple.app ripple.overlay
ripple.overlay ~= ripple.app
ripple.overlay == ripple.app

Loop: ripple.app ripple.peerfinder
ripple.app > ripple.peerfinder
Expand Down
14 changes: 14 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ HashRouter::shouldProcess(
return s.shouldProcess(suppressionMap_.clock().now(), tx_interval);
}

bool
HashRouter::shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval)
{
std::lock_guard lock(mutex_);

auto& entry = emplace(key).first;

return entry.shouldProcessForPeer(
peer, suppressionMap_.clock().now(), interval);
}

int
HashRouter::getFlags(uint256 const& key)
{
Expand Down
28 changes: 28 additions & 0 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,29 @@ class HashRouter
return true;
}

bool
shouldProcessForPeer(
PeerShortID peer,
Stopwatch::time_point now,
std::chrono::seconds interval)
{
if (peerProcessed_.contains(peer) &&
((peerProcessed_[peer] + interval) > now))
return false;
// Peer may already be in the list, but adding it again doesn't hurt
addPeer(peer);
peerProcessed_[peer] = now;
return true;
}

private:
int flags_ = 0;
std::set<PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
};

public:
Expand Down Expand Up @@ -180,6 +196,18 @@ class HashRouter
int& flags,
std::chrono::seconds tx_interval);

/** Determines whether the hashed item should be process for the given peer.
Could be an incoming or outgoing message.
Items filtered with this function should only be processed for the given
peer once. Unlike shouldProcess, it can be processed for other peers.
*/
bool
shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval);

/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
Expand Down
47 changes: 45 additions & 2 deletions src/ripple/overlay/impl/PeerSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
//==============================================================================

#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/core/JobQueue.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/PeerSet.h>
#include <ripple/protocol/digest.h>

namespace ripple {

Expand Down Expand Up @@ -104,16 +106,57 @@ PeerSetImpl::sendRequest(
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(message, type);

auto const messageHash = [&]() {
auto const packetBuffer =
packet->getBuffer(compression::Compressed::Off);
return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
}();
auto const shortHash = to_string(messageHash).substr(0, 6);

// Allow messages to be re-sent to the same peer after a delay
using namespace std::chrono_literals;
constexpr std::chrono::seconds interval = 5s;

if (peer)
{
peer->send(packet);
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, peer->id(), interval))
{
// Don't merge this as warning. Change it to trace or remove.
JLOG(journal_.warn())
<< "Sending " << protocolMessageName(type) << " message "
<< shortHash << " to [" << peer->id() << "]";
peer->send(packet);
}
else
// Don't merge this as warning. Change it to debug.
JLOG(journal_.warn())
<< "Suppressing sending duplicate " << protocolMessageName(type)
<< " message " << shortHash << " to [" << peer->id() << "]";
return;
}

for (auto id : peers_)
{
if (auto p = app_.overlay().findPeerByShortID(id))
p->send(packet);
{
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, p->id(), interval))
{
// Don't merge this as warning. Change it to trace or remove.
JLOG(journal_.warn())
<< "Sending " << protocolMessageName(type) << " message "
<< shortHash << " to [" << peer->id() << "]";
p->send(packet);
}
else
// Don't merge this as warning. Change it to debug.
JLOG(journal_.warn())
<< "Suppressing sending duplicate "
<< protocolMessageName(type) << " message " << shortHash
<< " to [" << p->id() << "]";
}
}
}

Expand Down

0 comments on commit 2a389d3

Please sign in to comment.