Skip to content

Commit

Permalink
[WIP] Toying with the idea of VL exchange
Browse files Browse the repository at this point in the history
* Log Validator Lists received from peers
  • Loading branch information
ximinez committed Aug 9, 2024
1 parent c19a88f commit 1f7d137
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 19 deletions.
16 changes: 14 additions & 2 deletions src/xrpld/app/misc/ValidatorList.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class ValidatorList
static std::vector<ValidatorBlobInfo>
parseBlobs(protocol::TMValidatorListCollection const& body);

static void
static std::optional<std::string>
sendValidatorList(
Peer& peer,
std::uint64_t peerSequence,
Expand All @@ -371,6 +371,18 @@ class ValidatorList
HashRouter& hashRouter,
beast::Journal j);

std::tuple<
std::string,
std::uint32_t,
std::map<std::size_t, ValidatorBlobInfo>,
uint256>
sendLatestValidatorLists(
Peer& peer,
std::uint64_t peerSequence,
PublicKey const& publisherKey,
HashRouter& hashRouter,
beast::Journal j) const;

[[nodiscard]] static std::pair<std::size_t, std::size_t>
buildValidatorListMessages(
std::size_t messageVersion,
Expand Down Expand Up @@ -804,7 +816,7 @@ class ValidatorList
HashRouter& hashRouter,
beast::Journal j);

