Skip to content

Commit

Permalink
[FOLD] Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Oct 5, 2018
1 parent 17e0d09 commit 05275ee
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 83 deletions.
2 changes: 1 addition & 1 deletion src/ripple/net/impl/RPCCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ class RPCParser
{ "submit_multisigned", &RPCParser::parseSubmitMultiSigned, 1, 1 },
{ "server_info", &RPCParser::parseServerInfo, 0, 1 },
{ "server_state", &RPCParser::parseServerInfo, 0, 1 },
{ "shards", &RPCParser::parseAsIs, 0, 0 },
{ "crawl_shards", &RPCParser::parseAsIs, 0, 2 },
{ "stop", &RPCParser::parseAsIs, 0, 0 },
{ "transaction_entry", &RPCParser::parseTransactionEntry, 2, 2 },
{ "tx", &RPCParser::parseTx, 1, 2 },
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)

// Update peers with new shard index
protocol::TMShardInfo message;
auto const& publicKey {app_.nodeIdentity().first};
PublicKey const& publicKey {app_.nodeIdentity().first};
message.set_nodepubkey(publicKey.data(), publicKey.size());
message.set_shardindexes(std::to_string(shardIndex));
app_.overlay().foreach(send_always(
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ namespace Resource {
class Charge;
}

// Maximum hops to attempt when crawling shards. cs = crawl shards
static constexpr std::uint32_t csHopLimit = 3;

/** Represents a peer connection in the overlay. */
class Peer
{
Expand Down
12 changes: 8 additions & 4 deletions src/ripple/overlay/impl/OverlayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
{
// Prevent crawl spamming
clock_type::time_point const last(csLast_.load());
if (duration_cast<seconds>(clock_type::now() - last) > 60s)
if ((clock_type::now() - last) > 60s)
{
auto const timeout(seconds((hops * hops) * 10));
std::unique_lock<std::mutex> l {csMutex_};
Expand Down Expand Up @@ -759,7 +759,7 @@ OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)

// Combine the shard info from peers and their sub peers
hash_map<PublicKey, PeerImp::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp>&& peer)
for_each([&](std::shared_ptr<PeerImp>const& peer)
{
if (auto psi = peer->getPeerShardInfo())
{
Expand All @@ -782,7 +782,11 @@ OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
auto& pv {av.append(Json::Value(Json::objectValue))};
if (pubKey)
pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first);
pv[jss::ip] = e.second.endpoint.address().to_string();

auto const& address {e.second.endpoint.address()};
if (!address.is_unspecified())
pv[jss::ip] = address.to_string();

pv[jss::complete_shards] = to_string(e.second.shardIndexes);
}

Expand Down Expand Up @@ -894,7 +898,7 @@ OverlayImpl::crawl()
std::to_string(maxSeq);

if (auto shardIndexes = sp->getShardIndexes())
pv[jss::complete_shards] = to_string(shardIndexes);
pv[jss::complete_shards] = to_string(*shardIndexes);
});

return jv;
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/overlay/impl/OverlayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class OverlayImpl : public Overlay
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};

// Last time we crawled peers for shard info
// Last time we crawled peers for shard info. 'cs' = crawl shards
std::atomic<std::chrono::seconds> csLast_{std::chrono::seconds{0}};
std::mutex csMutex_;
std::condition_variable csCV_;
Expand Down
174 changes: 111 additions & 63 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ bool
PeerImp::hasShard (std::uint32_t shardIndex) const
{
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it {shardInfo_.find(publicKey_)};
auto const it {shardInfo_.find(publicKey_)};
if (it != shardInfo_.end())
return boost::icl::contains(it->second.shardIndexes, shardIndex);
return false;
Expand Down Expand Up @@ -1019,17 +1019,24 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMCluster> const& m)
void
PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
{
fee_ = Resource::feeMediumBurdenPeer;
if (m->hops() > csHopLimit || m->peerchain_size() > csHopLimit)
{
fee_ = Resource::feeInvalidRequest;
JLOG(p_journal_.warn()) <<
(m->hops() > csHopLimit ?
"Hops (" + std::to_string(m->hops()) + ") exceed limit" :
"Invalid Peerchain");
return;
}

// Reply with shard info we may have
if (auto shardStore = app_.getShardStore())
{
fee_ = Resource::feeLightPeer;
auto shards {shardStore->getCompleteShards()};
if (!shards.empty())
{
protocol::TMShardInfo reply;
auto const& publicKey {app_.nodeIdentity().first};
reply.set_nodepubkey(publicKey.data(), publicKey.size());
reply.set_shardindexes(shards);

if (m->has_lastlink())
Expand All @@ -1041,16 +1048,15 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
send(std::make_shared<Message>(reply, protocol::mtSHARD_INFO));

JLOG(p_journal_.trace()) <<
"Sent shard info to peer with IP " <<
remote_address_.address().to_string() <<
" public key " << toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << shards;
"Sent shard indexes " << shards;
}
}

// Relay request to peers
if (m->hops() > 0)
{
fee_ = Resource::feeMediumBurdenPeer;

m->set_hops(m->hops() - 1);
if (m->hops() == 0)
m->set_lastlink(true);
Expand All @@ -1065,14 +1071,33 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetShardInfo> const& m)
void
PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
{
if (m->shardindexes().empty() || m->peerchain_size() > csHopLimit)
{
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) <<
(m->shardindexes().empty() ?
"Missing shard indexes" :
"Invalid Peerchain");
return;
}

// Check if the message should be forwarded to another peer
if (m->peerchain_size() > 0)
{
auto const peerId {m->peerchain(m->peerchain_size() - 1)};
if (auto peer = overlay_.findPeerByShortID(peerId))
{
if (!m->has_nodepubkey())
m->set_nodepubkey(publicKey_.data(), publicKey_.size());

if (!m->has_endpoint())
m->set_endpoint(remote_address_.address().to_string());
{
// Check if peer will share IP publicly
if (crawl())
m->set_endpoint(remote_address_.address().to_string());
else
m->set_endpoint("0");
}

m->mutable_peerchain()->RemoveLast();
peer->send(std::make_shared<Message>(*m, protocol::mtSHARD_INFO));
Expand All @@ -1084,81 +1109,104 @@ PeerImp::onMessage(std::shared_ptr <protocol::TMShardInfo> const& m)
else
{
// Peer is no longer available so the relay ends
fee_ = Resource::feeUnwantedData;
JLOG(p_journal_.info()) <<
"Unable to route shard info";
fee_ = Resource::feeUnwantedData;
}
return;
}

// Consume the shard info received
if (m->shardindexes().empty())
// Parse the shard indexes received in the shard info
RangeSet<std::uint32_t> shardIndexes;
{
JLOG(p_journal_.error()) <<
"Node response missing shard indexes";
return;
std::vector<std::string> tokens;
boost::split(tokens, m->shardindexes(), boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
std::vector<std::string> seqs;
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
if (seqs.empty() || seqs.size() > 2)
{
fee_ = Resource::feeBadData;
return;
}

std::uint32_t first;
if (!beast::lexicalCastChecked(first, seqs.front()))
{
fee_ = Resource::feeBadData;
return;
}

if (seqs.size() == 1)
shardIndexes.insert(first);
else
{
std::uint32_t second;
if (!beast::lexicalCastChecked(second, seqs.back()))
{
fee_ = Resource::feeBadData;
return;
}
shardIndexes.insert(range(first, second));
}
}
}

// Get the Public key of the node reporting the shard info
PublicKey publicKey;
if (m->has_nodepubkey())
publicKey = PublicKey(makeSlice(m->nodepubkey()));
else
publicKey = publicKey_;

// Get the IP of the node reporting the shard info
beast::IP::Endpoint address;
beast::IP::Endpoint endpoint;
if (m->has_endpoint())
{
auto result {beast::IP::Endpoint::from_string_checked(m->endpoint())};
if (!result.second)
if (m->endpoint() != "0")
{
JLOG(p_journal_.error()) <<
"failed to parse incoming endpoint: {" <<
m->endpoint() << "}";
return;
auto result {
beast::IP::Endpoint::from_string_checked(m->endpoint())};
if (!result.second)
{
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) <<
"failed to parse incoming endpoint: {" <<
m->endpoint() << "}";
return;
}
endpoint = std::move(result.first);
}
address = std::move(result.first);
}
else
address = remote_address_;
else if (crawl()) // Check if peer will share IP publicly
endpoint = remote_address_;

RangeSet<std::uint32_t>* shardIndexes {nullptr};
PublicKey const publicKey(makeSlice(m->nodepubkey()));

std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it {shardInfo_.find(publicKey)};
if (it != shardInfo_.end())
{
// Update the IP address for the node
it->second.endpoint = address;

// Update the shard indexes held by the node
shardIndexes = &(it->second.shardIndexes);
}
else
{
// Add a new node
ShardInfo shardInfo;
shardInfo.endpoint = address;
shardIndexes = &(shardInfo_.emplace(std::move(publicKey),
std::move(shardInfo)).first->second.shardIndexes);
}
std::lock_guard<std::mutex> l {shardInfoMutex_};
auto it {shardInfo_.find(publicKey)};
if (it != shardInfo_.end())
{
// Update the IP address for the node
it->second.endpoint = std::move(endpoint);

// Parse shard indexes
std::vector<std::string> tokens;
boost::split(tokens, m->shardindexes(), boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
std::vector<std::string> seqs;
boost::split(seqs, t, boost::algorithm::is_any_of("-"));
if (seqs.size() == 1)
shardIndexes->insert(
beast::lexicalCastThrow<std::uint32_t>(seqs.front()));
else if (seqs.size() == 2)
shardIndexes->insert(range(
beast::lexicalCastThrow<std::uint32_t>(seqs.front()),
beast::lexicalCastThrow<std::uint32_t>(seqs.back())));
// Join the shard index range set
it->second.shardIndexes += shardIndexes;
}
else
{
// Add a new node
ShardInfo shardInfo;
shardInfo.endpoint = std::move(endpoint);
shardInfo.shardIndexes = std::move(shardIndexes);
shardInfo_.emplace(publicKey, std::move(shardInfo));
}
}

JLOG(p_journal_.trace()) <<
"Consumed TMShardInfo originating from peer with IP " <<
address.address().to_string() <<
" public key " << toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << to_string(*shardIndexes);
"Consumed TMShardInfo originating from public key " <<
toBase58(TokenType::NodePublic, publicKey) <<
" shard indexes " << m->shardindexes();

if (m->has_lastlink())
overlay_.lastLink(id_);
Expand Down
1 change: 0 additions & 1 deletion src/ripple/overlay/impl/PeerImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <deque>
#include <queue>


namespace ripple {

class PeerImp
Expand Down
12 changes: 7 additions & 5 deletions src/ripple/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ enum MessageType
mtPING = 3;
mtPROOFOFWORK = 4;
mtCLUSTER = 5;
mtGET_SHARD_INFO = 10;
mtSHARD_INFO = 11;
mtGET_PEERS = 12;
mtPEERS = 13;
mtENDPOINTS = 15;
Expand All @@ -21,7 +19,11 @@ enum MessageType
mtHAVE_SET = 35;
mtVALIDATION = 41;
mtGET_OBJECTS = 42;
mtGET_SHARD_INFO = 50;
mtSHARD_INFO = 51;

// <available> = 10;
// <available> = 11;
// <available> = 14;
// <available> = 20;
// <available> = 21;
Expand Down Expand Up @@ -138,11 +140,11 @@ message TMGetShardInfo
// Info about shards held
message TMShardInfo
{
required bytes nodePubKey = 1;
required string shardIndexes = 2; // rangeSet of shard indexes
required string shardIndexes = 1; // rangeSet of shard indexes
optional bytes nodePubKey = 2; // The node's public key
optional string endpoint = 3; // ipv6 or ipv4 address
optional bool lastLink = 4; // true if last link in the peer chain
repeated uint32 peerchain = 5;
repeated uint32 peerchain = 5; // List of IDs used to route messages
}

// A transaction can have only one input and one output.
Expand Down
Loading

0 comments on commit 05275ee

Please sign in to comment.