Skip to content

Commit

Permalink
[FOLD] Address feedback from Scott Schurr
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Feb 19, 2021
1 parent 30a21be commit 57415a7
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 134 deletions.
1 change: 1 addition & 0 deletions src/ripple/basics/RangeSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ from_string(RangeSet<T>& rs, std::string const& s)
std::vector<std::string> tokens;
bool result{true};

rs.clear();
boost::split(tokens, s, boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class DatabaseShard : public Database
@return Information about shards held by this node
*/
[[nodiscard]] virtual std::unique_ptr<ShardInfo>
getShardInfo() = 0;
getShardInfo() const = 0;

/** Returns the root database directory
*/
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/nodestore/impl/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Database::Database(
DEFAULT_LEDGERS_PER_SHARD))
, earliestLedgerSeq_(
get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
, earliestShardIndex_(seqToShardIndex(earliestLedgerSeq_))
, earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
{
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
Throw<std::runtime_error>("Invalid ledgers_per_shard");
Expand Down
139 changes: 59 additions & 80 deletions src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ DatabaseShardImp::init()
if (!app_.config().standalone() && !historicalPaths_.empty())
{
// Check historical paths for duplicated file systems
if (!checkHistoricalPaths())
if (!checkHistoricalPaths(lock))
return false;
}

Expand Down Expand Up @@ -294,11 +294,9 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
std::lock_guard lock(mutex_);
shards_.emplace(*shardIndex, std::move(shard));
acquireIndex_ = *shardIndex;
updatePeers(lock);
}

// Update peers with shard info
updatePeers();

return ledgerSeq;
}

Expand Down Expand Up @@ -403,9 +401,7 @@ DatabaseShardImp::prepareShards(std::vector<std::uint32_t> const& shardIndexes)
for (auto const shardIndex : shardIndexes)
preparedIndexes_.emplace(shardIndex);

// Update peers with shard info
updatePeers();

updatePeers(lock);
return true;
}

Expand All @@ -416,10 +412,7 @@ DatabaseShardImp::removePreShard(std::uint32_t shardIndex)
assert(init_);

if (preparedIndexes_.erase(shardIndex))
{
// Update peers with shard info
updatePeers();
}
updatePeers(lock);
}

std::string
Expand Down Expand Up @@ -451,10 +444,7 @@ DatabaseShardImp::importShard(

// Remove the failed import shard index so it can be retried
preparedIndexes_.erase(shardIndex);

// Update peers with shard info
updatePeers();

updatePeers(lock);
return false;
};

Expand Down Expand Up @@ -698,35 +688,10 @@ DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
}

std::unique_ptr<ShardInfo>
DatabaseShardImp::getShardInfo()
DatabaseShardImp::getShardInfo() const
{
std::vector<std::weak_ptr<Shard>> shards;
std::set<std::uint32_t> preparedIndexes;

{
std::lock_guard lock(mutex_);

shards.reserve(shards_.size());
for (auto const& e : shards_)
shards.push_back(e.second);

preparedIndexes = preparedIndexes_;
}

auto shardInfo{std::make_unique<ShardInfo>()};
for (auto const& weak : shards)
{
if (auto const shard = weak.lock())
{
shardInfo->update(
shard->index(), shard->getState(), shard->getProgress());
}
}

for (auto const shardIndex : preparedIndexes)
shardInfo->update(shardIndex, ShardState::queued, 0);

return shardInfo;
std::lock_guard lock(mutex_);
return getShardInfo(lock);
}