static void
static std::optional<std::string>
sendValidatorList(
Peer& peer,
std::uint64_t peerSequence,
Expand Down
68 changes: 63 additions & 5 deletions src/xrpld/app/misc/detail/ValidatorList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,61 @@ ValidatorList::buildValidatorListMessages(
return {0, 0};
}

std::tuple<
std::string,
std::uint32_t,
std::map<std::size_t, ValidatorBlobInfo>,
uint256>
ValidatorList::sendLatestValidatorLists(
Peer& peer,
std::uint64_t peerSequence,
PublicKey const& publisherKey,
HashRouter& hashRouter,
beast::Journal j) const
{
std::vector<ValidatorList::MessageWithHash> messages;
std::map<std::size_t, ValidatorBlobInfo> blobInfos;

if (publisherLists_.count(publisherKey) == 0)
return {};
ValidatorList::PublisherListCollection const& lists =
publisherLists_.at(publisherKey);

auto const maxSequence = lists.current.sequence;
assert(
lists.current.sequence == maxSequence ||
lists.remaining.count(maxSequence) == 1);

if (peerSequence < maxSequence)
{
buildBlobInfos(blobInfos, lists);
sendValidatorList(
peer,
peerSequence,
publisherKey,
maxSequence,
lists.rawVersion,
lists.rawManifest,
blobInfos,
messages,
hashRouter,
j);

// Suppress the messages so they'll be ignored next time.
uint256 lasthash;
for (auto const& m : messages)
{
lasthash = m.hash;
hashRouter.addSuppressionPeer(lasthash, peer.id());
}
return std::make_tuple(
lists.rawManifest, lists.rawVersion, blobInfos, lasthash);
}
return {};
}

// static
void
std::optional<std::string>
ValidatorList::sendValidatorList(
Peer& peer,
std::uint64_t peerSequence,
Expand All @@ -694,7 +747,7 @@ ValidatorList::sendValidatorList(
: peer.supportsFeature(ProtocolFeature::ValidatorListPropagation) ? 1
: 0;
if (!messageVersion)
return;
return {};
auto const [newPeerSequence, numVLs] = buildValidatorListMessages(
messageVersion,
peerSequence,
Expand Down Expand Up @@ -725,6 +778,7 @@ ValidatorList::sendValidatorList(
if (sent)
{
if (messageVersion > 1)
{
JLOG(j.debug())
<< "Sent " << messages.size()
<< " validator list collection(s) containing " << numVLs
Expand All @@ -733,6 +787,8 @@ ValidatorList::sendValidatorList(
<< newPeerSequence << " to "
<< peer.getRemoteAddress().to_string() << " [" << peer.id()
<< "]";
return "ValidatorListCollection";
}
else
{
assert(numVLs == 1);
Expand All @@ -741,13 +797,15 @@ ValidatorList::sendValidatorList(
<< " with sequence " << newPeerSequence << " to "
<< peer.getRemoteAddress().to_string() << " [" << peer.id()
<< "]";
return "ValidatorList";
}
}
}
return {};
}

// static
void
std::optional<std::string>
ValidatorList::sendValidatorList(
Peer& peer,
std::uint64_t peerSequence,
Expand All @@ -760,7 +818,7 @@ ValidatorList::sendValidatorList(
beast::Journal j)
{
std::vector<ValidatorList::MessageWithHash> messages;
sendValidatorList(
return sendValidatorList(
peer,
peerSequence,
publisherKey,
Expand Down Expand Up @@ -831,7 +889,7 @@ ValidatorList::broadcastBlobs(
std::map<std::size_t, ValidatorBlobInfo> blobInfos;

assert(
lists.current.sequence == maxSequence ||
lists.current.sequence <= maxSequence ||
lists.remaining.count(maxSequence) == 1);
// Can't use overlay.foreach here because we need to modify
// the peer, and foreach provides a const&
Expand Down
83 changes: 71 additions & 12 deletions src/xrpld/overlay/detail/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,46 @@ PeerImp::domain() const

// Protocol logic

void
logVLBlob(beast::Journal j, ValidatorBlobInfo const& blob, std::size_t count)
{
auto const stream = j.trace();
JLOG(stream) << "Blob " << count << " Signature: " << blob.signature;
JLOG(stream) << "Blob " << count << " blob: " << base64_decode(blob.blob);
JLOG(stream) << "Blob " << count << " manifest: "
<< (blob.manifest ? base64_decode(*blob.manifest) : "NONE");
}

void
logVLBlob(
beast::Journal j,
std::pair<std::size_t, ValidatorBlobInfo> const& blob,
std::size_t count)
{
logVLBlob(j, blob.second, count);
}

template <class TBlobs>
void
logVL(
beast::Journal j,
std::string const& manifest,
std::uint32_t version,
TBlobs const& blobs,
uint256 const& hash)
{
auto const stream = j.trace();
JLOG(stream) << "Manifest: " << manifest;
JLOG(stream) << "Version: " << version;
JLOG(stream) << "Hash: " << hash;
std::size_t count = 1;
for (auto const& blob : blobs)
{
logVLBlob(j, blob, count);
++count;
}
}

void
PeerImp::doProtocolStart()
{
Expand Down Expand Up @@ -1964,6 +2004,8 @@ PeerImp::onValidatorListMessage(
return;
}

logVL(p_journal_, manifest, version, blobs, hash);

auto const applyResult = app_.validators().applyListsAndBroadcast(
manifest,
version,
Expand Down Expand Up @@ -1994,14 +2036,8 @@ PeerImp::onValidatorListMessage(

assert(applyResult.publisherKey);
auto const& pubKey = *applyResult.publisherKey;
#ifndef NDEBUG
if (auto const iter = publisherListSequences_.find(pubKey);
iter != publisherListSequences_.end())
{
assert(iter->second < applyResult.sequence);
}
#endif
publisherListSequences_[pubKey] = applyResult.sequence;
if (publisherListSequences_[pubKey] < applyResult.sequence)
publisherListSequences_[pubKey] = applyResult.sequence;
}
break;
case ListDisposition::same_sequence:
Expand All @@ -2016,8 +2052,31 @@ PeerImp::onValidatorListMessage(
}
#endif // !NDEBUG

[[fallthrough]];
case ListDisposition::stale: {
auto const [pubKey, currentPeerSeq] = [&]() {
std::lock_guard<std::mutex> sl(recentLock_);
auto const& pubKey = *applyResult.publisherKey;
auto const& current = publisherListSequences_[pubKey];
assert(applyResult.sequence && applyResult.publisherKey);
assert(current <= applyResult.sequence);
return std::make_pair(
pubKey, current ? current : applyResult.sequence);
}();
if (currentPeerSeq <= applyResult.sequence)
{
auto const [sentmanifest, sentversion, sentblobs, senthash] =
app_.validators().sendLatestValidatorLists(
*this,
currentPeerSeq,
pubKey,
app_.getHashRouter(),
p_journal_);
logVL(
p_journal_, sentmanifest, sentversion, sentblobs, senthash);
}
}
break;
case ListDisposition::stale:
case ListDisposition::untrusted:
case ListDisposition::invalid:
case ListDisposition::unsupported_version:
Expand Down Expand Up @@ -2101,7 +2160,7 @@ PeerImp::onValidatorListMessage(
break;
case ListDisposition::stale:
JLOG(p_journal_.warn())
<< "Ignored " << count << "stale " << messageType
<< "Ignored " << count << " stale " << messageType
<< "(s) from peer " << remote_address_;
break;
case ListDisposition::untrusted:
Expand All @@ -2111,12 +2170,12 @@ PeerImp::onValidatorListMessage(
break;
case ListDisposition::unsupported_version:
JLOG(p_journal_.warn())
<< "Ignored " << count << "unsupported version "
<< "Ignored " << count << " unsupported version "
<< messageType << "(s) from peer " << remote_address_;
break;
case ListDisposition::invalid:
JLOG(p_journal_.warn())
<< "Ignored " << count << "invalid " << messageType
<< "Ignored " << count << " invalid " << messageType
<< "(s) from peer " << remote_address_;
break;
default:
Expand Down

0 comments on commit 1f7d137

Please sign in to comment.