Skip to content

Commit

Permalink
[FOLD] Address feedback from Peng Wang
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Feb 10, 2021
1 parent 47605c0 commit 30a21be
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 19 deletions.
17 changes: 16 additions & 1 deletion src/ripple/nodestore/ShardInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ class ShardInfo
};

public:
[[nodiscard]] NetClock::time_point const&
msgTimestamp() const
{
return msgTimestamp_;
}

void
setMsgTimestamp(NetClock::time_point const& timestamp)
{
msgTimestamp_ = timestamp;
}

[[nodiscard]] std::string
finalizedToString() const;

Expand Down Expand Up @@ -79,14 +91,17 @@ class ShardInfo
std::uint32_t percentProgress);

[[nodiscard]] protocol::TMPeerShardInfoV2
makeMessage(Application& app) const;
makeMessage(Application& app);

private:
// Finalized immutable shards
RangeSet<std::uint32_t> finalized_;

// Incomplete shards being acquired or finalized
std::map<std::uint32_t, Incomplete> incomplete_;

// Message creation time
NetClock::time_point msgTimestamp_;
};

} // namespace NodeStore
Expand Down
10 changes: 9 additions & 1 deletion src/ripple/nodestore/impl/ShardInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,20 @@ ShardInfo::update(
}

protocol::TMPeerShardInfoV2
ShardInfo::makeMessage(Application& app) const
ShardInfo::makeMessage(Application& app)
{
protocol::TMPeerShardInfoV2 message;
Serializer s;
s.add32(HashPrefix::shardInfo);

// Set the message creation time
msgTimestamp_ = app.timeKeeper().now();
{
auto const timestamp{msgTimestamp_.time_since_epoch().count()};
message.set_timestamp(timestamp);
s.add32(timestamp);
}

if (!incomplete_.empty())
{
message.mutable_incomplete()->Reserve(incomplete_.size());
Expand Down
8 changes: 7 additions & 1 deletion src/ripple/overlay/impl/OverlayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,13 @@ OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays)
for_each([&](std::shared_ptr<PeerImp>&& peer) {
auto const& psi{peer->getPeerShardInfos()};
for (auto const& [publicKey, shardInfo] : psi)
peerShardInfo.emplace(publicKey, shardInfo);
{
auto const it{peerShardInfo.find(publicKey)};
if (it == peerShardInfo.end())
peerShardInfo.emplace(publicKey, shardInfo);
else if (shardInfo.msgTimestamp() > it->second.msgTimestamp())
it->second = shardInfo;
}
});

// Add shard info to json result
Expand Down
51 changes: 40 additions & 11 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1199,11 +1199,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)
// Find the earliest and latest shard indexes
auto const& db{app_.getNodeStore()};
auto const earliestShardIndex{db.earliestShardIndex()};
auto const curLedgerSeq{app_.getLedgerMaster().getCurrentLedgerIndex()};
boost::optional<std::uint32_t> latestShardIndex;

if (curLedgerSeq >= db.earliestLedgerSeq())
latestShardIndex = db.seqToShardIndex(curLedgerSeq);
auto const latestShardIndex{[&]() -> std::optional<std::uint32_t> {
auto const curLedgerSeq{app_.getLedgerMaster().getCurrentLedgerIndex()};
if (curLedgerSeq >= db.earliestLedgerSeq())
return db.seqToShardIndex(curLedgerSeq);
return std::nullopt;
}()};

auto badData = [&](std::string msg) {
fee_ = Resource::feeBadData;
Expand All @@ -1214,12 +1215,29 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)
Serializer s;
s.add32(HashPrefix::shardInfo);

// Verify incomplete shards
// Verify message creation time
NodeStore::ShardInfo shardInfo;
{
auto const timestamp{
NetClock::time_point{std::chrono::seconds{m->timestamp()}}};
auto const now{app_.timeKeeper().now()};
if (timestamp > now)
return badData("Invalid timestamp");

// Check if stale
using namespace std::chrono_literals;
if (timestamp < (now - 5min))
return badData("Stale timestamp");

s.add32(m->timestamp());
shardInfo.setMsgTimestamp(timestamp);
}

// Verify incomplete shards
auto const numIncomplete{m->incomplete_size()};
if (numIncomplete > 0)
{
if (numIncomplete > curLedgerSeq)
if (latestShardIndex && numIncomplete > *latestShardIndex)
return badData("Invalid number of incomplete shards");

// Verify each incomplete shard
Expand Down Expand Up @@ -1280,14 +1298,21 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)
return badData("Invalid finalized shard indexes");

auto const& finalized{shardInfo.finalized()};
if (boost::icl::length(finalized) == 0 ||
auto const numFinalized{boost::icl::length(finalized)};
if (numFinalized == 0 ||
boost::icl::first(finalized) < earliestShardIndex ||
(latestShardIndex &&
boost::icl::last(finalized) > latestShardIndex))
{
return badData("Invalid finalized shard indexes");
}

if (latestShardIndex &&
(numFinalized + numIncomplete) > *latestShardIndex)
{
return badData("Invalid number of finalized and incomplete shards");
}

s.addRaw(str.data(), str.size());
}

Expand Down Expand Up @@ -1338,7 +1363,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)
{
m->mutable_peerchain()->RemoveLast();
peer->send(
std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO));
std::make_shared<Message>(*m, protocol::mtPEER_SHARD_INFO_V2));
JLOG(p_journal_.trace())
<< "Relayed TMPeerShardInfoV2 from peer IP "
<< remote_address_.address().to_string() << " to peer IP "
Expand All @@ -1361,8 +1386,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPeerShardInfoV2> const& m)

// Consume the message
{
std::lock_guard l{shardInfoMutex_};
shardInfos_.insert_or_assign(publicKey, std::move(shardInfo));
std::lock_guard lock{shardInfoMutex_};
auto const it{shardInfos_.find(publicKey_)};
if (it == shardInfos_.end())
shardInfos_.emplace(publicKey, std::move(shardInfo));
else if (shardInfo.msgTimestamp() > it->second.msgTimestamp())
it->second = std::move(shardInfo);
}

// Notify overlay a reply was received from the last peer in this chain
Expand Down
13 changes: 8 additions & 5 deletions src/ripple/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,23 @@ message TMPeerShardInfoV2
optional uint32 progress = 3;
}

// Message creation time
required uint32 timestamp = 1;

// Incomplete shards being acquired or verified
repeated TMIncomplete incomplete = 1;
repeated TMIncomplete incomplete = 2;

// Verified immutable shards (RangeSet)
optional string finalized = 2;
optional string finalized = 3;

// Public key of node that authored the shard info
required bytes publicKey = 3;
required bytes publicKey = 4;

// Digital signature of node that authored the shard info
required bytes signature = 4;
required bytes signature = 5;

// Peer public keys used to route messages
repeated TMPublicKey peerChain = 5;
repeated TMPublicKey peerChain = 6;
}

// A transaction can have only one input and one output.
Expand Down

0 comments on commit 30a21be

Please sign in to comment.