void
Expand Down Expand Up @@ -1330,43 +1295,43 @@ DatabaseShardImp::finalizeShard(
return;

auto const boundaryIndex{shardBoundaryIndex()};
if (shard->index() < boundaryIndex)
{
// This is a historical shard
if (!historicalPaths_.empty() &&
shard->getDir().parent_path() == dir_)
{
// Shard wasn't placed at a separate historical path
JLOG(j_.warn()) << "shard " << shard->index()
<< " is not stored at a historical path";
}
}
else
{
// Not a historical shard. Shift recent shards if necessary
assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
std::lock_guard lock(mutex_);

relocateOutdatedShards(lock);

auto& recentShard = shard->index() == boundaryIndex
? secondLatestShardIndex_
: latestShardIndex_;
if (shard->index() < boundaryIndex)
{
// This is a historical shard
if (!historicalPaths_.empty() &&
shard->getDir().parent_path() == dir_)
{
// Shard wasn't placed at a separate historical path
JLOG(j_.warn()) << "shard " << shard->index()
<< " is not stored at a historical path";
}
}
else
{
// Not a historical shard. Shift recent shards if necessary
assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
relocateOutdatedShards(lock);

// Set the appropriate recent shard index
recentShard = shard->index();
// Set the appropriate recent shard index
if (boundaryIndex == shard->index())
secondLatestShardIndex_ = boundaryIndex;
else
latestShardIndex_ = boundaryIndex;

if (shard->getDir().parent_path() != dir_)
{
JLOG(j_.warn()) << "shard " << shard->index()
<< " is not stored at the path";
if (shard->getDir().parent_path() != dir_)
{
JLOG(j_.warn()) << "shard " << shard->index()
<< " is not stored at the path";
}
}

updatePeers(lock);
}

updateFileStats();

// Update peers with shard info
updatePeers();
});
}

Expand Down Expand Up @@ -1788,7 +1753,7 @@ DatabaseShardImp::chooseHistoricalPath(std::lock_guard<std::mutex> const&) const
}

bool
DatabaseShardImp::checkHistoricalPaths() const
DatabaseShardImp::checkHistoricalPaths(std::lock_guard<std::mutex> const&) const
{
#if BOOST_OS_LINUX
// Each historical shard path must correspond
Expand Down Expand Up @@ -1868,18 +1833,32 @@ DatabaseShardImp::checkHistoricalPaths() const
return true;
}

void
DatabaseShardImp::updatePeers()
std::unique_ptr<ShardInfo>
DatabaseShardImp::getShardInfo(std::lock_guard<std::mutex> const&) const
{
if (app_.config().standalone() ||
app_.getOPs().getOperatingMode() == OperatingMode::DISCONNECTED)
auto shardInfo{std::make_unique<ShardInfo>()};
for (auto const& [_, shard] : shards_)
{
return;
shardInfo->update(
shard->index(), shard->getState(), shard->getPercentProgress());
}

auto const message{getShardInfo()->makeMessage(app_)};
app_.overlay().foreach(send_always(
std::make_shared<Message>(message, protocol::mtPEER_SHARD_INFO_V2)));
for (auto const shardIndex : preparedIndexes_)
shardInfo->update(shardIndex, ShardState::queued, 0);

return shardInfo;
}

void
DatabaseShardImp::updatePeers(std::lock_guard<std::mutex> const& lock) const
{
if (!app_.config().standalone() &&
app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED)
{
auto const message{getShardInfo(lock)->makeMessage(app_)};
app_.overlay().foreach(send_always(std::make_shared<Message>(
message, protocol::mtPEER_SHARD_INFO_V2)));
}
}

