Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPC command shard crawl (RIPD-1663) #2697

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,12 +628,6 @@ RCLConsensus::Adaptor::notify(
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
if (auto shardStore = app_.getShardStore())
{
auto shards = shardStore->getCompleteShards();
if (! shards.empty())
s.set_shardseqs(shards);
}
app_.overlay ().foreach (send_always (
std::make_shared <Message> (
s, protocol::mtSTATUS_CHANGE)));
Expand Down
1 change: 1 addition & 0 deletions src/ripple/net/impl/RPCCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,7 @@ class RPCParser
{ "submit_multisigned", &RPCParser::parseSubmitMultiSigned, 1, 1 },
{ "server_info", &RPCParser::parseServerInfo, 0, 1 },
{ "server_state", &RPCParser::parseServerInfo, 0, 1 },
{ "crawl_shards", &RPCParser::parseAsIs, 0, 2 },
{ "stop", &RPCParser::parseAsIs, 0, 0 },
{ "transaction_entry", &RPCParser::parseTransactionEntry, 2, 2 },
{ "tx", &RPCParser::parseTx, 1, 2 },
Expand Down
10 changes: 10 additions & 0 deletions src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <ripple/basics/random.h>
#include <ripple/nodestore/DummyScheduler.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/HashPrefix.h>

namespace ripple {
Expand Down Expand Up @@ -513,6 +515,14 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
complete_.emplace(incomplete_->index(), std::move(incomplete_));
incomplete_.reset();
updateStats(l);

// Update peers with new shard index
protocol::TMShardInfo message;
PublicKey const& publicKey {app_.nodeIdentity().first};
message.set_nodepubkey(publicKey.data(), publicKey.size());
message.set_shardindexes(std::to_string(shardIndex));
app_.overlay().foreach(send_always(
std::make_shared<Message>(message, protocol::mtSHARD_INFO)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't arbitrarily send this message out to other peers; if they don't support mtSHARD_INFO it will result in the closing their connection to this server.

We need to know if a peer supports sharding before we send them shard-related messages.

Copy link
Contributor Author

@miguelportilla miguelportilla Oct 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nbougalis I can't find the code that disconnects upon receiving an unknown message and I've been unable to reproduce that behavior using the tip of develop. Peerimp::onMessageUnknown is called by invokeProtocolMessage when an unknown message is received but the function is just a stub with a TODO. It seems someone intended on adding the behavior or I did I miss something else entirely? Thanks!

}
}

Expand Down
9 changes: 9 additions & 0 deletions src/ripple/overlay/Overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ class Overlay
virtual std::uint64_t getPeerDisconnect() const = 0;
virtual void incPeerDisconnectCharges() = 0;
virtual std::uint64_t getPeerDisconnectCharges() const = 0;

/** Returns information reported to the crawl shard RPC command.

@param hops the maximum jumps the crawler will attempt.
The number of hops achieved is not guaranteed.
*/
virtual
Json::Value
crawlShards(bool pubKey, std::uint32_t hops) = 0;
};

struct ScoreHasLedger
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ namespace Resource {
class Charge;
}

// Maximum hops to attempt when crawling shards. cs = crawl shards
static constexpr std::uint32_t csHopLimit = 3;

/** Represents a peer connection in the overlay. */
class Peer
{
Expand Down Expand Up @@ -100,7 +103,6 @@ class Peer
virtual bool hasLedger (uint256 const& hash, std::uint32_t seq) const = 0;
virtual void ledgerRange (std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0;
virtual bool hasShard (std::uint32_t shardIndex) const = 0;
virtual std::string getShards() const = 0;
virtual bool hasTxSet (uint256 const& hash) const = 0;
virtual void cycleStatus () = 0;
virtual bool supportsVersion (int version) = 0;
Expand Down
110 changes: 104 additions & 6 deletions src/ripple/overlay/impl/OverlayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,102 @@ OverlayImpl::reportTraffic (
m_traffic.addCount (cat, isInbound, number);
}

Json::Value
OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops)
{
using namespace std::chrono;
using namespace std::chrono_literals;

Json::Value jv(Json::objectValue);
auto const numPeers {size()};
if (numPeers == 0)
return jv;

// If greater than a hop away, we may need to gather or freshen data
if (hops > 0)
{
// Prevent crawl spamming
clock_type::time_point const last(csLast_.load());
if ((clock_type::now() - last) > 60s)
{
auto const timeout(seconds((hops * hops) * 10));
std::unique_lock<std::mutex> l {csMutex_};

// Check if already requested
if (csIDs_.empty())
{
{
std::lock_guard <decltype(mutex_)> lock {mutex_};
for (auto& id : ids_)
csIDs_.emplace(id.first);
}

// Relay request to active peers
protocol::TMGetShardInfo tmGS;
tmGS.set_hops(hops);
foreach(send_always(std::make_shared<Message>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern as above: we can't just send this to every peer we have unless we are prepared to have that connection close if they aren't running a version capable of understanding TMGetShardInfo.

tmGS, protocol::mtGET_SHARD_INFO)));

if (csCV_.wait_for(l, timeout) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
}
csLast_ = duration_cast<seconds>(
clock_type::now().time_since_epoch());
}
else
csCV_.wait_for(l, timeout);
}
}

// Combine the shard info from peers and their sub peers
hash_map<PublicKey, PeerImp::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp> const& peer)
{
if (auto psi = peer->getPeerShardInfo())
{
for (auto const& e : *psi)
{
auto it {peerShardInfo.find(e.first)};
if (it != peerShardInfo.end())
// The key exists so join the shard indexes.
it->second.shardIndexes += e.second.shardIndexes;
else
peerShardInfo.emplace(std::move(e));
}
}
});