//------------------------------------------------------------------------------
Expand Down
11 changes: 7 additions & 4 deletions src/ripple/nodestore/impl/DatabaseShardImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.`
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

Expand Down Expand Up @@ -74,7 +74,7 @@ class DatabaseShardImp : public DatabaseShard
setStored(std::shared_ptr<Ledger const> const& ledger) override;

std::unique_ptr<ShardInfo>
getShardInfo() override;
getShardInfo() const override;

boost::filesystem::path const&
getRootDir() const override
Expand Down Expand Up @@ -278,11 +278,14 @@ class DatabaseShardImp : public DatabaseShard
chooseHistoricalPath(std::lock_guard<std::mutex> const&) const;

bool
checkHistoricalPaths() const;
checkHistoricalPaths(std::lock_guard<std::mutex> const&) const;

std::unique_ptr<ShardInfo>
getShardInfo(std::lock_guard<std::mutex> const&) const;

// Update peers with the status of every complete and incomplete shard
void
updatePeers();
updatePeers(std::lock_guard<std::mutex> const& lock) const;
};

} // namespace NodeStore
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/nodestore/impl/Shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class Shard final
the current state of the shard is.
*/
[[nodiscard]] std::uint32_t
getProgress() const noexcept
getPercentProgress() const noexcept
{
return calculatePercent(progress_, maxLedgers_);
}
Expand Down
46 changes: 24 additions & 22 deletions src/ripple/overlay/impl/OverlayImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,6 @@ Json::Value
OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays)
{
using namespace std::chrono;
using namespace std::chrono_literals;

Json::Value jv(Json::objectValue);

Expand All @@ -748,33 +747,36 @@ OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays)
if (relays == 0 || size() == 0)
return jv;

// Wait if a request is in progress
std::unique_lock<std::mutex> lock{csMutex_};
if (!csIDs_.empty())
csCV_.wait(lock);

{
std::lock_guard lock{mutex_};
for (auto const& id : ids_)
csIDs_.emplace(id.first);
}
protocol::TMGetPeerShardInfoV2 tmGPS;
tmGPS.set_relays(relays);

// Request peer shard info
protocol::TMGetPeerShardInfoV2 tmGPS;
tmGPS.set_relays(relays);
foreach(send_always(
std::make_shared<Message>(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)));
// Wait if a request is in progress
std::unique_lock<std::mutex> csLock{csMutex_};
if (!csIDs_.empty())
csCV_.wait(csLock);

if (csCV_.wait_for(lock, seconds(60)) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
{
std::lock_guard lock{mutex_};
for (auto const& id : ids_)
csIDs_.emplace(id.first);
}

// Request peer shard info
foreach(send_always(std::make_shared<Message>(
tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)));

if (csCV_.wait_for(csLock, seconds(60)) == std::cv_status::timeout)
{
csIDs_.clear();
csCV_.notify_all();
}
}

// Combine shard info from peers
hash_map<PublicKey, NodeStore::ShardInfo> peerShardInfo;
for_each([&](std::shared_ptr<PeerImp>&& peer) {
auto const& psi{peer->getPeerShardInfos()};
auto const psi{peer->getPeerShardInfos()};
for (auto const& [publicKey, shardInfo] : psi)
{
auto const it{peerShardInfo.find(publicKey)};
Expand Down Expand Up @@ -812,7 +814,7 @@ void
OverlayImpl::endOfPeerChain(std::uint32_t id)
{
// Notify threads if all peers have received a reply from all peer chains
std::lock_guard lock{csMutex_};
std::lock_guard csLock{csMutex_};
csIDs_.erase(id);
if (csIDs_.empty())
csCV_.notify_all();
Expand Down Expand Up @@ -876,7 +878,7 @@ OverlayImpl::getOverlayInfo()
pv[jss::complete_ledgers] =
std::to_string(minSeq) + "-" + std::to_string(maxSeq);

auto const& peerShardInfos{sp->getPeerShardInfos()};
auto const peerShardInfos{sp->getPeerShardInfos()};
auto const it{peerShardInfos.find(sp->getNodePublic())};
if (it != peerShardInfos.end())
{
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ PeerImp::fail(std::string const& name, error_code ec)
close();
}

hash_map<PublicKey, NodeStore::ShardInfo> const&
hash_map<PublicKey, NodeStore::ShardInfo> const
PeerImp::getPeerShardInfos() const
{
std::lock_guard l{shardInfoMutex_};
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/overlay/impl/PeerImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class PeerImp : public Peer,
fail(std::string const& reason);

// Return any known shard info from this peer and its sub peers
[[nodiscard]] hash_map<PublicKey, NodeStore::ShardInfo> const&
[[nodiscard]] hash_map<PublicKey, NodeStore::ShardInfo> const
getPeerShardInfos() const;

bool
Expand Down
Loading

0 comments on commit 57415a7

Please sign in to comment.