// Prepare json reply
auto& av = jv[jss::peers] = Json::Value(Json::arrayValue);
for (auto const& e : peerShardInfo)
{
auto& pv {av.append(Json::Value(Json::objectValue))};
if (pubKey)
pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first);

auto const& address {e.second.endpoint.address()};
if (!address.is_unspecified())
pv[jss::ip] = address.to_string();

pv[jss::complete_shards] = to_string(e.second.shardIndexes);
}

return jv;
}

void
OverlayImpl::lastLink(std::uint32_t id)
{
// Notify threads when every peer has received a last link.
// This doesn't account for every node that might reply but
// it is adequate.
std::lock_guard<std::mutex> l {csMutex_};
if (csIDs_.erase(id) && csIDs_.empty())
csCV_.notify_all();
}

std::size_t
OverlayImpl::selectPeers (PeerSet& set, std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> score)
Expand Down Expand Up @@ -787,9 +883,12 @@ OverlayImpl::crawl()
sp->getRemoteAddress().port());
}
}
auto version = sp->getVersion ();
if (! version.empty ())
pv[jss::version] = version;

{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the extra scope? If it's lifetime management, why not just change to:

if(auto version = sp->getVersion())
    pv[jss::version] = std::move(version);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nbougalis a std::string won't convert to a bool. I'm in favor of the extra scope because of the move. As a side note, this would be a good place for c++-17's init-statements for if when we move to 17.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, d'oh.

auto version {sp->getVersion()};
if (!version.empty())
pv[jss::version] = std::move(version);
}

std::uint32_t minSeq, maxSeq;
sp->ledgerRange(minSeq, maxSeq);
Expand All @@ -798,9 +897,8 @@ OverlayImpl::crawl()
std::to_string(minSeq) + "-" +
std::to_string(maxSeq);

auto shards = sp->getShards();
if (! shards.empty())
pv[jss::complete_shards] = shards;
if (auto shardIndexes = sp->getShardIndexes())
pv[jss::complete_shards] = to_string(*shardIndexes);
});

return jv;
Expand Down
34 changes: 27 additions & 7 deletions src/ripple/overlay/impl/OverlayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class OverlayImpl : public Overlay
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};

// Last time we crawled peers for shard info. 'cs' = crawl shards
std::atomic<std::chrono::seconds> csLast_{std::chrono::seconds{0}};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does cs stand for? "Crawled shards"? I'm OK with the member name, but I'd like a quick comment to define what cs is meant to be.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is a duration and not a time_point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::atomic doesn't play well time_point. I can't remember the reason why. It may have been that atomic requires trivially copyable types, and time_point isn't trivially copyable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Paging @HowardHinnant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember this happening, and I don't recall the reason either. One can always load the duration into a time_point, do the computations, and then extract the duration from the time_point for storing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HowardHinnant Yeah, that is the solution I went with.

std::mutex csMutex_;
std::condition_variable csCV_;
// Peer IDs expecting to receive a last link notification
std::set<std::uint32_t> csIDs_;

//--------------------------------------------------------------------------

public:
Expand Down Expand Up @@ -221,15 +228,17 @@ class OverlayImpl : public Overlay
void
for_each (UnaryFunc&& f)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);

// Iterate over a copy of the peer list because peer
// destruction can invalidate iterators.
std::vector<std::weak_ptr<PeerImp>> wp;
wp.reserve(ids_.size());
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change has be carefully audited to make sure callers aren't expecting the lock to be held. I expect this is an OK change, but we can't push until we audit. I'll do so when I do another pass of the code, but I wanted to make a note so others can audit as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you mean as the caller's function is called. Good point, requires an audit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@miguelportilla Yes, that's what I mean - the parameter f in for_each (UnaryFunc&& f).

I just audited this and the change looks good to me. However, I think it's important enough that at least one other reviewer should also confirm that this change is safe.


for (auto& x : ids_)
wp.push_back(x.second);
// Iterate over a copy of the peer list because peer
// destruction can invalidate iterators.
wp.reserve(ids_.size());

for (auto& x : ids_)
wp.push_back(x.second);
}

for (auto& w : wp)
{
Expand Down Expand Up @@ -340,6 +349,17 @@ class OverlayImpl : public Overlay
return peerDisconnectsCharges_;
}

Json::Value
crawlShards(bool pubKey, std::uint32_t hops) override;


/** Called when the last link from a peer chain is received.

@param id peer id that received the shard info.
*/
void
lastLink(std::uint32_t id);

private:
std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
Expand Down
Loading