From 05b009898edb15445b69b8b7292afbdc7b81455a Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Fri, 9 Oct 2020 17:14:51 -0400 Subject: [PATCH] Extend peer shard info --- Builds/CMake/RippledCore.cmake | 1 + Builds/levelization/results/loops.txt | 2 +- src/ripple/basics/RangeSet.h | 18 +- src/ripple/nodestore/Database.h | 92 ++- src/ripple/nodestore/DatabaseShard.h | 72 +- src/ripple/nodestore/ShardInfo.h | 122 ++++ src/ripple/nodestore/Types.h | 10 + src/ripple/nodestore/impl/Database.cpp | 21 + .../nodestore/impl/DatabaseShardImp.cpp | 493 ++++++------- src/ripple/nodestore/impl/DatabaseShardImp.h | 75 +- src/ripple/nodestore/impl/Shard.cpp | 188 ++--- src/ripple/nodestore/impl/Shard.h | 69 +- src/ripple/nodestore/impl/ShardInfo.cpp | 138 ++++ src/ripple/nodestore/impl/TaskQueue.cpp | 14 +- src/ripple/nodestore/impl/TaskQueue.h | 8 +- src/ripple/overlay/Overlay.h | 3 +- src/ripple/overlay/Peer.h | 6 +- src/ripple/overlay/impl/Message.cpp | 4 +- src/ripple/overlay/impl/OverlayImpl.cpp | 145 ++-- src/ripple/overlay/impl/OverlayImpl.h | 9 +- src/ripple/overlay/impl/PeerImp.cpp | 405 ++++++----- src/ripple/overlay/impl/PeerImp.h | 31 +- src/ripple/overlay/impl/ProtocolMessage.h | 48 +- src/ripple/overlay/impl/TrafficCount.cpp | 8 +- src/ripple/proto/ripple.proto | 117 +-- src/ripple/protocol/HashPrefix.h | 3 + src/ripple/protocol/SystemParameters.h | 5 +- src/ripple/protocol/jss.h | 1 + src/ripple/rpc/handlers/CrawlShards.cpp | 33 +- src/test/app/LedgerReplay_test.cpp | 5 - src/test/basics/RangeSet_test.cpp | 6 - src/test/nodestore/DatabaseShard_test.cpp | 677 +++++++++++------- src/test/nodestore/Database_test.cpp | 52 +- src/test/overlay/reduce_relay_test.cpp | 5 - src/test/rpc/ShardArchiveHandler_test.cpp | 121 ++-- 35 files changed, 1779 insertions(+), 1228 deletions(-) create mode 100644 src/ripple/nodestore/ShardInfo.h create mode 100644 src/ripple/nodestore/impl/ShardInfo.cpp diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index fd8f69177cc..cf0f0c1f441 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -515,6 +515,7 @@ target_sources (rippled PRIVATE src/ripple/nodestore/impl/ManagerImp.cpp src/ripple/nodestore/impl/NodeObject.cpp src/ripple/nodestore/impl/Shard.cpp + src/ripple/nodestore/impl/ShardInfo.cpp src/ripple/nodestore/impl/TaskQueue.cpp #[===============================[ main sources: diff --git a/Builds/levelization/results/loops.txt b/Builds/levelization/results/loops.txt index c4589932bd9..995ac026dc9 100644 --- a/Builds/levelization/results/loops.txt +++ b/Builds/levelization/results/loops.txt @@ -38,7 +38,7 @@ Loop: ripple.net ripple.rpc ripple.rpc > ripple.net Loop: ripple.nodestore ripple.overlay - ripple.overlay == ripple.nodestore + ripple.overlay ~= ripple.nodestore Loop: ripple.overlay ripple.rpc ripple.rpc ~= ripple.overlay diff --git a/src/ripple/basics/RangeSet.h b/src/ripple/basics/RangeSet.h index 994bbf4abd1..e003a229af0 100644 --- a/src/ripple/basics/RangeSet.h +++ b/src/ripple/basics/RangeSet.h @@ -98,18 +98,15 @@ template std::string to_string(RangeSet const& rs) { - using ripple::to_string; - if (rs.empty()) return "empty"; - std::string res = ""; + + std::string s; for (auto const& interval : rs) - { - if (!res.empty()) - res += ","; - res += to_string(interval); - } - return res; + s += ripple::to_string(interval) + ","; + s.pop_back(); + + return s; } /** Convert the given styled string to a RangeSet. @@ -122,13 +119,14 @@ to_string(RangeSet const& rs) @return True on successfully converting styled string */ template -bool +[[nodiscard]] bool from_string(RangeSet& rs, std::string const& s) { std::vector intervals; std::vector tokens; bool result{true}; + rs.clear(); boost::split(tokens, s, boost::algorithm::is_any_of(",")); for (auto const& t : tokens) { diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 25c4702e3fe..44d40a55232 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -226,14 +226,79 @@ class Database : public Stoppable void onChildrenStopped() override; + /** @return The maximum number of ledgers stored in a shard + */ + [[nodiscard]] std::uint32_t + ledgersPerShard() const noexcept + { + return ledgersPerShard_; + } + /** @return The earliest ledger sequence allowed */ - std::uint32_t - earliestLedgerSeq() const + [[nodiscard]] std::uint32_t + earliestLedgerSeq() const noexcept { return earliestLedgerSeq_; } + /** @return The earliest shard index + */ + [[nodiscard]] std::uint32_t + earliestShardIndex() const noexcept + { + return earliestShardIndex_; + } + + /** Calculates the first ledger sequence for a given shard index + + @param shardIndex The shard index considered + @return The first ledger sequence pertaining to the shard index + */ + [[nodiscard]] std::uint32_t + firstLedgerSeq(std::uint32_t shardIndex) const noexcept + { + assert(shardIndex >= earliestShardIndex_); + if (shardIndex <= earliestShardIndex_) + return earliestLedgerSeq_; + return 1 + (shardIndex * ledgersPerShard_); + } + + /** Calculates the last ledger sequence for a given shard index + + @param shardIndex The shard index considered + @return The last ledger sequence pertaining to the shard index + */ + [[nodiscard]] std::uint32_t + lastLedgerSeq(std::uint32_t shardIndex) const noexcept + { + assert(shardIndex >= earliestShardIndex_); + return (shardIndex + 1) * ledgersPerShard_; + } + + /** Calculates the shard index for a given ledger sequence + + @param ledgerSeq ledger sequence + @return The shard index of the ledger sequence + */ + [[nodiscard]] std::uint32_t + seqToShardIndex(std::uint32_t ledgerSeq) const noexcept + { + assert(ledgerSeq >= earliestLedgerSeq_); + return (ledgerSeq - 1) / ledgersPerShard_; + } + + /** Calculates the maximum ledgers for a given shard index + + @param shardIndex The shard index considered + @return The maximum ledgers pertaining to the shard index + + @note The earliest shard may store less if the earliest ledger + sequence truncates its beginning + */ + [[nodiscard]] std::uint32_t + maxLedgers(std::uint32_t shardIndex) const noexcept; + protected: beast::Journal const j_; Scheduler& scheduler_; @@ -242,6 +307,25 @@ class Database : public Stoppable std::atomic fetchHitCount_{0}; std::atomic fetchSz_{0}; + // The default is DEFAULT_LEDGERS_PER_SHARD (16384) to match the XRP ledger + // network. Can be set through the configuration file using the + // 'ledgers_per_shard' field under the 'node_db' and 'shard_db' stanzas. + // If specified, the value must be a multiple of 256 and equally assigned + // in both stanzas. Only unit tests or alternate networks should change + // this value. + std::uint32_t const ledgersPerShard_; + + // The default is XRP_LEDGER_EARLIEST_SEQ (32570) to match the XRP ledger + // network's earliest allowed ledger sequence. Can be set through the + // configuration file using the 'earliest_seq' field under the 'node_db' + // and 'shard_db' stanzas. If specified, the value must be greater than zero + // and equally assigned in both stanzas. Only unit tests or alternate + // networks should change this value. + std::uint32_t const earliestLedgerSeq_; + + // The earliest shard index + std::uint32_t const earliestShardIndex_; + void stopReadThreads(); @@ -293,10 +377,6 @@ class Database : public Stoppable std::vector readThreads_; bool readShut_{false}; - // The default is 32570 to match the XRP ledger network's earliest - // allowed sequence. Alternate networks may set this value. - std::uint32_t const earliestLedgerSeq_; - virtual std::shared_ptr fetchNodeObject( uint256 const& hash, diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index 826b09afdbe..8d2fab54b6a 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -21,8 +21,8 @@ #define RIPPLE_NODESTORE_DATABASESHARD_H_INCLUDED #include -#include #include +#include #include #include @@ -60,7 +60,7 @@ class DatabaseShard : public Database @return `true` if the database initialized without error */ - virtual bool + [[nodiscard]] virtual bool init() = 0; /** Prepare to store a new ledger in the shard being acquired @@ -75,7 +75,7 @@ class DatabaseShard : public Database between requests. @implNote adds a new writable shard if necessary */ - virtual std::optional + [[nodiscard]] virtual std::optional prepareLedger(std::uint32_t validLedgerSeq) = 0; /** Prepare one or more shard indexes to be imported into the database @@ -83,7 +83,7 @@ class DatabaseShard : public Database @param shardIndexes Shard indexes to be prepared for import @return true if all shard indexes successfully prepared for import */ - virtual bool + [[nodiscard]] virtual bool prepareShards(std::vector const& shardIndexes) = 0; /** Remove a previously prepared shard index for import @@ -97,7 +97,7 @@ class DatabaseShard : public Database @return a string representing the shards prepared for import */ - virtual std::string + [[nodiscard]] virtual std::string getPreShards() = 0; /** Import a shard into the shard database @@ -107,7 +107,7 @@ class DatabaseShard : public Database @return true If the shard was successfully imported @implNote if successful, srcDir is moved to the database directory */ - virtual bool + [[nodiscard]] virtual bool importShard( std::uint32_t shardIndex, boost::filesystem::path const& srcDir) = 0; @@ -118,7 +118,7 @@ class DatabaseShard : public Database @param seq The sequence of the ledger @return The ledger if found, nullptr otherwise */ - virtual std::shared_ptr + [[nodiscard]] virtual std::shared_ptr fetchLedger(uint256 const& hash, std::uint32_t seq) = 0; /** Notifies the database that the given ledger has been @@ -129,64 +129,24 @@ class DatabaseShard : public Database virtual void setStored(std::shared_ptr const& ledger) = 0; - /** Query which complete shards are stored + /** Query information about shards held - @return the indexes of complete shards + @return Information about shards held by this node */ - virtual std::string - getCompleteShards() = 0; - - /** @return The maximum number of ledgers stored in a shard - */ - virtual std::uint32_t - ledgersPerShard() const = 0; - - /** @return The earliest shard index - */ - virtual std::uint32_t - earliestShardIndex() const = 0; - - /** Calculates the shard index for a given ledger sequence - - @param seq ledger sequence - @return The shard index of the ledger sequence - */ - virtual std::uint32_t - seqToShardIndex(std::uint32_t seq) const = 0; - - /** Calculates the first ledger sequence for a given shard index - - @param shardIndex The shard index considered - @return The first ledger sequence pertaining to the shard index - */ - virtual std::uint32_t - firstLedgerSeq(std::uint32_t shardIndex) const = 0; - - /** Calculates the last ledger sequence for a given shard index - - @param shardIndex The shard index considered - @return The last ledger sequence pertaining to the shard index - */ - virtual std::uint32_t - lastLedgerSeq(std::uint32_t shardIndex) const = 0; + [[nodiscard]] virtual std::unique_ptr + getShardInfo() const = 0; /** Returns the root database directory */ - virtual boost::filesystem::path const& + [[nodiscard]] virtual boost::filesystem::path const& getRootDir() const = 0; - /** The number of ledgers in a shard */ - static constexpr std::uint32_t ledgersPerShardDefault{16384u}; + /** Returns the number of queued tasks + */ + [[nodiscard]] virtual size_t + getNumTasks() const = 0; }; -constexpr std::uint32_t -seqToShardIndex( - std::uint32_t ledgerSeq, - std::uint32_t ledgersPerShard = DatabaseShard::ledgersPerShardDefault) -{ - return (ledgerSeq - 1) / ledgersPerShard; -} - extern std::unique_ptr make_ShardStore( Application& app, diff --git a/src/ripple/nodestore/ShardInfo.h b/src/ripple/nodestore/ShardInfo.h new file mode 100644 index 00000000000..90400276f85 --- /dev/null +++ b/src/ripple/nodestore/ShardInfo.h @@ -0,0 +1,122 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED +#define RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED + +#include +#include +#include + +namespace ripple { +namespace NodeStore { + +/* Contains information on the status of shards for a node + */ +class ShardInfo +{ +private: + class Incomplete + { + public: + Incomplete() = delete; + Incomplete(ShardState state, std::uint32_t percentProgress) + : state_(state), percentProgress_(percentProgress) + { + } + + [[nodiscard]] ShardState + state() const noexcept + { + return state_; + } + + [[nodiscard]] std::uint32_t + percentProgress() const noexcept + { + return percentProgress_; + } + + private: + ShardState state_; + std::uint32_t percentProgress_; + }; + +public: + [[nodiscard]] NetClock::time_point const& + msgTimestamp() const + { + return msgTimestamp_; + } + + void + setMsgTimestamp(NetClock::time_point const& timestamp) + { + msgTimestamp_ = timestamp; + } + + [[nodiscard]] std::string + finalizedToString() const; + + [[nodiscard]] bool + setFinalizedFromString(std::string const& str) + { + return from_string(finalized_, str); + } + + [[nodiscard]] RangeSet const& + finalized() const + { + return finalized_; + } + + [[nodiscard]] std::string + incompleteToString() const; + + [[nodiscard]] std::map const& + incomplete() const + { + return incomplete_; + } + + // Returns true if successful or false because of a duplicate index + bool + update( + std::uint32_t shardIndex, + ShardState state, + std::uint32_t percentProgress); + + [[nodiscard]] protocol::TMPeerShardInfoV2 + makeMessage(Application& app); + +private: + // Finalized immutable shards + RangeSet finalized_; + + // Incomplete shards being acquired or finalized + std::map incomplete_; + + // Message creation time + NetClock::time_point msgTimestamp_; +}; + +} // namespace NodeStore +} // namespace ripple + +#endif diff --git a/src/ripple/nodestore/Types.h b/src/ripple/nodestore/Types.h index 2951ac80579..6d8583ed9d1 100644 --- a/src/ripple/nodestore/Types.h +++ b/src/ripple/nodestore/Types.h @@ -55,6 +55,16 @@ enum Status { using Batch = std::vector>; } // namespace NodeStore + +/** Shard states. */ +enum class ShardState : std::uint32_t { + acquire, // Acquiring ledgers + complete, // Backend is ledger complete, database is unverified + finalizing, // Verifying database + finalized, // Database verified, shard is immutable + queued // Queued to be finalized +}; + } // namespace ripple #endif diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 8c08b6cf2e8..fb051b05f92 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -39,9 +39,17 @@ Database::Database( : Stoppable(name, parent.getRoot()) , j_(journal) , scheduler_(scheduler) + , ledgersPerShard_(get( + config, + "ledgers_per_shard", + DEFAULT_LEDGERS_PER_SHARD)) , earliestLedgerSeq_( get(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ)) + , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_) { + if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) + Throw("Invalid ledgers_per_shard"); + if (earliestLedgerSeq_ < 1) Throw("Invalid earliest_seq"); @@ -74,6 +82,19 @@ Database::onChildrenStopped() stopped(); } +std::uint32_t +Database::maxLedgers(std::uint32_t shardIndex) const noexcept +{ + if (shardIndex > earliestShardIndex_) + return ledgersPerShard_; + + if (shardIndex == earliestShardIndex_) + return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1; + + assert(!"Invalid shard index"); + return 0; +} + void Database::stopReadThreads() { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index a56d9e1ce6d..731e84f02cb 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -29,6 +30,7 @@ #include #include #include +#include #include @@ -56,7 +58,6 @@ DatabaseShardImp::DatabaseShardImp( , app_(app) , parent_(parent) , taskQueue_(std::make_unique(*this)) - , earliestShardIndex_(seqToShardIndex(earliestLedgerSeq())) , avgShardFileSz_(ledgersPerShard_ * kilobytes(192ull)) , openFinalLimit_( app.config().getValueFor(SizedItem::openFinalLimit, std::nullopt)) @@ -116,7 +117,7 @@ DatabaseShardImp::init() if (!app_.config().standalone() && !historicalPaths_.empty()) { // Check historical paths for duplicated file systems - if (!checkHistoricalPaths()) + if (!checkHistoricalPaths(lock)) return false; } @@ -146,12 +147,12 @@ DatabaseShardImp::init() // Ignore values below the earliest shard index auto const shardIndex{std::stoul(dirName)}; - if (shardIndex < earliestShardIndex()) + if (shardIndex < earliestShardIndex_) { JLOG(j_.debug()) << "shard " << shardIndex << " ignored, comes before earliest shard index " - << earliestShardIndex(); + << earliestShardIndex_; continue; } @@ -180,13 +181,13 @@ DatabaseShardImp::init() switch (shard->getState()) { - case Shard::final: + case ShardState::finalized: if (++openFinals > openFinalLimit_) shard->tryClose(); shards_.emplace(shardIndex, std::move(shard)); break; - case Shard::complete: + case ShardState::complete: finalizeShard( shards_.emplace(shardIndex, std::move(shard)) .first->second, @@ -194,7 +195,7 @@ DatabaseShardImp::init() std::nullopt); break; - case Shard::acquire: + case ShardState::acquire: if (acquireIndex_ != 0) { JLOG(j_.error()) @@ -221,12 +222,11 @@ DatabaseShardImp::init() return false; } - updateStatus(lock); setParent(parent_); init_ = true; } - setFileStats(); + updateFileStats(); return true; } @@ -294,7 +294,9 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) std::lock_guard lock(mutex_); shards_.emplace(*shardIndex, std::move(shard)); acquireIndex_ = *shardIndex; + updatePeers(lock); } + return ledgerSeq; } @@ -317,14 +319,15 @@ DatabaseShardImp::prepareShards(std::vector const& shardIndexes) boost::algorithm::join(indexesAsString, ", "); }; - std::string const prequel = shardIndex - ? "shard " + std::to_string(*shardIndex) - : multipleIndexPrequel(); - - JLOG(j.error()) << prequel << " " << msg; + JLOG(j.error()) << (shardIndex ? "shard " + std::to_string(*shardIndex) + : multipleIndexPrequel()) + << " " << msg; return false; }; + if (shardIndexes.empty()) + return fail("invalid shard indexes"); + std::lock_guard lock(mutex_); assert(init_); @@ -335,18 +338,18 @@ DatabaseShardImp::prepareShards(std::vector const& shardIndexes) for (auto const shardIndex : shardIndexes) { - if (shardIndex < earliestShardIndex()) + if (shardIndex < earliestShardIndex_) { return fail( "comes before earliest shard index " + - std::to_string(earliestShardIndex()), + std::to_string(earliestShardIndex_), shardIndex); } // If we are synced to the network, check if the shard index is // greater or equal to the current or validated shard index. auto seqCheck = [&](std::uint32_t ledgerSeq) { - if (ledgerSeq >= earliestLedgerSeq() && + if (ledgerSeq >= earliestLedgerSeq_ && shardIndex >= seqToShardIndex(ledgerSeq)) { return fail("invalid index", shardIndex); @@ -396,14 +399,9 @@ DatabaseShardImp::prepareShards(std::vector const& shardIndexes) } for (auto const shardIndex : shardIndexes) - { - auto const prepareSuccessful = - preparedIndexes_.emplace(shardIndex).second; - - (void)prepareSuccessful; - assert(prepareSuccessful); - } + preparedIndexes_.emplace(shardIndex); + updatePeers(lock); return true; } @@ -413,7 +411,8 @@ DatabaseShardImp::removePreShard(std::uint32_t shardIndex) std::lock_guard lock(mutex_); assert(init_); - preparedIndexes_.erase(shardIndex); + if (preparedIndexes_.erase(shardIndex)) + updatePeers(lock); } std::string @@ -431,7 +430,7 @@ DatabaseShardImp::getPreShards() if (rs.empty()) return {}; - return to_string(rs); + return ripple::to_string(rs); }; bool @@ -445,6 +444,7 @@ DatabaseShardImp::importShard( // Remove the failed import shard index so it can be retried preparedIndexes_.erase(shardIndex); + updatePeers(lock); return false; }; @@ -516,7 +516,8 @@ DatabaseShardImp::importShard( auto shard{std::make_unique( app_, *this, shardIndex, dstDir.parent_path(), j_)}; - if (!shard->init(scheduler_, *ctx_) || shard->getState() != Shard::complete) + if (!shard->init(scheduler_, *ctx_) || + shard->getState() != ShardState::complete) { shard.reset(); renameDir(dstDir, srcDir); @@ -559,9 +560,9 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t ledgerSeq) // Ledger must be stored in a final or acquiring shard switch (shard->getState()) { - case Shard::final: + case ShardState::finalized: break; - case Shard::acquire: + case ShardState::acquire: if (shard->containsLedger(ledgerSeq)) break; [[fallthrough]]; @@ -680,13 +681,11 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) setStoredInShard(shard, ledger); } -std::string -DatabaseShardImp::getCompleteShards() +std::unique_ptr +DatabaseShardImp::getShardInfo() const { std::lock_guard lock(mutex_); - assert(init_); - - return status_; + return getShardInfo(lock); } void @@ -716,12 +715,12 @@ DatabaseShardImp::onChildrenStopped() } // All shards should be expired at this point - for (auto const& e : shards) + for (auto const& weak : shards) { - if (!e.expired()) + if (!weak.expired()) { std::string shardIndex; - if (auto const shard{e.lock()}; shard) + if (auto const shard{weak.lock()}; shard) shardIndex = std::to_string(shard->index()); JLOG(j_.warn()) << " shard " << shardIndex << " unexpired"; @@ -754,8 +753,7 @@ DatabaseShardImp::import(Database& source) std::shared_ptr ledger; std::uint32_t ledgerSeq; std::tie(ledger, ledgerSeq, std::ignore) = loadLedgerHelper( - "WHERE LedgerSeq >= " + - std::to_string(earliestLedgerSeq()) + + "WHERE LedgerSeq >= " + std::to_string(earliestLedgerSeq_) + " order by LedgerSeq " + (ascendSort ? "asc" : "desc") + " limit 1", app_, @@ -839,13 +837,9 @@ DatabaseShardImp::import(Database& source) // Verify SQLite ledgers are in the node store { auto const firstSeq{firstLedgerSeq(shardIndex)}; - auto const lastSeq{ - std::max(firstSeq, lastLedgerSeq(shardIndex))}; - auto const numLedgers{ - shardIndex == earliestShardIndex() ? lastSeq - firstSeq + 1 - : ledgersPerShard_}; + auto const lastSeq{lastLedgerSeq(shardIndex)}; auto ledgerHashes{getHashesByIndex(firstSeq, lastSeq, app_)}; - if (ledgerHashes.size() != numLedgers) + if (ledgerHashes.size() != maxLedgers(shardIndex)) continue; bool valid{true}; @@ -913,7 +907,7 @@ DatabaseShardImp::import(Database& source) using namespace boost::filesystem; bool success{false}; - if (lastLedgerHash && shard->getState() == Shard::complete) + if (lastLedgerHash && shard->getState() == ShardState::complete) { // Store shard final key Serializer s; @@ -960,11 +954,9 @@ DatabaseShardImp::import(Database& source) shard->removeOnDestroy(); } } - - updateStatus(lock); } - setFileStats(); + updateFileStats(); } std::int32_t @@ -1069,13 +1061,11 @@ DatabaseShardImp::sweep() std::vector> openFinals; openFinals.reserve(openFinalLimit_); - for (auto const& e : shards) + for (auto const& weak : shards) { - if (auto const shard{e.lock()}; shard && shard->isOpen()) + if (auto const shard{weak.lock()}; shard && shard->isOpen()) { - shard->sweep(); - - if (shard->getState() == Shard::final) + if (shard->getState() == ShardState::finalized) openFinals.emplace_back(std::move(shard)); } } @@ -1118,30 +1108,30 @@ DatabaseShardImp::initConfig(std::lock_guard const&) Config const& config{app_.config()}; Section const& section{config.section(ConfigSection::shardDatabase())}; - { - // The earliest ledger sequence defaults to XRP_LEDGER_EARLIEST_SEQ. - // A custom earliest ledger sequence can be set through the - // configuration file using the 'earliest_seq' field under the - // 'node_db' and 'shard_db' stanzas. If specified, this field must - // have a value greater than zero and be equally assigned in - // both stanzas. - - std::uint32_t shardDBEarliestSeq{0}; - get_if_exists( - section, "earliest_seq", shardDBEarliestSeq); + auto compare = [&](std::string const& name, std::uint32_t defaultValue) { + std::uint32_t shardDBValue{defaultValue}; + get_if_exists(section, name, shardDBValue); - std::uint32_t nodeDBEarliestSeq{0}; + std::uint32_t nodeDBValue{defaultValue}; get_if_exists( - config.section(ConfigSection::nodeDatabase()), - "earliest_seq", - nodeDBEarliestSeq); + config.section(ConfigSection::nodeDatabase()), name, nodeDBValue); - if (shardDBEarliestSeq != nodeDBEarliestSeq) - { - return fail( - "and [" + ConfigSection::nodeDatabase() + - "] define different 'earliest_seq' values"); - } + return shardDBValue == nodeDBValue; + }; + + // If ledgers_per_shard or earliest_seq are specified, + // they must be equally assigned in 'node_db' + if (!compare("ledgers_per_shard", DEFAULT_LEDGERS_PER_SHARD)) + { + return fail( + "and [" + ConfigSection::nodeDatabase() + "] define different '" + + "ledgers_per_shard" + "' values"); + } + if (!compare("earliest_seq", XRP_LEDGER_EARLIEST_SEQ)) + { + return fail( + "and [" + ConfigSection::nodeDatabase() + "] define different '" + + "earliest_seq" + "' values"); } using namespace boost::filesystem; @@ -1173,20 +1163,6 @@ DatabaseShardImp::initConfig(std::lock_guard const&) } } - if (section.exists("ledgers_per_shard")) - { - // To be set only in standalone for testing - if (!config.standalone()) - return fail("'ledgers_per_shard' only honored in stand alone"); - - ledgersPerShard_ = get(section, "ledgers_per_shard"); - if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) - return fail("'ledgers_per_shard' must be a multiple of 256"); - - earliestShardIndex_ = seqToShardIndex(earliestLedgerSeq()); - avgShardFileSz_ = ledgersPerShard_ * kilobytes(192); - } - // NuDB is the default and only supported permanent storage backend backendName_ = get(section, "type", "nudb"); if (!boost::iequals(backendName_, "NuDB")) @@ -1219,7 +1195,7 @@ DatabaseShardImp::findAcquireIndex( std::uint32_t validLedgerSeq, std::lock_guard const&) { - if (validLedgerSeq < earliestLedgerSeq()) + if (validLedgerSeq < earliestLedgerSeq_) return std::nullopt; auto const maxShardIndex{[this, validLedgerSeq]() { @@ -1228,7 +1204,7 @@ DatabaseShardImp::findAcquireIndex( --shardIndex; return shardIndex; }()}; - auto const maxNumShards{maxShardIndex - earliestShardIndex() + 1}; + auto const maxNumShards{maxShardIndex - earliestShardIndex_ + 1}; // Check if the shard store has all shards if (shards_.size() >= maxNumShards) @@ -1242,8 +1218,7 @@ DatabaseShardImp::findAcquireIndex( std::vector available; available.reserve(maxNumShards - shards_.size()); - for (auto shardIndex = earliestShardIndex(); - shardIndex <= maxShardIndex; + for (auto shardIndex = earliestShardIndex_; shardIndex <= maxShardIndex; ++shardIndex) { if (shards_.find(shardIndex) == shards_.end() && @@ -1268,7 +1243,7 @@ DatabaseShardImp::findAcquireIndex( // chances of running more than 30 times is less than 1 in a billion for (int i = 0; i < 40; ++i) { - auto const shardIndex{rand_int(earliestShardIndex(), maxShardIndex)}; + auto const shardIndex{rand_int(earliestShardIndex_, maxShardIndex)}; if (shards_.find(shardIndex) == shards_.end() && preparedIndexes_.find(shardIndex) == preparedIndexes_.end()) { @@ -1315,9 +1290,7 @@ DatabaseShardImp::finalizeShard( { auto const boundaryIndex{shardBoundaryIndex()}; - std::lock_guard lock(mutex_); - updateStatus(lock); if (shard->index() < boundaryIndex) { @@ -1330,19 +1303,17 @@ DatabaseShardImp::finalizeShard( << " is not stored at a historical path"; } } - else { // Not a historical shard. Shift recent shards if necessary - relocateOutdatedShards(lock); assert(!boundaryIndex || shard->index() - boundaryIndex <= 1); - - auto& recentShard = shard->index() == boundaryIndex - ? secondLatestShardIndex_ - : latestShardIndex_; + relocateOutdatedShards(lock); // Set the appropriate recent shard index - recentShard = shard->index(); + if (shard->index() == boundaryIndex) + secondLatestShardIndex_ = shard->index(); + else + latestShardIndex_ = shard->index(); if (shard->getDir().parent_path() != dir_) { @@ -1350,26 +1321,16 @@ DatabaseShardImp::finalizeShard( << " is not stored at the path"; } } - } - setFileStats(); - - // Update peers with new shard index - if (!app_.config().standalone() && - app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED) - { - protocol::TMPeerShardInfo message; - PublicKey const& publicKey{app_.nodeIdentity().first}; - message.set_nodepubkey(publicKey.data(), publicKey.size()); - message.set_shardindexes(std::to_string(shard->index())); - app_.overlay().foreach(send_always(std::make_shared( - message, protocol::mtPEER_SHARD_INFO))); + updatePeers(lock); } + + updateFileStats(); }); } void -DatabaseShardImp::setFileStats() +DatabaseShardImp::updateFileStats() { std::vector> shards; { @@ -1385,9 +1346,9 @@ DatabaseShardImp::setFileStats() std::uint64_t sumSz{0}; std::uint32_t sumFd{0}; std::uint32_t numShards{0}; - for (auto const& e : shards) + for (auto const& weak : shards) { - if (auto const shard{e.lock()}; shard) + if (auto const shard{weak.lock()}; shard) { auto const [sz, fd] = shard->getFileInfo(); sumSz += sz; @@ -1429,21 +1390,6 @@ DatabaseShardImp::setFileStats() } } -void -DatabaseShardImp::updateStatus(std::lock_guard const&) -{ - if (!shards_.empty()) - { - RangeSet rs; - for (auto const& e : shards_) - if (e.second->getState() == Shard::final) - rs.insert(e.second->index()); - status_ = to_string(rs); - } - else - status_.clear(); -} - bool DatabaseShardImp::sufficientStorage( std::uint32_t numShards, @@ -1511,7 +1457,7 @@ DatabaseShardImp::setStoredInShard( return false; } - if (shard->getState() == Shard::complete) + if (shard->getState() == ShardState::complete) { std::lock_guard lock(mutex_); if (auto const it{shards_.find(shard->index())}; it != shards_.end()) @@ -1528,7 +1474,7 @@ DatabaseShardImp::setStoredInShard( } } - setFileStats(); + updateFileStats(); return true; } @@ -1546,12 +1492,6 @@ DatabaseShardImp::removeFailedShard(std::shared_ptr& shard) if (shard->index() == secondLatestShardIndex_) secondLatestShardIndex_ = std::nullopt; - - if ((shards_.erase(shard->index()) > 0) && - shard->getState() == Shard::final) - { - updateStatus(lock); - } } shard->removeOnDestroy(); @@ -1559,7 +1499,7 @@ DatabaseShardImp::removeFailedShard(std::shared_ptr& shard) // Reset the shared_ptr to invoke the shard's // destructor and remove it from the server shard.reset(); - setFileStats(); + updateFileStats(); } std::uint32_t @@ -1567,7 +1507,7 @@ DatabaseShardImp::shardBoundaryIndex() const { auto const validIndex = app_.getLedgerMaster().getValidLedgerIndex(); - if (validIndex < earliestLedgerSeq()) + if (validIndex < earliestLedgerSeq_) return 0; // Shards with an index earlier than the recent shard boundary index @@ -1593,151 +1533,135 @@ void DatabaseShardImp::relocateOutdatedShards( std::lock_guard const& lock) { - if (auto& cur = latestShardIndex_, &prev = secondLatestShardIndex_; - cur || prev) - { - auto const latestShardIndex = - seqToShardIndex(app_.getLedgerMaster().getValidLedgerIndex()); + auto& cur{latestShardIndex_}; + auto& prev{secondLatestShardIndex_}; + if (!cur && !prev) + return; - auto const separateHistoricalPath = !historicalPaths_.empty(); + auto const latestShardIndex = + seqToShardIndex(app_.getLedgerMaster().getValidLedgerIndex()); + auto const separateHistoricalPath = !historicalPaths_.empty(); - auto const removeShard = - [this](std::uint32_t const shardIndex) -> void { - canAdd_ = false; + auto const removeShard = [this](std::uint32_t const shardIndex) -> void { + canAdd_ = false; - if (auto it = shards_.find(shardIndex); it != shards_.end()) - { - if (it->second) - removeFailedShard(it->second); - else - { - JLOG(j_.warn()) << "can't find shard to remove"; - } - } + if (auto it = shards_.find(shardIndex); it != shards_.end()) + { + if (it->second) + removeFailedShard(it->second); else { JLOG(j_.warn()) << "can't find shard to remove"; } - }; + } + else + { + JLOG(j_.warn()) << "can't find shard to remove"; + } + }; - auto const keepShard = - [this, &lock, removeShard, separateHistoricalPath]( - std::uint32_t const shardIndex) -> bool { - if (numHistoricalShards(lock) >= maxHistoricalShards_) - { - JLOG(j_.error()) - << "maximum number of historical shards reached"; + auto const keepShard = [this, &lock, removeShard, separateHistoricalPath]( + std::uint32_t const shardIndex) -> bool { + if (numHistoricalShards(lock) >= maxHistoricalShards_) + { + JLOG(j_.error()) << "maximum number of historical shards reached"; + removeShard(shardIndex); + return false; + } + if (separateHistoricalPath && + !sufficientStorage(1, PathDesignation::historical, lock)) + { + JLOG(j_.error()) << "insufficient storage space available"; + removeShard(shardIndex); + return false; + } - removeShard(shardIndex); - return false; - } - if (separateHistoricalPath && - !sufficientStorage(1, PathDesignation::historical, lock)) - { - JLOG(j_.error()) << "insufficient storage space available"; + return true; + }; - removeShard(shardIndex); - return false; - } + // Move a shard from the main shard path to a historical shard + // path by copying the contents, and creating a new shard. + auto const moveShard = [this, + &lock](std::uint32_t const shardIndex) -> void { + auto it{shards_.find(shardIndex)}; + if (it == shards_.end()) + { + JLOG(j_.warn()) << "can't find shard to move to historical path"; + return; + } - return true; - }; + auto& shard{it->second}; - // Move a shard from the main shard path to a historical shard - // path by copying the contents, and creating a new shard. - auto const moveShard = [this, - &lock](std::uint32_t const shardIndex) -> void { - auto const dst = chooseHistoricalPath(lock); + // Close any open file descriptors before moving the shard + // directory. Don't call removeOnDestroy since that would + // attempt to close the fds after the directory has been moved. + if (!shard->tryClose()) + { + JLOG(j_.warn()) << "can't close shard to move to historical path"; + return; + } - if (auto it = shards_.find(shardIndex); it != shards_.end()) - { - auto& shard{it->second}; + auto const dst{chooseHistoricalPath(lock)}; + try + { + // Move the shard directory to the new path + boost::filesystem::rename( + shard->getDir().string(), dst / std::to_string(shardIndex)); + } + catch (...) + { + JLOG(j_.error()) << "shard " << shardIndex + << " failed to move to historical storage"; + return; + } - // Close any open file descriptors before moving the shard - // directory. Don't call removeOnDestroy since that would - // attempt to close the fds after the directory has been moved. - if (!shard->tryClose()) - { - JLOG(j_.warn()) - << "can't close shard to move to historical path"; - return; - } + // Create a shard instance at the new location + shard = std::make_shared(app_, *this, shardIndex, dst, j_); - try - { - // Move the shard directory to the new path - boost::filesystem::rename( - shard->getDir().string(), - dst / std::to_string(shardIndex)); - } - catch (...) - { - JLOG(j_.error()) << "shard " << shardIndex - << " failed to move to historical storage"; - return; - } + // Open the new shard + if (!shard->init(scheduler_, *ctx_)) + { + JLOG(j_.error()) << "shard " << shardIndex + << " failed to open in historical storage"; + shard->removeOnDestroy(); + shard.reset(); + } + }; - // Create a shard instance at the new location - shard = - std::make_shared(app_, *this, shardIndex, dst, j_); + // See if either of the recent shards needs to be updated + bool const curNotSynched = + latestShardIndex_ && *latestShardIndex_ != latestShardIndex; + bool const prevNotSynched = secondLatestShardIndex_ && + *secondLatestShardIndex_ != latestShardIndex - 1; - // Open the new shard - if (!shard->init(scheduler_, *ctx_)) - { - JLOG(j_.error()) << "shard " << shardIndex - << " failed to open in historical storage"; - shard->removeOnDestroy(); - shard.reset(); - } - } - else - { - JLOG(j_.warn()) - << "can't find shard to move to historical path"; - } - }; + // A new shard has been published. Move outdated + // shards to historical storage as needed + if (curNotSynched || prevNotSynched) + { + if (prev) + { + // Move the formerly second latest shard to historical storage + if (keepShard(*prev) && separateHistoricalPath) + moveShard(*prev); - // See if either of the recent shards needs to be updated - bool const curNotSynched = - latestShardIndex_ && *latestShardIndex_ != latestShardIndex; - bool const prevNotSynched = secondLatestShardIndex_ && - *secondLatestShardIndex_ != latestShardIndex - 1; + prev = std::nullopt; + } - // A new shard has been published. Move outdated - // shards to historical storage as needed - if (curNotSynched || prevNotSynched) + if (cur) { - if (prev) - { - // Move the formerly second latest shard to historical storage - if (keepShard(*prev) && separateHistoricalPath) - { - moveShard(*prev); - } + // The formerly latest shard is now the second latest + if (cur == latestShardIndex - 1) + prev = cur; - prev = std::nullopt; - } - - if (cur) + // The formerly latest shard is no longer a 'recent' shard + else { - // The formerly latest shard is now the second latest - if (cur == latestShardIndex - 1) - { - prev = cur; - } - - // The formerly latest shard is no longer a 'recent' shard - else - { - // Move the formerly latest shard to historical storage - if (keepShard(*cur) && separateHistoricalPath) - { - moveShard(*cur); - } - } - - cur = std::nullopt; + // Move the formerly latest shard to historical storage + if (keepShard(*cur) && separateHistoricalPath) + moveShard(*cur); } + + cur = std::nullopt; } } } @@ -1807,7 +1731,7 @@ DatabaseShardImp::chooseHistoricalPath(std::lock_guard const&) const } bool -DatabaseShardImp::checkHistoricalPaths() const +DatabaseShardImp::checkHistoricalPaths(std::lock_guard const&) const { #if BOOST_OS_LINUX // Each historical shard path must correspond @@ -1887,6 +1811,41 @@ DatabaseShardImp::checkHistoricalPaths() const return true; } +std::unique_ptr +DatabaseShardImp::getShardInfo(std::lock_guard const&) const +{ + auto shardInfo{std::make_unique()}; + for (auto const& [_, shard] : shards_) + { + shardInfo->update( + shard->index(), shard->getState(), shard->getPercentProgress()); + } + + for (auto const shardIndex : preparedIndexes_) + shardInfo->update(shardIndex, ShardState::queued, 0); + + return shardInfo; +} + +size_t +DatabaseShardImp::getNumTasks() const +{ + std::lock_guard lock(mutex_); + return taskQueue_->size(); +} + +void +DatabaseShardImp::updatePeers(std::lock_guard const& lock) const +{ + if (!app_.config().standalone() && + app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED) + { + auto const message{getShardInfo(lock)->makeMessage(app_)}; + app_.overlay().foreach(send_always(std::make_shared( + message, protocol::mtPEER_SHARD_INFO_V2))); + } +} + //------------------------------------------------------------------------------ std::unique_ptr diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 888fcfbe472..d2f49326b20 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -48,10 +48,10 @@ class DatabaseShardImp : public DatabaseShard int readThreads, beast::Journal j); - [[nodiscard]] bool + bool init() override; - [[nodiscard]] std::optional + std::optional prepareLedger(std::uint32_t validLedgerSeq) override; bool @@ -73,43 +73,11 @@ class DatabaseShardImp : public DatabaseShard void setStored(std::shared_ptr const& ledger) override; - std::string - getCompleteShards() override; - - std::uint32_t - ledgersPerShard() const override - { - return ledgersPerShard_; - } - - std::uint32_t - earliestShardIndex() const override - { - return earliestShardIndex_; - } + std::unique_ptr + getShardInfo() const override; - std::uint32_t - seqToShardIndex(std::uint32_t ledgerSeq) const override - { - assert(ledgerSeq >= earliestLedgerSeq()); - return NodeStore::seqToShardIndex(ledgerSeq, ledgersPerShard_); - } - - std::uint32_t - firstLedgerSeq(std::uint32_t shardIndex) const override - { - assert(shardIndex >= earliestShardIndex_); - if (shardIndex <= earliestShardIndex_) - return earliestLedgerSeq(); - return 1 + (shardIndex * ledgersPerShard_); - } - - std::uint32_t - lastLedgerSeq(std::uint32_t shardIndex) const override - { - assert(shardIndex >= earliestShardIndex_); - return (shardIndex + 1) * ledgersPerShard_; - } + size_t + getNumTasks() const override; boost::filesystem::path const& getRootDir() const override @@ -193,9 +161,6 @@ class DatabaseShardImp : public DatabaseShard // If new shards can be stored bool canAdd_{true}; - // Complete shard indexes - std::string status_; - // The name associated with the backend used with the shard store std::string backendName_; @@ -208,14 +173,6 @@ class DatabaseShardImp : public DatabaseShard // Storage space utilized by the shard store (in bytes) std::uint64_t fileSz_{0}; - // Each shard stores 16384 ledgers. The earliest shard may store - // less if the earliest ledger sequence truncates its beginning. - // The value should only be altered for unit tests. - std::uint32_t ledgersPerShard_ = ledgersPerShardDefault; - - // The earliest shard index - std::uint32_t earliestShardIndex_; - // Average storage space required by a shard (in bytes) std::uint64_t avgShardFileSz_; @@ -267,16 +224,11 @@ class DatabaseShardImp : public DatabaseShard bool writeSQLite, std::optional const& expectedHash); - // Set storage and file descriptor usage stats - void - setFileStats(); - - // Update status string - // Lock must be held + // Update storage and file descriptor usage stats void - updateStatus(std::lock_guard const&); + updateFileStats(); - // Returns true if the filesystem has enough storage + // Returns true if the file system has enough storage // available to hold the specified number of shards. // The value of pathDesignation determines whether // the shard(s) in question are historical and thus @@ -326,7 +278,14 @@ class DatabaseShardImp : public DatabaseShard chooseHistoricalPath(std::lock_guard const&) const; bool - checkHistoricalPaths() const; + checkHistoricalPaths(std::lock_guard const&) const; + + std::unique_ptr + getShardInfo(std::lock_guard const&) const; + + // Update peers with the status of every complete and incomplete shard + void + updatePeers(std::lock_guard const& lock) const; }; } // namespace NodeStore diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index e9bce3a7ca2..97fa9153077 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -54,9 +54,7 @@ Shard::Shard( , index_(index) , firstSeq_(db.firstLedgerSeq(index)) , lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index))) - , maxLedgers_( - index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1 - : db.ledgersPerShard()) + , maxLedgers_(db.maxLedgers(index)) , dir_((dir.empty() ? db.getRootDir() : dir) / std::to_string(index_)) { } @@ -144,7 +142,7 @@ bool Shard::tryClose() { // Keep database open if being acquired or finalized - if (state_ != final) + if (state_ != ShardState::finalized) return false; std::lock_guard lock(mutex_); @@ -187,7 +185,7 @@ Shard::tryClose() std::optional Shard::prepare() { - if (state_ != acquire) + if (state_ != ShardState::acquire) { JLOG(j_.warn()) << "shard " << index_ << " prepare called when not acquiring"; @@ -210,7 +208,7 @@ Shard::prepare() bool Shard::storeNodeObject(std::shared_ptr const& nodeObject) { - if (state_ != acquire) + if (state_ != ShardState::acquire) { // The import node store case is an exception if (nodeObject->getHash() != finalKey) @@ -294,7 +292,7 @@ Shard::storeLedger( std::shared_ptr const& next) { StoreLedgerResult result; - if (state_ != acquire) + if (state_ != ShardState::acquire) { // Ignore residual calls from InboundLedgers JLOG(j_.trace()) << "shard " << index_ << ". Not acquiring"; @@ -416,20 +414,21 @@ Shard::storeLedger( bool Shard::setLedgerStored(std::shared_ptr const& ledger) { - if (state_ != acquire) + if (state_ != ShardState::acquire) { // Ignore residual calls from InboundLedgers JLOG(j_.trace()) << "shard " << index_ << " not acquiring"; return false; } + auto fail = [&](std::string const& msg) { + JLOG(j_.error()) << "shard " << index_ << ". " << msg; + return false; + }; + auto const ledgerSeq{ledger->info().seq}; if (ledgerSeq < firstSeq_ || ledgerSeq > lastSeq_) - { - JLOG(j_.error()) << "shard " << index_ << " invalid ledger sequence " - << ledgerSeq; - return false; - } + return fail("Invalid ledger sequence " + std::to_string(ledgerSeq)); auto const scopedCount{makeBackendCount()}; if (!scopedCount) @@ -442,11 +441,8 @@ Shard::setLedgerStored(std::shared_ptr const& ledger) { std::lock_guard lock(mutex_); if (!acquireInfo_) - { - JLOG(j_.error()) - << "shard " << index_ << " missing acquire SQLite database"; - return false; - } + return fail("Missing acquire SQLite database"); + if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq)) { // Ignore redundant calls @@ -457,7 +453,7 @@ Shard::setLedgerStored(std::shared_ptr const& ledger) } if (!storeSQLite(ledger)) - return false; + return fail("Failed to store ledger"); std::lock_guard lock(mutex_); @@ -489,15 +485,16 @@ Shard::setLedgerStored(std::shared_ptr const& ledger) } catch (std::exception const& e) { - JLOG(j_.fatal()) << "shard " << index_ - << ". Exception caught in function " << __func__ - << ". Error: " << e.what(); acquireInfo_->storedSeqs.erase(ledgerSeq); - return false; + return fail( + std::string(". Exception caught in function ") + __func__ + + ". Error: " + e.what()); } - if (boost::icl::length(acquireInfo_->storedSeqs) >= maxLedgers_) - state_ = complete; + // Update progress + progress_ = boost::icl::length(acquireInfo_->storedSeqs); + if (progress_ == maxLedgers_) + state_ = ShardState::complete; setFileStats(lock); JLOG(j_.trace()) << "shard " << index_ << " stored ledger sequence " @@ -510,7 +507,7 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const { if (ledgerSeq < firstSeq_ || ledgerSeq > lastSeq_) return false; - if (state_ != acquire) + if (state_ != ShardState::acquire) return true; std::lock_guard lock(mutex_); @@ -523,12 +520,6 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const return boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq); } -void -Shard::sweep() -{ - // nothing to do -} - std::chrono::steady_clock::time_point Shard::getLastUse() const { @@ -566,8 +557,6 @@ Shard::finalize(bool writeSQLite, std::optional const& referenceHash) if (!scopedCount) return false; - state_ = finalizing; - uint256 hash{0}; std::uint32_t ledgerSeq{0}; auto fail = [&](std::string const& msg) { @@ -577,12 +566,17 @@ Shard::finalize(bool writeSQLite, std::optional const& referenceHash) << (ledgerSeq == 0 ? "" : ". Ledger sequence " + std::to_string(ledgerSeq)); - state_ = finalizing; + state_ = ShardState::finalizing; + progress_ = 0; + busy_ = false; return false; }; try { + state_ = ShardState::finalizing; + progress_ = 0; + /* TODO MP A lock is required when calling the NuDB verify function. Because @@ -738,6 +732,10 @@ Shard::finalize(bool writeSQLite, std::optional const& referenceHash) hash = ledger->info().parentHash; next = std::move(ledger); + + // Update progress + progress_ = maxLedgers_ - (ledgerSeq - firstSeq_); + --ledgerSeq; fullBelowCache->reset(); @@ -824,7 +822,9 @@ Shard::finalize(bool writeSQLite, std::optional const& referenceHash) // Re-open deterministic shard if (!open(lock)) - return false; + return fail("failed to open"); + + assert(state_ == ShardState::finalized); // Allow all other threads work with the shard busy_ = false; @@ -851,7 +851,8 @@ Shard::open(std::lock_guard const& lock) txSQLiteDB_.reset(); acquireInfo_.reset(); - state_ = acquire; + state_ = ShardState::acquire; + progress_ = 0; if (!preexist) remove_all(dir_); @@ -863,21 +864,22 @@ Shard::open(std::lock_guard const& lock) return false; }; auto createAcquireInfo = [this, &config]() { - acquireInfo_ = std::make_unique(); - DatabaseCon::Setup setup; setup.startUp = config.standalone() ? config.LOAD : config.START_UP; setup.standAlone = config.standalone(); setup.dataDir = dir_; setup.useGlobalPragma = true; + acquireInfo_ = std::make_unique(); acquireInfo_->SQLiteDB = std::make_unique( setup, AcquireShardDBName, AcquireShardDBPragma, AcquireShardDBInit, DatabaseCon::CheckpointerSetup{&app_.getJobQueue(), &app_.logs()}); - state_ = acquire; + + state_ = ShardState::acquire; + progress_ = 0; }; try @@ -919,8 +921,10 @@ Shard::open(std::lock_guard const& lock) if (blobPresent == soci::i_ok) { std::string s; + convert(sociBlob, s); + auto& storedSeqs{acquireInfo_->storedSeqs}; - if (convert(sociBlob, s); !from_string(storedSeqs, s)) + if (!from_string(storedSeqs, s)) return fail("invalid StoredLedgerSeqs"); if (boost::icl::first(storedSeqs) < firstSeq_ || @@ -930,14 +934,14 @@ Shard::open(std::lock_guard const& lock) } // Check if backend is complete - if (boost::icl::length(storedSeqs) == maxLedgers_) - state_ = complete; + progress_ = boost::icl::length(storedSeqs); + if (progress_ == maxLedgers_) + state_ = ShardState::complete; } } else { - // A shard that is final or its backend is complete - // and ready to be finalized + // A shard with a finalized or complete state std::shared_ptr nodeObject; if (backend_->fetch(finalKey.data(), &nodeObject) != Status::ok) { @@ -960,10 +964,12 @@ Shard::open(std::lock_guard const& lock) if (exists(dir_ / LgrDBName) && exists(dir_ / TxDBName)) { lastAccess_ = std::chrono::steady_clock::now(); - state_ = final; + state_ = ShardState::finalized; } else - state_ = complete; + state_ = ShardState::complete; + + progress_ = maxLedgers_; } } catch (std::exception const& e) @@ -989,7 +995,7 @@ Shard::initSQLite(std::lock_guard const&) setup.startUp = config.standalone() ? config.LOAD : config.START_UP; setup.standAlone = config.standalone(); setup.dataDir = dir_; - setup.useGlobalPragma = (state_ != complete); + setup.useGlobalPragma = (state_ != ShardState::complete); return setup; }(); @@ -1001,46 +1007,52 @@ Shard::initSQLite(std::lock_guard const&) if (txSQLiteDB_) txSQLiteDB_.reset(); - if (state_ == final) + switch (state_) { - lgrSQLiteDB_ = std::make_unique( - setup, LgrDBName, FinalShardDBPragma, LgrDBInit); - lgrSQLiteDB_->getSession() << boost::str( - boost::format("PRAGMA cache_size=-%d;") % - kilobytes( - config.getValueFor(SizedItem::lgrDBCache, std::nullopt))); - - txSQLiteDB_ = std::make_unique( - setup, TxDBName, FinalShardDBPragma, TxDBInit); - txSQLiteDB_->getSession() << boost::str( - boost::format("PRAGMA cache_size=-%d;") % - kilobytes( - config.getValueFor(SizedItem::txnDBCache, std::nullopt))); - } - else - { - // Non final shards use a Write Ahead Log for performance - lgrSQLiteDB_ = std::make_unique( - setup, - LgrDBName, - LgrDBPragma, - LgrDBInit, - DatabaseCon::CheckpointerSetup{ - &app_.getJobQueue(), &app_.logs()}); - lgrSQLiteDB_->getSession() << boost::str( - boost::format("PRAGMA cache_size=-%d;") % - kilobytes(config.getValueFor(SizedItem::lgrDBCache))); - - txSQLiteDB_ = std::make_unique( - setup, - TxDBName, - TxDBPragma, - TxDBInit, - DatabaseCon::CheckpointerSetup{ - &app_.getJobQueue(), &app_.logs()}); - txSQLiteDB_->getSession() << boost::str( - boost::format("PRAGMA cache_size=-%d;") % - kilobytes(config.getValueFor(SizedItem::txnDBCache))); + case ShardState::complete: + case ShardState::finalizing: + case ShardState::finalized: + lgrSQLiteDB_ = std::make_unique( + setup, LgrDBName, FinalShardDBPragma, LgrDBInit); + lgrSQLiteDB_->getSession() << boost::str( + boost::format("PRAGMA cache_size=-%d;") % + kilobytes(config.getValueFor( + SizedItem::lgrDBCache, std::nullopt))); + + txSQLiteDB_ = std::make_unique( + setup, TxDBName, FinalShardDBPragma, TxDBInit); + txSQLiteDB_->getSession() << boost::str( + boost::format("PRAGMA cache_size=-%d;") % + kilobytes(config.getValueFor( + SizedItem::txnDBCache, std::nullopt))); + break; + + // case ShardState::acquire: + // case ShardState::queued: + default: + // Incomplete shards use a Write Ahead Log for performance + lgrSQLiteDB_ = std::make_unique( + setup, + LgrDBName, + LgrDBPragma, + LgrDBInit, + DatabaseCon::CheckpointerSetup{ + &app_.getJobQueue(), &app_.logs()}); + lgrSQLiteDB_->getSession() << boost::str( + boost::format("PRAGMA cache_size=-%d;") % + kilobytes(config.getValueFor(SizedItem::lgrDBCache))); + + txSQLiteDB_ = std::make_unique( + setup, + TxDBName, + TxDBPragma, + TxDBInit, + DatabaseCon::CheckpointerSetup{ + &app_.getJobQueue(), &app_.logs()}); + txSQLiteDB_->getSession() << boost::str( + boost::format("PRAGMA cache_size=-%d;") % + kilobytes(config.getValueFor(SizedItem::txnDBCache))); + break; } } catch (std::exception const& e) @@ -1360,7 +1372,7 @@ Shard::makeBackendCount() if (!open(lock)) return {nullptr}; } - else if (state_ == final) + else if (state_ == ShardState::finalized) lastAccess_ = std::chrono::steady_clock::now(); return Shard::Count(&backendCount_); diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 574e912b499..7044e9f01b8 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -51,17 +52,19 @@ class DatabaseShard; class Shard final { public: - enum class State { - acquire, // Being acquired - complete, // Backend contains all ledgers but is not yet final - finalizing, // Being finalized - final // Database verified, shard is immutable - }; + /// Copy constructor (disallowed) + Shard(Shard const&) = delete; + + /// Move constructor (disallowed) + Shard(Shard&&) = delete; - static constexpr State acquire = State::acquire; - static constexpr State complete = State::complete; - static constexpr State finalizing = State::finalizing; - static constexpr State final = State::final; + // Copy assignment (disallowed) + Shard& + operator=(Shard const&) = delete; + + // Move assignment (disallowed) + Shard& + operator=(Shard&&) = delete; Shard( Application& app, @@ -101,7 +104,7 @@ class Shard final /** Notify shard to prepare for shutdown. */ void - stop() + stop() noexcept { stop_ = true; } @@ -139,17 +142,14 @@ class Shard final [[nodiscard]] bool containsLedger(std::uint32_t ledgerSeq) const; - void - sweep(); - [[nodiscard]] std::uint32_t - index() const + index() const noexcept { return index_; } [[nodiscard]] boost::filesystem::path const& - getDir() const + getDir() const noexcept { return dir_; } @@ -163,12 +163,21 @@ class Shard final [[nodiscard]] std::pair getFileInfo() const; - [[nodiscard]] State - getState() const + [[nodiscard]] ShardState + getState() const noexcept { return state_; } + /** Returns a percent signifying how complete + the current state of the shard is. + */ + [[nodiscard]] std::uint32_t + getPercentProgress() const noexcept + { + return calculatePercent(progress_, maxLedgers_); + } + [[nodiscard]] std::int32_t getWriteLoad(); @@ -191,7 +200,7 @@ class Shard final /** Enables removal of the shard directory on destruction. */ void - removeOnDestroy() + removeOnDestroy() noexcept { removeOnDestroy_ = true; } @@ -210,28 +219,28 @@ class Shard final public: Count(Count const&) = delete; Count& - operator=(Count&&) = delete; - Count& operator=(Count const&) = delete; + Count& + operator=(Count&&) = delete; - Count(Count&& other) : counter_(other.counter_) + Count(Count&& other) noexcept : counter_(other.counter_) { other.counter_ = nullptr; } - Count(std::atomic* counter) : counter_(counter) + Count(std::atomic* counter) noexcept : counter_(counter) { if (counter_) ++(*counter_); } - ~Count() + ~Count() noexcept { if (counter_) --(*counter_); } - operator bool() const + operator bool() const noexcept { return counter_ != nullptr; } @@ -288,7 +297,7 @@ class Shard final std::unique_ptr txSQLiteDB_; // Tracking information used only when acquiring a shard from the network. - // If the shard is final, this member will be null. + // If the shard is finalized, this member will be null. std::unique_ptr acquireInfo_; // Older shard without an acquire database or final key @@ -301,12 +310,16 @@ class Shard final // Determines if the shard busy with replacing by deterministic one std::atomic busy_{false}; - std::atomic state_{State::acquire}; + // State of the shard + std::atomic state_{ShardState::acquire}; + + // Number of ledgers processed for the current shard state + std::atomic progress_{0}; // Determines if the shard directory should be removed in the destructor std::atomic removeOnDestroy_{false}; - // The time of the last access of a shard that has a final state + // The time of the last access of a shard with a finalized state std::chrono::steady_clock::time_point lastAccess_; // Open shard databases diff --git a/src/ripple/nodestore/impl/ShardInfo.cpp b/src/ripple/nodestore/impl/ShardInfo.cpp new file mode 100644 index 00000000000..ee3e5f194f5 --- /dev/null +++ b/src/ripple/nodestore/impl/ShardInfo.cpp @@ -0,0 +1,138 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include + +#include + +namespace ripple { +namespace NodeStore { + +std::string +ShardInfo::finalizedToString() const +{ + if (!finalized_.empty()) + return ripple::to_string(finalized_); + return {}; +} + +std::string +ShardInfo::incompleteToString() const +{ + std::string result; + if (!incomplete_.empty()) + { + for (auto const& [shardIndex, incomplete] : incomplete_) + { + result += std::to_string(shardIndex) + ":" + + std::to_string(incomplete.percentProgress()) + ","; + } + result.pop_back(); + } + + return result; +} + +bool +ShardInfo::update( + std::uint32_t shardIndex, + ShardState state, + std::uint32_t percentProgress) +{ + if (state == ShardState::finalized) + { + if (boost::icl::contains(finalized_, shardIndex)) + return false; + + finalized_.insert(shardIndex); + return true; + } + + return incomplete_.emplace(shardIndex, Incomplete(state, percentProgress)) + .second; +} + +protocol::TMPeerShardInfoV2 +ShardInfo::makeMessage(Application& app) +{ + protocol::TMPeerShardInfoV2 message; + Serializer s; + s.add32(HashPrefix::shardInfo); + + // Set the message creation time + msgTimestamp_ = app.timeKeeper().now(); + { + auto const timestamp{msgTimestamp_.time_since_epoch().count()}; + message.set_timestamp(timestamp); + s.add32(timestamp); + } + + if (!incomplete_.empty()) + { + message.mutable_incomplete()->Reserve(incomplete_.size()); + for (auto const& [shardIndex, incomplete] : incomplete_) + { + auto tmIncomplete{message.add_incomplete()}; + + tmIncomplete->set_shardindex(shardIndex); + s.add32(shardIndex); + + static_assert(std::is_same_v< + std::underlying_type_t, + std::uint32_t>); + auto const state{static_cast(incomplete.state())}; + tmIncomplete->set_state(state); + s.add32(state); + + // Set progress if greater than zero + auto const percentProgress{incomplete.percentProgress()}; + if (percentProgress > 0) + { + tmIncomplete->set_progress(percentProgress); + s.add32(percentProgress); + } + } + } + + if (!finalized_.empty()) + { + auto const str{ripple::to_string(finalized_)}; + message.set_finalized(str); + s.addRaw(str.data(), str.size()); + } + + // Set the public key + auto const& publicKey{app.nodeIdentity().first}; + message.set_publickey(publicKey.data(), publicKey.size()); + + // Create a digital signature using the node private key + auto const signature{sign(publicKey, app.nodeIdentity().second, s.slice())}; + + // Set the digital signature + message.set_signature(signature.data(), signature.size()); + + return message; +} + +} // namespace NodeStore +} // namespace ripple diff --git a/src/ripple/nodestore/impl/TaskQueue.cpp b/src/ripple/nodestore/impl/TaskQueue.cpp index 000b664b0b7..ed03fac5b81 100644 --- a/src/ripple/nodestore/impl/TaskQueue.cpp +++ b/src/ripple/nodestore/impl/TaskQueue.cpp @@ -46,6 +46,13 @@ TaskQueue::addTask(std::function task) workers_.addTask(); } +size_t +TaskQueue::size() const +{ + std::lock_guard lock{mutex_}; + return tasks_.size() + processing_; +} + void TaskQueue::processTask(int instance) { @@ -53,13 +60,18 @@ TaskQueue::processTask(int instance) { std::lock_guard lock{mutex_}; - assert(!tasks_.empty()); + assert(!tasks_.empty()); task = std::move(tasks_.front()); tasks_.pop(); + + ++processing_; } task(); + + std::lock_guard lock{mutex_}; + --processing_; } } // namespace NodeStore diff --git a/src/ripple/nodestore/impl/TaskQueue.h b/src/ripple/nodestore/impl/TaskQueue.h index 9f2121a4506..8b062d7a6ac 100644 --- a/src/ripple/nodestore/impl/TaskQueue.h +++ b/src/ripple/nodestore/impl/TaskQueue.h @@ -44,10 +44,16 @@ class TaskQueue : public Stoppable, private Workers::Callback void addTask(std::function task); + /** Return the queue size + */ + [[nodiscard]] size_t + size() const; + private: - std::mutex mutex_; + mutable std::mutex mutex_; Workers workers_; std::queue> tasks_; + std::uint64_t processing_{0}; void processTask(int instance) override; diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index b8675f02ac6..59a31bd3bc2 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -201,11 +201,12 @@ class Overlay : public Stoppable, public beast::PropertyStream::Source /** Returns information reported to the crawl shard RPC command. + @param includePublicKey include peer public keys in the result. @param hops the maximum jumps the crawler will attempt. The number of hops achieved is not guaranteed. */ virtual Json::Value - crawlShards(bool pubKey, std::uint32_t hops) = 0; + crawlShards(bool includePublicKey, std::uint32_t hops) = 0; /** Returns the ID of the network this server is configured for, if any. diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index 6b2da5a4855..49dc2fa096d 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -32,8 +32,8 @@ namespace Resource { class Charge; } -// Maximum hops to attempt when crawling shards. cs = crawl shards -static constexpr std::uint32_t csHopLimit = 3; +// Maximum hops to relay the peer shard info request +static constexpr std::uint32_t relayLimit = 3; enum class ProtocolFeature { ValidatorListPropagation, @@ -113,8 +113,6 @@ class Peer virtual void ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const = 0; virtual bool - hasShard(std::uint32_t shardIndex) const = 0; - virtual bool hasTxSet(uint256 const& hash) const = 0; virtual void cycleStatus() = 0; diff --git a/src/ripple/overlay/impl/Message.cpp b/src/ripple/overlay/impl/Message.cpp index 86196fcea97..aabd6ab7857 100644 --- a/src/ripple/overlay/impl/Message.cpp +++ b/src/ripple/overlay/impl/Message.cpp @@ -93,13 +93,13 @@ Message::compress() case protocol::mtSTATUS_CHANGE: case protocol::mtHAVE_SET: case protocol::mtVALIDATION: - case protocol::mtGET_SHARD_INFO: - case protocol::mtSHARD_INFO: case protocol::mtGET_PEER_SHARD_INFO: case protocol::mtPEER_SHARD_INFO: case protocol::mtPROOF_PATH_REQ: case protocol::mtPROOF_PATH_RESPONSE: case protocol::mtREPLAY_DELTA_REQ: + case protocol::mtGET_PEER_SHARD_INFO_V2: + case protocol::mtPEER_SHARD_INFO_V2: break; } return false; diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 451bccf189d..fbf9fdfdc35 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -724,98 +724,99 @@ OverlayImpl::reportTraffic( } Json::Value -OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops) +OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays) { using namespace std::chrono; - using namespace std::chrono_literals; Json::Value jv(Json::objectValue); - auto const numPeers{size()}; - if (numPeers == 0) + + // Add shard info from this server to json result + if (auto shardStore = app_.getShardStore()) + { + if (includePublicKey) + jv[jss::public_key] = + toBase58(TokenType::NodePublic, app_.nodeIdentity().first); + + auto const shardInfo{shardStore->getShardInfo()}; + if (!shardInfo->finalized().empty()) + jv[jss::complete_shards] = shardInfo->finalizedToString(); + if (!shardInfo->incomplete().empty()) + jv[jss::incomplete_shards] = shardInfo->incompleteToString(); + } + + if (relays == 0 || size() == 0) return jv; - // If greater than a hop away, we may need to gather or freshen data - if (hops > 0) { - // Prevent crawl spamming - clock_type::time_point const last(csLast_.load()); - if ((clock_type::now() - last) > 60s) - { - auto const timeout(seconds((hops * hops) * 10)); - std::unique_lock l{csMutex_}; + protocol::TMGetPeerShardInfoV2 tmGPS; + tmGPS.set_relays(relays); - // Check if already requested - if (csIDs_.empty()) - { - { - std::lock_guard lock{mutex_}; - for (auto& id : ids_) - csIDs_.emplace(id.first); - } + // Wait if a request is in progress + std::unique_lock csLock{csMutex_}; + if (!csIDs_.empty()) + csCV_.wait(csLock); - // Relay request to active peers - protocol::TMGetPeerShardInfo tmGPS; - tmGPS.set_hops(hops); - foreach(send_always(std::make_shared( - tmGPS, protocol::mtGET_PEER_SHARD_INFO))); + { + std::lock_guard lock{mutex_}; + for (auto const& id : ids_) + csIDs_.emplace(id.first); + } - if (csCV_.wait_for(l, timeout) == std::cv_status::timeout) - { - csIDs_.clear(); - csCV_.notify_all(); - } - csLast_ = duration_cast( - clock_type::now().time_since_epoch()); - } - else - csCV_.wait_for(l, timeout); + // Request peer shard info + foreach(send_always(std::make_shared( + tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2))); + + if (csCV_.wait_for(csLock, seconds(60)) == std::cv_status::timeout) + { + csIDs_.clear(); + csCV_.notify_all(); } } - // Combine the shard info from peers and their sub peers - hash_map peerShardInfo; - for_each([&](std::shared_ptr const& peer) { - if (auto psi = peer->getPeerShardInfo()) + // Combine shard info from peers + hash_map peerShardInfo; + for_each([&](std::shared_ptr&& peer) { + auto const psi{peer->getPeerShardInfos()}; + for (auto const& [publicKey, shardInfo] : psi) { - // e is non-const so it may be moved from - for (auto& e : *psi) - { - auto it{peerShardInfo.find(e.first)}; - if (it != peerShardInfo.end()) - // The key exists so join the shard indexes. - it->second.shardIndexes += e.second.shardIndexes; - else - peerShardInfo.emplace(std::move(e)); - } + auto const it{peerShardInfo.find(publicKey)}; + if (it == peerShardInfo.end()) + peerShardInfo.emplace(publicKey, shardInfo); + else if (shardInfo.msgTimestamp() > it->second.msgTimestamp()) + it->second = shardInfo; } }); - // Prepare json reply - auto& av = jv[jss::peers] = Json::Value(Json::arrayValue); - for (auto const& e : peerShardInfo) + // Add shard info to json result + if (!peerShardInfo.empty()) { - auto& pv{av.append(Json::Value(Json::objectValue))}; - if (pubKey) - pv[jss::public_key] = toBase58(TokenType::NodePublic, e.first); - - auto const& address{e.second.endpoint.address()}; - if (!address.is_unspecified()) - pv[jss::ip] = address.to_string(); + auto& av = jv[jss::peers] = Json::Value(Json::arrayValue); + for (auto const& [publicKey, shardInfo] : peerShardInfo) + { + auto& pv{av.append(Json::Value(Json::objectValue))}; + if (includePublicKey) + { + pv[jss::public_key] = + toBase58(TokenType::NodePublic, publicKey); + } - pv[jss::complete_shards] = to_string(e.second.shardIndexes); + if (!shardInfo.finalized().empty()) + pv[jss::complete_shards] = shardInfo.finalizedToString(); + if (!shardInfo.incomplete().empty()) + pv[jss::incomplete_shards] = shardInfo.incompleteToString(); + } } return jv; } void -OverlayImpl::lastLink(std::uint32_t id) +OverlayImpl::endOfPeerChain(std::uint32_t id) { - // Notify threads when every peer has received a last link. - // This doesn't account for every node that might reply but - // it is adequate. - std::lock_guard l{csMutex_}; - if (csIDs_.erase(id) && csIDs_.empty()) + // Notify threads if all peers have received a reply from all peer chains + std::lock_guard csLock{csMutex_}; + csIDs_.erase(id); + if (csIDs_.empty()) csCV_.notify_all(); } @@ -877,8 +878,16 @@ OverlayImpl::getOverlayInfo() pv[jss::complete_ledgers] = std::to_string(minSeq) + "-" + std::to_string(maxSeq); - if (auto shardIndexes = sp->getShardIndexes()) - pv[jss::complete_shards] = to_string(*shardIndexes); + auto const peerShardInfos{sp->getPeerShardInfos()}; + auto const it{peerShardInfos.find(sp->getNodePublic())}; + if (it != peerShardInfos.end()) + { + auto const& shardInfo{it->second}; + if (!shardInfo.finalized().empty()) + pv[jss::complete_shards] = shardInfo.finalizedToString(); + if (!shardInfo.incomplete().empty()) + pv[jss::incomplete_shards] = shardInfo.incompleteToString(); + } }); return jv; diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index d670a27317f..f5963532e48 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -117,8 +117,7 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler std::atomic peerDisconnects_{0}; std::atomic peerDisconnectsCharges_{0}; - // Last time we crawled peers for shard info. 'cs' = crawl shards - std::atomic csLast_{std::chrono::seconds{0}}; + // 'cs' = crawl shards std::mutex csMutex_; std::condition_variable csCV_; // Peer IDs expecting to receive a last link notification @@ -372,14 +371,14 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler } Json::Value - crawlShards(bool pubKey, std::uint32_t hops) override; + crawlShards(bool includePublicKey, std::uint32_t relays) override; - /** Called when the last link from a peer chain is received. + /** Called when the reply from the last peer in a peer chain is received. @param id peer id that received the shard info. */ void - lastLink(std::uint32_t id); + endOfPeerChain(std::uint32_t id); /** Updates message count for validator/peer. Sends TMSquelch if the number * of messages for N peers reaches threshold T. A message is counted diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 946ab00420d..280a8f39dc9 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -470,8 +470,17 @@ PeerImp::hasLedger(uint256 const& hash, std::uint32_t seq) const return true; } - return seq >= app_.getNodeStore().earliestLedgerSeq() && - hasShard(NodeStore::seqToShardIndex(seq)); + if (seq >= app_.getNodeStore().earliestLedgerSeq()) + { + std::lock_guard lock{shardInfoMutex_}; + auto const it{shardInfos_.find(publicKey_)}; + if (it != shardInfos_.end()) + { + auto const shardIndex{app_.getNodeStore().seqToShardIndex(seq)}; + return boost::icl::contains(it->second.finalized(), shardIndex); + } + } + return false; } void @@ -483,16 +492,6 @@ PeerImp::ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const maxSeq = maxLedger_; } -bool -PeerImp::hasShard(std::uint32_t shardIndex) const -{ - std::lock_guard l{shardInfoMutex_}; - auto const it{shardInfo_.find(publicKey_)}; - if (it != shardInfo_.end()) - return boost::icl::contains(it->second.shardIndexes, shardIndex); - return false; -} - bool PeerImp::hasTxSet(uint256 const& hash) const { @@ -575,23 +574,11 @@ PeerImp::fail(std::string const& name, error_code ec) close(); } -std::optional> -PeerImp::getShardIndexes() const +hash_map const +PeerImp::getPeerShardInfos() const { std::lock_guard l{shardInfoMutex_}; - auto it{shardInfo_.find(publicKey_)}; - if (it != shardInfo_.end()) - return it->second.shardIndexes; - return std::nullopt; -} - -std::optional> -PeerImp::getPeerShardInfo() const -{ - std::lock_guard l{shardInfoMutex_}; - if (!shardInfo_.empty()) - return shardInfo_; - return std::nullopt; + return shardInfos_; } void @@ -845,9 +832,9 @@ PeerImp::doProtocolStart() send(m); // Request shard info from peer - protocol::TMGetPeerShardInfo tmGPS; - tmGPS.set_hops(0); - send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO)); + protocol::TMGetPeerShardInfoV2 tmGPS; + tmGPS.set_relays(0); + send(std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2)); setTimer(); } @@ -1127,221 +1114,289 @@ PeerImp::onMessage(std::shared_ptr const& m) } void -PeerImp::onMessage(std::shared_ptr const& m) +PeerImp::onMessage(std::shared_ptr const& m) { // DEPRECATED } void -PeerImp::onMessage(std::shared_ptr const& m) +PeerImp::onMessage(std::shared_ptr const& m) { // DEPRECATED } void -PeerImp::onMessage(std::shared_ptr const& m) +PeerImp::onMessage(std::shared_ptr const& m) { auto badData = [&](std::string msg) { fee_ = Resource::feeBadData; JLOG(p_journal_.warn()) << msg; }; - if (m->hops() > csHopLimit) - return badData("Invalid hops: " + std::to_string(m->hops())); - if (m->peerchain_size() > csHopLimit) - return badData("Invalid peer chain"); + // Verify relays + if (m->relays() > relayLimit) + return badData("Invalid relays"); - // Reply with shard info we may have - if (auto shardStore = app_.getShardStore()) + // Verify peer chain + // The peer chain should not contain this node's public key + // nor the public key of the sending peer + std::set pubKeyChain; + pubKeyChain.insert(app_.nodeIdentity().first); + pubKeyChain.insert(publicKey_); + + auto const peerChainSz{m->peerchain_size()}; + if (peerChainSz > 0) { - fee_ = Resource::feeLightPeer; - auto shards{shardStore->getCompleteShards()}; - if (!shards.empty()) - { - protocol::TMPeerShardInfo reply; - reply.set_shardindexes(shards); + if (peerChainSz > relayLimit) + return badData("Invalid peer chain size"); - if (m->has_lastlink()) - reply.set_lastlink(true); + if (peerChainSz + m->relays() > relayLimit) + return badData("Invalid relays and peer chain size"); - if (m->peerchain_size() > 0) - { - for (int i = 0; i < m->peerchain_size(); ++i) - { - if (!publicKeyType(makeSlice(m->peerchain(i).nodepubkey()))) - return badData("Invalid peer chain public key"); - } - - *reply.mutable_peerchain() = m->peerchain(); - } + for (int i = 0; i < peerChainSz; ++i) + { + auto const slice{makeSlice(m->peerchain(i).publickey())}; - send(std::make_shared(reply, protocol::mtPEER_SHARD_INFO)); + // Verify peer public key + if (!publicKeyType(slice)) + return badData("Invalid peer public key"); - JLOG(p_journal_.trace()) << "Sent shard indexes " << shards; + // Verify peer public key is unique in the peer chain + if (!pubKeyChain.emplace(slice).second) + return badData("Invalid peer public key"); } } - // Relay request to peers - if (m->hops() > 0) + // Reply with shard info this node may have + if (auto shardStore = app_.getShardStore()) { - fee_ = Resource::feeMediumBurdenPeer; + auto reply{shardStore->getShardInfo()->makeMessage(app_)}; + if (peerChainSz > 0) + *(reply.mutable_peerchain()) = m->peerchain(); + send(std::make_shared(reply, protocol::mtPEER_SHARD_INFO_V2)); + } - m->set_hops(m->hops() - 1); - if (m->hops() == 0) - m->set_lastlink(true); + if (m->relays() == 0) + return; - m->add_peerchain()->set_nodepubkey( - publicKey_.data(), publicKey_.size()); + // Add peer to the peer chain + m->add_peerchain()->set_publickey(publicKey_.data(), publicKey_.size()); - overlay_.foreach(send_if_not( - std::make_shared(*m, protocol::mtGET_PEER_SHARD_INFO), - match_peer(this))); - } + // Relay the request to peers, exclude the peer chain + m->set_relays(m->relays() - 1); + overlay_.foreach(send_if_not( + std::make_shared(*m, protocol::mtGET_PEER_SHARD_INFO_V2), + [&](std::shared_ptr const& peer) { + return pubKeyChain.find(peer->getNodePublic()) != pubKeyChain.end(); + })); + + fee_ = Resource::feeMediumBurdenPeer; } void -PeerImp::onMessage(std::shared_ptr const& m) -{ +PeerImp::onMessage(std::shared_ptr const& m) +{ + // Find the earliest and latest shard indexes + auto const& db{app_.getNodeStore()}; + auto const earliestShardIndex{db.earliestShardIndex()}; + auto const latestShardIndex{[&]() -> std::optional { + auto const curLedgerSeq{app_.getLedgerMaster().getCurrentLedgerIndex()}; + if (curLedgerSeq >= db.earliestLedgerSeq()) + return db.seqToShardIndex(curLedgerSeq); + return std::nullopt; + }()}; + auto badData = [&](std::string msg) { fee_ = Resource::feeBadData; JLOG(p_journal_.warn()) << msg; }; - if (m->shardindexes().empty()) - return badData("Missing shard indexes"); - if (m->peerchain_size() > csHopLimit) - return badData("Invalid peer chain"); - if (m->has_nodepubkey() && !publicKeyType(makeSlice(m->nodepubkey()))) - return badData("Invalid public key"); + // Used to create a digest and verify the message signature + Serializer s; + s.add32(HashPrefix::shardInfo); - // Check if the message should be forwarded to another peer - if (m->peerchain_size() > 0) + // Verify message creation time + NodeStore::ShardInfo shardInfo; { - // Get the Public key of the last link in the peer chain - auto const s{ - makeSlice(m->peerchain(m->peerchain_size() - 1).nodepubkey())}; - if (!publicKeyType(s)) - return badData("Invalid pubKey"); - PublicKey peerPubKey(s); - - if (auto peer = overlay_.findPeerByPublicKey(peerPubKey)) - { - if (!m->has_nodepubkey()) - m->set_nodepubkey(publicKey_.data(), publicKey_.size()); - - if (!m->has_endpoint()) - { - // Check if peer will share IP publicly - if (crawl()) - m->set_endpoint(remote_address_.address().to_string()); - else - m->set_endpoint("0"); - } + auto const timestamp{ + NetClock::time_point{std::chrono::seconds{m->timestamp()}}}; + auto const now{app_.timeKeeper().now()}; + if (timestamp > (now + 5s)) + return badData("Invalid timestamp"); - m->mutable_peerchain()->RemoveLast(); - peer->send( - std::make_shared(*m, protocol::mtPEER_SHARD_INFO)); + // Check if stale + using namespace std::chrono_literals; + if (timestamp < (now - 5min)) + return badData("Stale timestamp"); - JLOG(p_journal_.trace()) - << "Relayed TMPeerShardInfo to peer with IP " - << remote_address_.address().to_string(); - } - else - { - // Peer is no longer available so the relay ends - fee_ = Resource::feeUnwantedData; - JLOG(p_journal_.info()) << "Unable to route shard info"; - } - return; + s.add32(m->timestamp()); + shardInfo.setMsgTimestamp(timestamp); } - // Parse the shard indexes received in the shard info - RangeSet shardIndexes; + // Verify incomplete shards + auto const numIncomplete{m->incomplete_size()}; + if (numIncomplete > 0) { - if (!from_string(shardIndexes, m->shardindexes())) - return badData("Invalid shard indexes"); + if (latestShardIndex && numIncomplete > *latestShardIndex) + return badData("Invalid number of incomplete shards"); - std::uint32_t earliestShard; - std::optional latestShard; + // Verify each incomplete shard + for (int i = 0; i < numIncomplete; ++i) { - auto const curLedgerSeq{ - app_.getLedgerMaster().getCurrentLedgerIndex()}; - if (auto shardStore = app_.getShardStore()) + auto const& incomplete{m->incomplete(i)}; + auto const shardIndex{incomplete.shardindex()}; + + // Verify shard index + if (shardIndex < earliestShardIndex || + (latestShardIndex && shardIndex > latestShardIndex)) { - earliestShard = shardStore->earliestShardIndex(); - if (curLedgerSeq >= shardStore->earliestLedgerSeq()) - latestShard = shardStore->seqToShardIndex(curLedgerSeq); + return badData("Invalid incomplete shard index"); } - else + s.add32(shardIndex); + + // Verify state + auto const state{static_cast(incomplete.state())}; + switch (state) + { + // Incomplete states + case ShardState::acquire: + case ShardState::complete: + case ShardState::finalizing: + case ShardState::queued: + break; + + // case ShardState::finalized: + default: + return badData("Invalid incomplete shard state"); + }; + s.add32(incomplete.state()); + + // Verify progress + std::uint32_t progress{0}; + if (incomplete.has_progress()) { - auto const earliestLedgerSeq{ - app_.getNodeStore().earliestLedgerSeq()}; - earliestShard = NodeStore::seqToShardIndex(earliestLedgerSeq); - if (curLedgerSeq >= earliestLedgerSeq) - latestShard = NodeStore::seqToShardIndex(curLedgerSeq); + progress = incomplete.progress(); + if (progress < 1 || progress > 100) + return badData("Invalid incomplete shard progress"); + s.add32(progress); } - } - if (boost::icl::first(shardIndexes) < earliestShard || - (latestShard && boost::icl::last(shardIndexes) > latestShard)) - { - return badData("Invalid shard indexes"); + // Verify each incomplete shard is unique + if (!shardInfo.update(shardIndex, state, progress)) + return badData("Invalid duplicate incomplete shards"); } } - // Get the IP of the node reporting the shard info - beast::IP::Endpoint endpoint; - if (m->has_endpoint()) + // Verify finalized shards + if (m->has_finalized()) { - if (m->endpoint() != "0") + auto const& str{m->finalized()}; + if (str.empty()) + return badData("Invalid finalized shards"); + + if (!shardInfo.setFinalizedFromString(str)) + return badData("Invalid finalized shard indexes"); + + auto const& finalized{shardInfo.finalized()}; + auto const numFinalized{boost::icl::length(finalized)}; + if (numFinalized == 0 || + boost::icl::first(finalized) < earliestShardIndex || + (latestShardIndex && + boost::icl::last(finalized) > latestShardIndex)) { - auto result = - beast::IP::Endpoint::from_string_checked(m->endpoint()); - if (!result) - return badData("Invalid incoming endpoint: " + m->endpoint()); - endpoint = std::move(*result); + return badData("Invalid finalized shard indexes"); } - } - else if (crawl()) // Check if peer will share IP publicly - { - endpoint = remote_address_; + + if (latestShardIndex && + (numFinalized + numIncomplete) > *latestShardIndex) + { + return badData("Invalid number of finalized and incomplete shards"); + } + + s.addRaw(str.data(), str.size()); } - // Get the Public key of the node reporting the shard info - PublicKey publicKey; - if (m->has_nodepubkey()) - publicKey = PublicKey(makeSlice(m->nodepubkey())); - else - publicKey = publicKey_; + // Verify public key + auto slice{makeSlice(m->publickey())}; + if (!publicKeyType(slice)) + return badData("Invalid public key"); + + // Verify peer public key isn't this nodes's public key + PublicKey const publicKey(slice); + if (publicKey == app_.nodeIdentity().first) + return badData("Invalid public key"); + + // Verify signature + if (!verify(publicKey, s.slice(), makeSlice(m->signature()), false)) + return badData("Invalid signature"); + // Forward the message if a peer chain exists + auto const peerChainSz{m->peerchain_size()}; + if (peerChainSz > 0) { - std::lock_guard l{shardInfoMutex_}; - auto it{shardInfo_.find(publicKey)}; - if (it != shardInfo_.end()) + // Verify peer chain + if (peerChainSz > relayLimit) + return badData("Invalid peer chain size"); + + // The peer chain should not contain this node's public key + // nor the public key of the sending peer + std::set pubKeyChain; + pubKeyChain.insert(app_.nodeIdentity().first); + pubKeyChain.insert(publicKey_); + + for (int i = 0; i < peerChainSz; ++i) { - // Update the IP address for the node - it->second.endpoint = std::move(endpoint); + // Verify peer public key + slice = makeSlice(m->peerchain(i).publickey()); + if (!publicKeyType(slice)) + return badData("Invalid peer public key"); - // Join the shard index range set - it->second.shardIndexes += shardIndexes; + // Verify peer public key is unique in the peer chain + if (!pubKeyChain.emplace(slice).second) + return badData("Invalid peer public key"); + } + + // If last peer in the chain is connected, relay the message + PublicKey const peerPubKey( + makeSlice(m->peerchain(peerChainSz - 1).publickey())); + if (auto peer = overlay_.findPeerByPublicKey(peerPubKey)) + { + m->mutable_peerchain()->RemoveLast(); + peer->send( + std::make_shared(*m, protocol::mtPEER_SHARD_INFO_V2)); + JLOG(p_journal_.trace()) + << "Relayed TMPeerShardInfoV2 from peer IP " + << remote_address_.address().to_string() << " to peer IP " + << peer->getRemoteAddress().to_string(); } else { - // Add a new node - ShardInfo shardInfo; - shardInfo.endpoint = std::move(endpoint); - shardInfo.shardIndexes = std::move(shardIndexes); - shardInfo_.emplace(publicKey, std::move(shardInfo)); + // Peer is no longer available so the relay ends + JLOG(p_journal_.info()) << "Unable to relay peer shard info"; } } JLOG(p_journal_.trace()) - << "Consumed TMPeerShardInfo originating from public key " - << toBase58(TokenType::NodePublic, publicKey) << " shard indexes " - << m->shardindexes(); - - if (m->has_lastlink()) - overlay_.lastLink(id_); + << "Consumed TMPeerShardInfoV2 originating from public key " + << toBase58(TokenType::NodePublic, publicKey) << " finalized shards[" + << ripple::to_string(shardInfo.finalized()) << "] incomplete shards[" + << (shardInfo.incomplete().empty() ? "empty" + : shardInfo.incompleteToString()) + << "]"; + + // Consume the message + { + std::lock_guard lock{shardInfoMutex_}; + auto const it{shardInfos_.find(publicKey_)}; + if (it == shardInfos_.end()) + shardInfos_.emplace(publicKey, std::move(shardInfo)); + else if (shardInfo.msgTimestamp() > it->second.msgTimestamp()) + it->second = std::move(shardInfo); + } + + // Notify overlay a reply was received from the last peer in this chain + if (peerChainSz == 0) + overlay_.endOfPeerChain(id_); } void diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index ccfaa317663..5ff40400abf 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -54,12 +55,6 @@ class PeerImp : public Peer, /** Whether the peer's view of the ledger converges or diverges from ours */ enum class Tracking { diverged, unknown, converged }; - struct ShardInfo - { - beast::IP::Endpoint endpoint; - RangeSet shardIndexes; - }; - private: using clock_type = std::chrono::steady_clock; using error_code = boost::system::error_code; @@ -166,8 +161,9 @@ class PeerImp : public Peer, // been sent to or received from this peer. hash_map publisherListSequences_; + // Any known shard info from this peer and its sub peers + hash_map shardInfos_; std::mutex mutable shardInfoMutex_; - hash_map shardInfo_; Compressed compressionEnabled_ = Compressed::Off; // true if validation/proposal reduce-relay feature is enabled @@ -375,9 +371,6 @@ class PeerImp : public Peer, void ledgerRange(std::uint32_t& minSeq, std::uint32_t& maxSeq) const override; - bool - hasShard(std::uint32_t shardIndex) const override; - bool hasTxSet(uint256 const& hash) const override; @@ -397,13 +390,9 @@ class PeerImp : public Peer, void fail(std::string const& reason); - /** Return a range set of known shard indexes from this peer. */ - std::optional> - getShardIndexes() const; - - /** Return any known shard info from this peer and its sub peers. */ - std::optional> - getPeerShardInfo() const; + // Return any known shard info from this peer and its sub peers + [[nodiscard]] hash_map const + getPeerShardInfos() const; bool compressionEnabled() const override @@ -501,14 +490,14 @@ class PeerImp : public Peer, void onMessage(std::shared_ptr const& m); void - onMessage(std::shared_ptr const& m); - void - onMessage(std::shared_ptr const& m); - void onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); void + onMessage(std::shared_ptr const& m); + void + onMessage(std::shared_ptr const& m); + void onMessage(std::shared_ptr const& m); void onMessage(std::shared_ptr const& m); diff --git a/src/ripple/overlay/impl/ProtocolMessage.h b/src/ripple/overlay/impl/ProtocolMessage.h index 9022994f370..bbf7c1e16cb 100644 --- a/src/ripple/overlay/impl/ProtocolMessage.h +++ b/src/ripple/overlay/impl/ProtocolMessage.h @@ -69,14 +69,6 @@ protocolMessageName(int type) return "ping"; case protocol::mtCLUSTER: return "cluster"; - case protocol::mtGET_SHARD_INFO: - return "get_shard_info"; - case protocol::mtSHARD_INFO: - return "shard_info"; - case protocol::mtGET_PEER_SHARD_INFO: - return "get_peer_shard_info"; - case protocol::mtPEER_SHARD_INFO: - return "peer_shard_info"; case protocol::mtENDPOINTS: return "endpoints"; case protocol::mtTRANSACTION: @@ -97,6 +89,10 @@ protocolMessageName(int type) return "validator_list_collection"; case protocol::mtVALIDATION: return "validation"; + case protocol::mtGET_PEER_SHARD_INFO: + return "get_peer_shard_info"; + case protocol::mtPEER_SHARD_INFO: + return "peer_shard_info"; case protocol::mtGET_OBJECTS: return "get_objects"; case protocol::mtSQUELCH: @@ -109,6 +105,10 @@ protocolMessageName(int type) return "replay_delta_request"; case protocol::mtREPLAY_DELTA_RESPONSE: return "replay_delta_response"; + case protocol::mtGET_PEER_SHARD_INFO_V2: + return "get_peer_shard_info_v2"; + case protocol::mtPEER_SHARD_INFO_V2: + return "peer_shard_info_v2"; default: break; } @@ -401,22 +401,6 @@ invokeProtocolMessage( success = detail::invoke(*header, buffers, handler); break; - case protocol::mtGET_SHARD_INFO: - success = detail::invoke( - *header, buffers, handler); - break; - case protocol::mtSHARD_INFO: - success = detail::invoke( - *header, buffers, handler); - break; - case protocol::mtGET_PEER_SHARD_INFO: - success = detail::invoke( - *header, buffers, handler); - break; - case protocol::mtPEER_SHARD_INFO: - success = detail::invoke( - *header, buffers, handler); - break; case protocol::mtENDPOINTS: success = detail::invoke( *header, buffers, handler); @@ -449,6 +433,14 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtGET_PEER_SHARD_INFO: + success = detail::invoke( + *header, buffers, handler); + break; + case protocol::mtPEER_SHARD_INFO: + success = detail::invoke( + *header, buffers, handler); + break; case protocol::mtVALIDATORLIST: success = detail::invoke( *header, buffers, handler); @@ -481,6 +473,14 @@ invokeProtocolMessage( success = detail::invoke( *header, buffers, handler); break; + case protocol::mtGET_PEER_SHARD_INFO_V2: + success = detail::invoke( + *header, buffers, handler); + break; + case protocol::mtPEER_SHARD_INFO_V2: + success = detail::invoke( + *header, buffers, handler); + break; default: handler.onMessageUnknown(header->message_type); success = true; diff --git a/src/ripple/overlay/impl/TrafficCount.cpp b/src/ripple/overlay/impl/TrafficCount.cpp index a12ee4645f3..5ae5abc3bf2 100644 --- a/src/ripple/overlay/impl/TrafficCount.cpp +++ b/src/ripple/overlay/impl/TrafficCount.cpp @@ -39,10 +39,10 @@ TrafficCount::categorize( if (type == protocol::mtENDPOINTS) return TrafficCount::category::overlay; - if ((type == protocol::mtGET_SHARD_INFO) || - (type == protocol::mtSHARD_INFO) || - (type == protocol::mtGET_PEER_SHARD_INFO) || - (type == protocol::mtPEER_SHARD_INFO)) + if ((type == protocol::mtGET_PEER_SHARD_INFO) || + (type == protocol::mtPEER_SHARD_INFO) || + (type == protocol::mtGET_PEER_SHARD_INFO_V2) || + (type == protocol::mtPEER_SHARD_INFO_V2)) return TrafficCount::category::shards; if (type == protocol::mtTRANSACTION) diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index af010eaeacf..5306aee7b70 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -6,29 +6,31 @@ package protocol; // conflict. Even if you're sure, it's probably best to assign a new type. enum MessageType { - mtMANIFESTS = 2; - mtPING = 3; - mtCLUSTER = 5; - mtENDPOINTS = 15; - mtTRANSACTION = 30; - mtGET_LEDGER = 31; - mtLEDGER_DATA = 32; - mtPROPOSE_LEDGER = 33; - mtSTATUS_CHANGE = 34; - mtHAVE_SET = 35; - mtVALIDATION = 41; - mtGET_OBJECTS = 42; - mtGET_SHARD_INFO = 50; - mtSHARD_INFO = 51; - mtGET_PEER_SHARD_INFO = 52; - mtPEER_SHARD_INFO = 53; - mtVALIDATORLIST = 54; - mtSQUELCH = 55; - mtVALIDATORLISTCOLLECTION = 56; - mtPROOF_PATH_REQ = 57; - mtPROOF_PATH_RESPONSE = 58; - mtREPLAY_DELTA_REQ = 59; - mtREPLAY_DELTA_RESPONSE = 60; + mtMANIFESTS = 2; + mtPING = 3; + mtCLUSTER = 5; + mtENDPOINTS = 15; + mtTRANSACTION = 30; + mtGET_LEDGER = 31; + mtLEDGER_DATA = 32; + mtPROPOSE_LEDGER = 33; + mtSTATUS_CHANGE = 34; + mtHAVE_SET = 35; + mtVALIDATION = 41; + mtGET_OBJECTS = 42; + mtGET_SHARD_INFO = 50; + mtSHARD_INFO = 51; + mtGET_PEER_SHARD_INFO = 52; + mtPEER_SHARD_INFO = 53; + mtVALIDATORLIST = 54; + mtSQUELCH = 55; + mtVALIDATORLISTCOLLECTION = 56; + mtPROOF_PATH_REQ = 57; + mtPROOF_PATH_RESPONSE = 58; + mtREPLAY_DELTA_REQ = 59; + mtREPLAY_DELTA_RESPONSE = 60; + mtGET_PEER_SHARD_INFO_V2 = 61; + mtPEER_SHARD_INFO_V2 = 62; } // token, iterations, target, challenge = issue demand for proof of work @@ -79,46 +81,75 @@ message TMCluster repeated TMLoadSource loadSources = 2; } +// Node public key +message TMLink +{ + required bytes nodePubKey = 1 [deprecated=true]; // node public key +} + // Request info on shards held -message TMGetShardInfo +message TMGetPeerShardInfo { required uint32 hops = 1 [deprecated=true]; // number of hops to travel optional bool lastLink = 2 [deprecated=true]; // true if last link in the peer chain - repeated uint32 peerchain = 3 [deprecated=true]; // IDs used to route messages + repeated TMLink peerChain = 3 [deprecated=true]; // public keys used to route messages } // Info about shards held -message TMShardInfo +message TMPeerShardInfo { required string shardIndexes = 1 [deprecated=true]; // rangeSet of shard indexes - optional bytes nodePubKey = 2 [deprecated=true]; // The node's public key + optional bytes nodePubKey = 2 [deprecated=true]; // node public key optional string endpoint = 3 [deprecated=true]; // ipv6 or ipv4 address optional bool lastLink = 4 [deprecated=true]; // true if last link in the peer chain - repeated uint32 peerchain = 5 [deprecated=true]; // IDs used to route messages + repeated TMLink peerChain = 5 [deprecated=true]; // public keys used to route messages } -// Node public key -message TMLink +// Peer public key +message TMPublicKey { - required bytes nodePubKey = 1; // node public key + required bytes publicKey = 1; } -// Request info on shards held -message TMGetPeerShardInfo +// Request peer shard information +message TMGetPeerShardInfoV2 { - required uint32 hops = 1; // number of hops to travel - optional bool lastLink = 2; // true if last link in the peer chain - repeated TMLink peerChain = 3; // public keys used to route messages + // Peer public keys used to route messages + repeated TMPublicKey peerChain = 1; + + // Remaining times to relay + required uint32 relays = 2; } -// Info about shards held -message TMPeerShardInfo +// Peer shard information +message TMPeerShardInfoV2 { - required string shardIndexes = 1; // rangeSet of shard indexes - optional bytes nodePubKey = 2; // node public key - optional string endpoint = 3; // ipv6 or ipv4 address - optional bool lastLink = 4; // true if last link in the peer chain - repeated TMLink peerChain = 5; // public keys used to route messages + message TMIncomplete + { + required uint32 shardIndex = 1; + required uint32 state = 2; + + // State completion percent, 1 - 100 + optional uint32 progress = 3; + } + + // Message creation time + required uint32 timestamp = 1; + + // Incomplete shards being acquired or verified + repeated TMIncomplete incomplete = 2; + + // Verified immutable shards (RangeSet) + optional string finalized = 3; + + // Public key of node that authored the shard info + required bytes publicKey = 4; + + // Digital signature of node that authored the shard info + required bytes signature = 5; + + // Peer public keys used to route messages + repeated TMPublicKey peerChain = 6; } // A transaction can have only one input and one output. diff --git a/src/ripple/protocol/HashPrefix.h b/src/ripple/protocol/HashPrefix.h index ec080c2d555..409f9de9b51 100644 --- a/src/ripple/protocol/HashPrefix.h +++ b/src/ripple/protocol/HashPrefix.h @@ -84,6 +84,9 @@ enum class HashPrefix : std::uint32_t { /** Payment Channel Claim */ paymentChannelClaim = detail::make_hash_prefix('C', 'L', 'M'), + + /** shard info for signing */ + shardInfo = detail::make_hash_prefix('S', 'H', 'D'), }; template diff --git a/src/ripple/protocol/SystemParameters.h b/src/ripple/protocol/SystemParameters.h index 2a59de656d6..0620f5f66ca 100644 --- a/src/ripple/protocol/SystemParameters.h +++ b/src/ripple/protocol/SystemParameters.h @@ -58,7 +58,10 @@ systemCurrencyCode() } /** The XRP ledger network's earliest allowed sequence */ -static std::uint32_t constexpr XRP_LEDGER_EARLIEST_SEQ{32570}; +static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ{32570u}; + +/** The number of ledgers in a shard */ +static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD{16384u}; /** The minimum amount of support an amendment should have. diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index a67e2628a39..414dac6fb6c 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -65,6 +65,7 @@ JSS(EscrowFinish); // transaction type. JSS(Fee); // in/out: TransactionSign; field. JSS(FeeSettings); // ledger type. JSS(Flags); // in/out: TransactionSign; field. +JSS(incomplete_shards); // out: OverlayImpl, PeerImp JSS(Invalid); // JSS(LastLedgerSequence); // in: TransactionSign; field JSS(LedgerHashes); // ledger type. diff --git a/src/ripple/rpc/handlers/CrawlShards.cpp b/src/ripple/rpc/handlers/CrawlShards.cpp index 501ce4fa114..87292e57b93 100644 --- a/src/ripple/rpc/handlers/CrawlShards.cpp +++ b/src/ripple/rpc/handlers/CrawlShards.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include #include @@ -48,37 +49,23 @@ doCrawlShards(RPC::JsonContext& context) if (context.role != Role::ADMIN) return rpcError(rpcNO_PERMISSION); - std::uint32_t hops{0}; + std::uint32_t relays{0}; if (auto const& jv = context.params[jss::limit]) { if (!(jv.isUInt() || (jv.isInt() && jv.asInt() >= 0))) - { return RPC::expected_field_error(jss::limit, "unsigned integer"); - } - - hops = std::min(jv.asUInt(), csHopLimit); + relays = std::min(jv.asUInt(), relayLimit); + context.loadType = Resource::feeHighBurdenRPC; } + else + context.loadType = Resource::feeMediumBurdenRPC; - bool const pubKey{ + // Collect shard info from server and peers + bool const includePublicKey{ context.params.isMember(jss::public_key) && context.params[jss::public_key].asBool()}; - - // Collect shard info from peers connected to this server - Json::Value jvResult{context.app.overlay().crawlShards(pubKey, hops)}; - - // Collect shard info from this server - if (auto shardStore = context.app.getShardStore()) - { - if (pubKey) - jvResult[jss::public_key] = toBase58( - TokenType::NodePublic, context.app.nodeIdentity().first); - jvResult[jss::complete_shards] = shardStore->getCompleteShards(); - } - - if (hops == 0) - context.loadType = Resource::feeMediumBurdenRPC; - else - context.loadType = Resource::feeHighBurdenRPC; + Json::Value jvResult{ + context.app.overlay().crawlShards(includePublicKey, relays)}; return jvResult; } diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index a1a59d3f015..401706567ea 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -271,11 +271,6 @@ class TestPeer : public Peer { } bool - hasShard(std::uint32_t shardIndex) const override - { - return false; - } - bool hasTxSet(uint256 const& hash) const override { return false; diff --git a/src/test/basics/RangeSet_test.cpp b/src/test/basics/RangeSet_test.cpp index 30cca8fe6f3..ccf76fad0d4 100644 --- a/src/test/basics/RangeSet_test.cpp +++ b/src/test/basics/RangeSet_test.cpp @@ -96,22 +96,18 @@ class RangeSet_test : public beast::unit_test::suite BEAST_EXPECT(!from_string(set, "1,,2")); BEAST_EXPECT(boost::icl::length(set) == 0); - set.clear(); BEAST_EXPECT(from_string(set, "1")); BEAST_EXPECT(boost::icl::length(set) == 1); BEAST_EXPECT(boost::icl::first(set) == 1); - set.clear(); BEAST_EXPECT(from_string(set, "1,1")); BEAST_EXPECT(boost::icl::length(set) == 1); BEAST_EXPECT(boost::icl::first(set) == 1); - set.clear(); BEAST_EXPECT(from_string(set, "1-1")); BEAST_EXPECT(boost::icl::length(set) == 1); BEAST_EXPECT(boost::icl::first(set) == 1); - set.clear(); BEAST_EXPECT(from_string(set, "1,4-6")); BEAST_EXPECT(boost::icl::length(set) == 4); BEAST_EXPECT(boost::icl::first(set) == 1); @@ -121,7 +117,6 @@ class RangeSet_test : public beast::unit_test::suite BEAST_EXPECT(boost::icl::contains(set, 5)); BEAST_EXPECT(boost::icl::last(set) == 6); - set.clear(); BEAST_EXPECT(from_string(set, "1-2,4-6")); BEAST_EXPECT(boost::icl::length(set) == 5); BEAST_EXPECT(boost::icl::first(set) == 1); @@ -129,7 +124,6 @@ class RangeSet_test : public beast::unit_test::suite BEAST_EXPECT(boost::icl::contains(set, 4)); BEAST_EXPECT(boost::icl::last(set) == 6); - set.clear(); BEAST_EXPECT(from_string(set, "1-2,6")); BEAST_EXPECT(boost::icl::length(set) == 3); BEAST_EXPECT(boost::icl::first(set) == 1); diff --git a/src/test/nodestore/DatabaseShard_test.cpp b/src/test/nodestore/DatabaseShard_test.cpp index f8babe9f55b..e710ba3d32b 100644 --- a/src/test/nodestore/DatabaseShard_test.cpp +++ b/src/test/nodestore/DatabaseShard_test.cpp @@ -19,6 +19,8 @@ #include #include +#include +#include #include #include #include @@ -27,14 +29,15 @@ #include #include #include +#include +#include + #include #include #include #include #include #include -#include -#include namespace ripple { namespace NodeStore { @@ -177,7 +180,7 @@ class DatabaseShard_test : public TestBase /* ring used to generate pseudo-random sequence */ beast::xor_shift_engine rng_; /* number of shards to generate */ - int nShards_; + int numShards_; /* vector of accounts used to send test transactions */ std::vector accounts_; /* nAccounts_[i] is the number of these accounts existed before i-th @@ -196,11 +199,11 @@ class DatabaseShard_test : public TestBase TestData( std::uint64_t const seedValue, int dataSize = dataSizeMax, - int nShards = 1) - : rng_(seedValue), nShards_(nShards) + int numShards = 1) + : rng_(seedValue), numShards_(numShards) { std::uint32_t n = 0; - std::uint32_t nLedgers = ledgersPerShard * nShards; + std::uint32_t nLedgers = ledgersPerShard * numShards; nAccounts_.reserve(nLedgers); payAccounts_.reserve(nLedgers); @@ -285,7 +288,7 @@ class DatabaseShard_test : public TestBase } } - for (std::uint32_t i = 0; i < ledgersPerShard * nShards_; ++i) + for (std::uint32_t i = 0; i < ledgersPerShard * numShards_; ++i) { auto const index = i + (startIndex * ledgersPerShard); @@ -545,8 +548,8 @@ class DatabaseShard_test : public TestBase } RangeSet rs; - from_string(rs, set); - return to_string(rs); + BEAST_EXPECT(from_string(rs, set)); + return ripple::to_string(rs); } std::unique_ptr @@ -573,76 +576,79 @@ class DatabaseShard_test : public TestBase std::to_string(earliestSeq)); // Node store configuration - cfg->overwrite( - ConfigSection::nodeDatabase(), - "earliest_seq", - std::to_string(earliestSeq)); cfg->overwrite( ConfigSection::nodeDatabase(), "path", nodeDir.empty() ? defNodeDir.path() : nodeDir); + cfg->overwrite( + ConfigSection::nodeDatabase(), + "ledgers_per_shard", + std::to_string(ledgersPerShard)); + cfg->overwrite( + ConfigSection::nodeDatabase(), + "earliest_seq", + std::to_string(earliestSeq)); return cfg; }); } - std::optional + std::optional waitShard( - DatabaseShard& db, - int shardIndex, + DatabaseShard& shardStore, + std::uint32_t shardIndex, std::chrono::seconds timeout = shardStoreTimeout) { - RangeSet rs; - auto start = std::chrono::system_clock::now(); - auto end = start + timeout; - while (!from_string(rs, db.getCompleteShards()) || - !boost::icl::contains(rs, shardIndex)) + auto const end{std::chrono::system_clock::now() + timeout}; + while (shardStore.getNumTasks() || + !boost::icl::contains( + shardStore.getShardInfo()->finalized(), shardIndex)) { if (!BEAST_EXPECT(std::chrono::system_clock::now() < end)) - return {}; - std::this_thread::yield(); + return std::nullopt; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } return shardIndex; } - std::optional + std::optional createShard( TestData& data, - DatabaseShard& db, - int maxShardNumber = 1, - int ledgerOffset = 0) + DatabaseShard& shardStore, + int maxShardIndex = 1, + int shardOffset = 0) { int shardIndex{-1}; for (std::uint32_t i = 0; i < ledgersPerShard; ++i) { - auto const ledgerSeq{ - db.prepareLedger((maxShardNumber + 1) * ledgersPerShard)}; + auto const ledgerSeq{shardStore.prepareLedger( + (maxShardIndex + 1) * ledgersPerShard)}; if (!BEAST_EXPECT(ledgerSeq != std::nullopt)) - return {}; + return std::nullopt; - shardIndex = db.seqToShardIndex(*ledgerSeq); + shardIndex = shardStore.seqToShardIndex(*ledgerSeq); - int const arrInd = *ledgerSeq - (ledgersPerShard * ledgerOffset) - + int const arrInd = *ledgerSeq - (ledgersPerShard * shardOffset) - ledgersPerShard - 1; BEAST_EXPECT( - arrInd >= 0 && arrInd < maxShardNumber * ledgersPerShard); - BEAST_EXPECT(saveLedger(db, *data.ledgers_[arrInd])); + arrInd >= 0 && arrInd < maxShardIndex * ledgersPerShard); + BEAST_EXPECT(saveLedger(shardStore, *data.ledgers_[arrInd])); if (arrInd % ledgersPerShard == (ledgersPerShard - 1)) { uint256 const finalKey_{0}; Serializer s; s.add32(Shard::version); - s.add32(db.firstLedgerSeq(shardIndex)); - s.add32(db.lastLedgerSeq(shardIndex)); + s.add32(shardStore.firstLedgerSeq(shardIndex)); + s.add32(shardStore.lastLedgerSeq(shardIndex)); s.addRaw(data.ledgers_[arrInd]->info().hash.data(), 256 / 8); - db.store( + shardStore.store( hotUNKNOWN, std::move(s.modData()), finalKey_, *ledgerSeq); } - db.setStored(data.ledgers_[arrInd]); + shardStore.setStored(data.ledgers_[arrInd]); } - return waitShard(db, shardIndex); + return waitShard(shardStore, shardIndex); } void @@ -653,25 +659,47 @@ class DatabaseShard_test : public TestBase using namespace test::jtx; beast::temp_dir shardDir; - Env env{*this, testConfig(shardDir.path())}; DummyScheduler scheduler; RootStoppable parent("TestRootStoppable"); + { + Env env{*this, testConfig(shardDir.path())}; + std::unique_ptr shardStore{ + make_ShardStore(env.app(), parent, scheduler, 2, journal_)}; + + BEAST_EXPECT(shardStore); + BEAST_EXPECT(shardStore->init()); + BEAST_EXPECT(shardStore->ledgersPerShard() == ledgersPerShard); + BEAST_EXPECT(shardStore->seqToShardIndex(ledgersPerShard + 1) == 1); + BEAST_EXPECT(shardStore->seqToShardIndex(2 * ledgersPerShard) == 1); + BEAST_EXPECT( + shardStore->seqToShardIndex(2 * ledgersPerShard + 1) == 2); + BEAST_EXPECT( + shardStore->earliestShardIndex() == + (earliestSeq - 1) / ledgersPerShard); + BEAST_EXPECT(shardStore->firstLedgerSeq(1) == ledgersPerShard + 1); + BEAST_EXPECT(shardStore->lastLedgerSeq(1) == 2 * ledgersPerShard); + BEAST_EXPECT(shardStore->getRootDir().string() == shardDir.path()); + } - std::unique_ptr db = - make_ShardStore(env.app(), parent, scheduler, 2, journal_); + { + Env env{*this, testConfig(shardDir.path())}; + std::unique_ptr shardStore{ + make_ShardStore(env.app(), parent, scheduler, 2, journal_)}; - BEAST_EXPECT(db); - BEAST_EXPECT(db->ledgersPerShard() == db->ledgersPerShardDefault); - BEAST_EXPECT(db->init()); - BEAST_EXPECT(db->ledgersPerShard() == ledgersPerShard); - BEAST_EXPECT(db->seqToShardIndex(ledgersPerShard + 1) == 1); - BEAST_EXPECT(db->seqToShardIndex(2 * ledgersPerShard) == 1); - BEAST_EXPECT(db->seqToShardIndex(2 * ledgersPerShard + 1) == 2); - BEAST_EXPECT( - db->earliestShardIndex() == (earliestSeq - 1) / ledgersPerShard); - BEAST_EXPECT(db->firstLedgerSeq(1) == ledgersPerShard + 1); - BEAST_EXPECT(db->lastLedgerSeq(1) == 2 * ledgersPerShard); - BEAST_EXPECT(db->getRootDir().string() == shardDir.path()); + env.app().config().overwrite( + ConfigSection::shardDatabase(), "ledgers_per_shard", "512"); + BEAST_EXPECT(!shardStore->init()); + } + + Env env{*this, testConfig(shardDir.path())}; + std::unique_ptr shardStore{ + make_ShardStore(env.app(), parent, scheduler, 2, journal_)}; + + env.app().config().overwrite( + ConfigSection::shardDatabase(), + "earliest_seq", + std::to_string(std::numeric_limits::max())); + BEAST_EXPECT(!shardStore->init()); } void @@ -714,9 +742,11 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - for (std::uint32_t i = 0; i < 2; ++i) + for (auto i = 0; i < 2; ++i) + { if (!createShard(data, *db, 2)) return; + } } { Env env{*this, testConfig(shardDir.path())}; @@ -736,9 +766,9 @@ class DatabaseShard_test : public TestBase } void - testGetCompleteShards(std::uint64_t const seedValue) + testGetFinalShards(std::uint64_t const seedValue) { - testcase("Get complete shards"); + testcase("Get final shards"); using namespace test::jtx; @@ -751,17 +781,20 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - BEAST_EXPECT(db->getCompleteShards() == ""); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); - std::uint64_t bitMask = 0; - - for (std::uint32_t i = 0; i < nTestShards; ++i) + for (auto i = 0; i < nTestShards; ++i) { - auto n = createShard(data, *db, nTestShards); - if (!BEAST_EXPECT(n && *n >= 1 && *n <= nTestShards)) + auto const shardIndex{createShard(data, *db, nTestShards)}; + if (!BEAST_EXPECT( + shardIndex && *shardIndex >= 1 && + *shardIndex <= nTestShards)) + { return; - bitMask |= 1ll << *n; - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(bitMask)); + } + + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), *shardIndex)); } } @@ -781,47 +814,54 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - std::uint64_t bitMask = 0; BEAST_EXPECT(db->getPreShards() == ""); + BEAST_EXPECT(!db->prepareShards({})); + std::uint64_t bitMask = 0; for (std::uint32_t i = 0; i < nTestShards * 2; ++i) { - std::uint32_t n = randInt(data.rng_, nTestShards - 1) + 1; - if (bitMask & (1ll << n)) + std::uint32_t const shardIndex{ + randInt(data.rng_, nTestShards - 1) + 1}; + if (bitMask & (1ll << shardIndex)) { - db->removePreShard(n); - bitMask &= ~(1ll << n); + db->removePreShard(shardIndex); + bitMask &= ~(1ll << shardIndex); } else { - db->prepareShards({n}); - bitMask |= 1ll << n; + BEAST_EXPECT(db->prepareShards({shardIndex})); + bitMask |= 1ll << shardIndex; } BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask)); } // test illegal cases // adding shards with too large number - db->prepareShards({0}); + BEAST_EXPECT(!db->prepareShards({0})); BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask)); - db->prepareShards({nTestShards + 1}); + BEAST_EXPECT(!db->prepareShards({nTestShards + 1})); BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask)); - db->prepareShards({nTestShards + 2}); + BEAST_EXPECT(!db->prepareShards({nTestShards + 2})); BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask)); // create shards which are not prepared for import - BEAST_EXPECT(db->getCompleteShards() == ""); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); std::uint64_t bitMask2 = 0; - - for (std::uint32_t i = 0; i < nTestShards; ++i) + for (auto i = 0; i < nTestShards; ++i) { - auto n = createShard(data, *db, nTestShards); - if (!BEAST_EXPECT(n && *n >= 1 && *n <= nTestShards)) + auto const shardIndex{createShard(data, *db, nTestShards)}; + if (!BEAST_EXPECT( + shardIndex && *shardIndex >= 1 && + *shardIndex <= nTestShards)) + { return; - bitMask2 |= 1ll << *n; - BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask)); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(bitMask2)); + } + + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), *shardIndex)); + + bitMask2 |= 1ll << *shardIndex; BEAST_EXPECT((bitMask & bitMask2) == 0); if ((bitMask | bitMask2) == ((1ll << nTestShards) - 1) << 1) break; @@ -872,8 +912,9 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - db->prepareShards({1}); - BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(2)); + BEAST_EXPECT(!db->importShard(1, importPath / "not_exist")); + BEAST_EXPECT(db->prepareShards({1})); + BEAST_EXPECT(db->getPreShards() == "1"); using namespace boost::filesystem; remove_all(importPath / LgrDBName); @@ -911,9 +952,11 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - for (std::uint32_t i = 0; i < 2; ++i) + for (auto i = 0; i < 2; ++i) + { if (!BEAST_EXPECT(createShard(data, *db, 2))) return; + } } boost::filesystem::path path = shardDir.path(); @@ -937,10 +980,10 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - for (std::uint32_t i = 1; i <= 1; ++i) - waitShard(*db, i); + for (std::uint32_t shardIndex = 1; shardIndex <= 1; ++shardIndex) + waitShard(*db, shardIndex); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x2)); + BEAST_EXPECT(boost::icl::contains(db->getShardInfo()->finalized(), 1)); for (std::uint32_t i = 0; i < 1 * ledgersPerShard; ++i) checkLedger(data, *db, *data.ledgers_[i]); @@ -999,7 +1042,11 @@ class DatabaseShard_test : public TestBase } if (i == 2) + { waitShard(*db, shardIndex); + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), 1)); + } else { boost::filesystem::path path(shardDir.path()); @@ -1012,11 +1059,9 @@ class DatabaseShard_test : public TestBase { std::this_thread::yield(); } - } - BEAST_EXPECT( - db->getCompleteShards() == - bitmask2Rangeset(i == 2 ? 2 : 0)); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); + } } { @@ -1029,23 +1074,22 @@ class DatabaseShard_test : public TestBase return; if (i == 2) + { waitShard(*db, 1); + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), 1)); - BEAST_EXPECT( - db->getCompleteShards() == - bitmask2Rangeset(i == 2 ? 2 : 0)); - - if (i == 2) - { for (std::uint32_t j = 0; j < ledgersPerShard; ++j) checkLedger(data, *db, *data.ledgers_[j]); } + else + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); } } } void - testImport(std::uint64_t const seedValue) + testImportNodeStore(std::uint64_t const seedValue) { testcase("Import node store"); @@ -1066,11 +1110,15 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); db->import(ndb); + for (std::uint32_t i = 1; i <= 2; ++i) waitShard(*db, i); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6)); + + auto const finalShards{std::move(db->getShardInfo()->finalized())}; + for (std::uint32_t shardIndex : {1, 2}) + BEAST_EXPECT(boost::icl::contains(finalShards, shardIndex)); } { Env env{*this, testConfig(shardDir.path())}; @@ -1084,7 +1132,9 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 1; i <= 2; ++i) waitShard(*db, i); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0x6)); + auto const finalShards{std::move(db->getShardInfo()->finalized())}; + for (std::uint32_t shardIndex : {1, 2}) + BEAST_EXPECT(boost::icl::contains(finalShards, shardIndex)); for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) checkLedger(data, *db, *data.ledgers_[i]); @@ -1120,8 +1170,10 @@ class DatabaseShard_test : public TestBase using namespace test::jtx; - std::string ripemd160Key("B2F9DB61F714A82889966F097CD615C36DB2B01D"), - ripemd160Dat("6DB1D02CD019F09198FE80DB5A7D707F0C6BFF4C"); + std::string const ripemd160Key( + "B23490F7830707E8BEEFBFD78D76F437AE9DDE72"); + std::string const ripemd160Dat( + "8DCF2DA184D8DEC0E3D2716D7BF6C56641B6DA3B"); for (int i = 0; i < 2; i++) { @@ -1155,18 +1207,10 @@ class DatabaseShard_test : public TestBase boost::filesystem::path path(shardDir.path()); path /= "1"; - boost::filesystem::path keypath = path / "nudb.key"; - std::string key = ripemd160File(keypath.string()); - boost::filesystem::path datpath = path / "nudb.dat"; - std::string dat = ripemd160File(datpath.string()); - - std::cerr << "Iteration " << i << ": RIPEMD160[nudb.key] = " << key - << std::endl; - std::cerr << "Iteration " << i << ": RIPEMD160[nudb.dat] = " << dat - << std::endl; - - BEAST_EXPECT(key == ripemd160Key); - BEAST_EXPECT(dat == ripemd160Dat); + BEAST_EXPECT( + ripemd160File((path / "nudb.key").string()) == ripemd160Key); + BEAST_EXPECT( + ripemd160File((path / "nudb.dat").string()) == ripemd160Dat); } } @@ -1177,8 +1221,7 @@ class DatabaseShard_test : public TestBase using namespace test::jtx; - // Test importing with multiple historical - // paths + // Test importing with multiple historical paths { beast::temp_dir shardDir; std::array historicalDirs; @@ -1214,13 +1257,15 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); db->import(ndb); for (std::uint32_t i = 1; i <= ledgerCount; ++i) waitShard(*db, i); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110)); + auto const final{std::move(db->getShardInfo()->finalized())}; + for (std::uint32_t shardIndex : {1, 2, 3, 4}) + BEAST_EXPECT(boost::icl::contains(final, shardIndex)); auto const mainPathCount = std::distance( boost::filesystem::directory_iterator(shardDir.path()), @@ -1246,8 +1291,7 @@ class DatabaseShard_test : public TestBase BEAST_EXPECT(historicalPathCount == ledgerCount - 2); } - // Test importing with a single historical - // path + // Test importing with a single historical path { beast::temp_dir shardDir; beast::temp_dir historicalDir; @@ -1272,13 +1316,15 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0)); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); db->import(ndb); for (std::uint32_t i = 1; i <= ledgerCount; ++i) waitShard(*db, i); - BEAST_EXPECT(db->getCompleteShards() == bitmask2Rangeset(0b11110)); + auto const finalShards{std::move(db->getShardInfo()->finalized())}; + for (std::uint32_t shardIndex : {1, 2, 3, 4}) + BEAST_EXPECT(boost::icl::contains(finalShards, shardIndex)); auto const mainPathCount = std::distance( boost::filesystem::directory_iterator(shardDir.path()), @@ -1305,170 +1351,114 @@ class DatabaseShard_test : public TestBase using namespace test::jtx; - // Test importing with multiple historical - // paths - { - beast::temp_dir shardDir; - std::array historicalDirs; - std::array historicalPaths; - - std::transform( - historicalDirs.begin(), - historicalDirs.end(), - historicalPaths.begin(), - [](const beast::temp_dir& dir) { return dir.path(); }); + // Create the primary shard directory + beast::temp_dir primaryDir; + auto config{testConfig(primaryDir.path())}; - beast::temp_dir nodeDir; - auto c = testConfig(shardDir.path()); - - auto& historyPaths = c->section(SECTION_HISTORICAL_SHARD_PATHS); - historyPaths.append( - {historicalPaths[0].string(), - historicalPaths[1].string(), - historicalPaths[2].string(), - historicalPaths[3].string()}); - - Env env{*this, std::move(c)}; - DatabaseShard* db = env.app().getShardStore(); - BEAST_EXPECT(db); + // Create four historical directories + std::array historicalDirs; + { + auto& paths{config->section(SECTION_HISTORICAL_SHARD_PATHS)}; + for (auto const& dir : historicalDirs) + paths.append(dir.path()); + } - auto const ledgerCount = 4; + Env env{*this, std::move(config)}; - TestData data(seedValue, 4, ledgerCount); - if (!BEAST_EXPECT(data.makeLedgers(env))) - return; + // Create some shards + std::uint32_t constexpr numShards{4}; + TestData data(seedValue, 4, numShards); + if (!BEAST_EXPECT(data.makeLedgers(env))) + return; - BEAST_EXPECT(db->getCompleteShards() == ""); - std::uint64_t bitMask = 0; + auto shardStore{env.app().getShardStore()}; + BEAST_EXPECT(shardStore); - // Add ten shards to the Shard Database - for (std::uint32_t i = 0; i < ledgerCount; ++i) + for (auto i = 0; i < numShards; ++i) + { + auto const shardIndex{createShard(data, *shardStore, numShards)}; + if (!BEAST_EXPECT( + shardIndex && *shardIndex >= 1 && *shardIndex <= numShards)) { - auto n = createShard(data, *db, ledgerCount); - if (!BEAST_EXPECT(n && *n >= 1 && *n <= ledgerCount)) - return; - bitMask |= 1ll << *n; - BEAST_EXPECT( - db->getCompleteShards() == bitmask2Rangeset(bitMask)); + return; } + } - auto mainPathCount = std::distance( - boost::filesystem::directory_iterator(shardDir.path()), - boost::filesystem::directory_iterator()); - - // Only the two most recent shards - // should be stored at the main path - BEAST_EXPECT(mainPathCount == 2); - - // Confirm recent shard locations - std::set mainPathShards{ - shardDir.path() / boost::filesystem::path("3"), - shardDir.path() / boost::filesystem::path("4")}; - std::set actual( - boost::filesystem::directory_iterator(shardDir.path()), - boost::filesystem::directory_iterator()); - - BEAST_EXPECT(mainPathShards == actual); - - const auto generateHistoricalStems = [&historicalPaths, &actual] { - for (auto const& path : historicalPaths) - { - for (auto const& shard : - boost::filesystem::directory_iterator(path)) - { - actual.insert(boost::filesystem::path(shard).stem()); - } - } - }; - - // Confirm historical shard locations - std::set historicalPathShards; - std::generate_n( - std::inserter( - historicalPathShards, historicalPathShards.begin()), - 2, - [n = 1]() mutable { return std::to_string(n++); }); - actual.clear(); - generateHistoricalStems(); + { + // Confirm finalized shards are in the shard store + auto const finalized{shardStore->getShardInfo()->finalized()}; + BEAST_EXPECT(boost::icl::length(finalized) == numShards); + BEAST_EXPECT(boost::icl::first(finalized) == 1); + BEAST_EXPECT(boost::icl::last(finalized) == numShards); + } - BEAST_EXPECT(historicalPathShards == actual); + using namespace boost::filesystem; + auto const dirContains = [](beast::temp_dir const& dir, + std::uint32_t shardIndex) { + boost::filesystem::path const path(std::to_string(shardIndex)); + for (auto const& it : directory_iterator(dir.path())) + if (boost::filesystem::path(it).stem() == path) + return true; + return false; + }; + auto const historicalDirsContains = [&](std::uint32_t shardIndex) { + for (auto const& dir : historicalDirs) + if (dirContains(dir, shardIndex)) + return true; + return false; + }; - auto historicalPathCount = std::accumulate( - historicalPaths.begin(), - historicalPaths.end(), - 0, - [](int const sum, boost::filesystem::path const& path) { - return sum + - std::distance( - boost::filesystem::directory_iterator(path), - boost::filesystem::directory_iterator()); - }); + // Confirm two most recent shards are in the primary shard directory + for (auto const shardIndex : {numShards - 1, numShards}) + { + BEAST_EXPECT(dirContains(primaryDir, shardIndex)); + BEAST_EXPECT(!historicalDirsContains(shardIndex)); + } - // All historical shards should be stored - // at historical paths - BEAST_EXPECT(historicalPathCount == ledgerCount - 2); + // Confirm remaining shards are in the historical shard directories + for (auto shardIndex = 1; shardIndex < numShards - 1; ++shardIndex) + { + BEAST_EXPECT(!dirContains(primaryDir, shardIndex)); + BEAST_EXPECT(historicalDirsContains(shardIndex)); + } - data = TestData(seedValue * 2, 4, ledgerCount); - if (!BEAST_EXPECT(data.makeLedgers(env, ledgerCount))) - return; + // Create some more shards to exercise recent shard rotation + data = TestData(seedValue * 2, 4, numShards); + if (!BEAST_EXPECT(data.makeLedgers(env, numShards))) + return; - // Add ten more shards to the Shard Database - // to exercise recent shard rotation - for (std::uint32_t i = 0; i < ledgerCount; ++i) + for (auto i = 0; i < numShards; ++i) + { + auto const shardIndex{ + createShard(data, *shardStore, numShards * 2, numShards)}; + if (!BEAST_EXPECT( + shardIndex && *shardIndex >= numShards + 1 && + *shardIndex <= numShards * 2)) { - auto n = createShard(data, *db, ledgerCount * 2, ledgerCount); - if (!BEAST_EXPECT( - n && *n >= 1 + ledgerCount && *n <= ledgerCount * 2)) - return; - bitMask |= 1ll << *n; - BEAST_EXPECT( - db->getCompleteShards() == bitmask2Rangeset(bitMask)); + return; } + } - mainPathCount = std::distance( - boost::filesystem::directory_iterator(shardDir.path()), - boost::filesystem::directory_iterator()); - - // Only the two most recent shards - // should be stored at the main path - BEAST_EXPECT(mainPathCount == 2); - - // Confirm recent shard locations - mainPathShards = { - shardDir.path() / boost::filesystem::path("7"), - shardDir.path() / boost::filesystem::path("8")}; - actual = { - boost::filesystem::directory_iterator(shardDir.path()), - boost::filesystem::directory_iterator()}; - - BEAST_EXPECT(mainPathShards == actual); - - // Confirm historical shard locations - historicalPathShards.clear(); - std::generate_n( - std::inserter( - historicalPathShards, historicalPathShards.begin()), - 6, - [n = 1]() mutable { return std::to_string(n++); }); - actual.clear(); - generateHistoricalStems(); - - BEAST_EXPECT(historicalPathShards == actual); + { + // Confirm finalized shards are in the shard store + auto const finalized{shardStore->getShardInfo()->finalized()}; + BEAST_EXPECT(boost::icl::length(finalized) == numShards * 2); + BEAST_EXPECT(boost::icl::first(finalized) == 1); + BEAST_EXPECT(boost::icl::last(finalized) == numShards * 2); + } - historicalPathCount = std::accumulate( - historicalPaths.begin(), - historicalPaths.end(), - 0, - [](int const sum, boost::filesystem::path const& path) { - return sum + - std::distance( - boost::filesystem::directory_iterator(path), - boost::filesystem::directory_iterator()); - }); + // Confirm two most recent shards are in the primary shard directory + for (auto const shardIndex : {numShards * 2 - 1, numShards * 2}) + { + BEAST_EXPECT(dirContains(primaryDir, shardIndex)); + BEAST_EXPECT(!historicalDirsContains(shardIndex)); + } - // All historical shards should be stored - // at historical paths - BEAST_EXPECT(historicalPathCount == (ledgerCount * 2) - 2); + // Confirm remaining shards are in the historical shard directories + for (auto shardIndex = 1; shardIndex < numShards * 2 - 1; ++shardIndex) + { + BEAST_EXPECT(!dirContains(primaryDir, shardIndex)); + BEAST_EXPECT(historicalDirsContains(shardIndex)); } } @@ -1494,18 +1484,20 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - BEAST_EXPECT(shardStore->getCompleteShards().empty()); + BEAST_EXPECT(shardStore->getShardInfo()->finalized().empty()); int oldestShardIndex{-1}; - std::uint64_t bitMask{0}; for (auto i = 0; i < numShards; ++i) { auto shardIndex{createShard(data, *shardStore, numShards)}; if (!BEAST_EXPECT( shardIndex && *shardIndex >= 1 && *shardIndex <= numShards)) + { return; + } - bitMask |= (1ll << *shardIndex); + BEAST_EXPECT(boost::icl::contains( + shardStore->getShardInfo()->finalized(), *shardIndex)); if (oldestShardIndex == -1) oldestShardIndex = *shardIndex; @@ -1522,6 +1514,137 @@ class DatabaseShard_test : public TestBase data.ledgers_[index]->info().hash, ledgerSeq)); } + void + testShardInfo(std::uint64_t const seedValue) + { + testcase("Shard info"); + + using namespace test::jtx; + beast::temp_dir shardDir; + Env env{*this, testConfig(shardDir.path())}; + + auto shardStore{env.app().getShardStore()}; + BEAST_EXPECT(shardStore); + + // Check shard store is empty + { + auto const shardInfo{shardStore->getShardInfo()}; + BEAST_EXPECT( + shardInfo->msgTimestamp().time_since_epoch().count() == 0); + BEAST_EXPECT(shardInfo->finalizedToString().empty()); + BEAST_EXPECT(shardInfo->finalized().empty()); + BEAST_EXPECT(shardInfo->incompleteToString().empty()); + BEAST_EXPECT(shardInfo->incomplete().empty()); + } + + // Create an incomplete shard with index 1 + TestData data(seedValue, dataSizeMax, 2); + if (!BEAST_EXPECT(data.makeLedgers(env))) + return; + if (!BEAST_EXPECT(shardStore->prepareLedger(2 * ledgersPerShard))) + return; + + // Check shard is incomplete + { + auto const shardInfo{shardStore->getShardInfo()}; + BEAST_EXPECT(shardInfo->finalizedToString().empty()); + BEAST_EXPECT(shardInfo->finalized().empty()); + BEAST_EXPECT(shardInfo->incompleteToString() == "1:0"); + BEAST_EXPECT( + shardInfo->incomplete().find(1) != + shardInfo->incomplete().end()); + } + + // Finalize the shard + { + auto shardIndex{createShard(data, *shardStore)}; + if (!BEAST_EXPECT(shardIndex && *shardIndex == 1)) + return; + } + + // Check shard is finalized + { + auto const shardInfo{shardStore->getShardInfo()}; + BEAST_EXPECT(shardInfo->finalizedToString() == "1"); + BEAST_EXPECT(boost::icl::contains(shardInfo->finalized(), 1)); + BEAST_EXPECT(shardInfo->incompleteToString().empty()); + BEAST_EXPECT(shardInfo->incomplete().empty()); + BEAST_EXPECT(!shardInfo->update(1, ShardState::finalized, 0)); + BEAST_EXPECT(shardInfo->setFinalizedFromString("2")); + BEAST_EXPECT(shardInfo->finalizedToString() == "2"); + BEAST_EXPECT(boost::icl::contains(shardInfo->finalized(), 2)); + } + + // Create an incomplete shard with index 2 + if (!BEAST_EXPECT(shardStore->prepareLedger(3 * ledgersPerShard))) + return; + + // Store 10 percent of the ledgers + for (std::uint32_t i = 0; i < (ledgersPerShard / 10); ++i) + { + auto const ledgerSeq{ + shardStore->prepareLedger(3 * ledgersPerShard)}; + if (!BEAST_EXPECT(ledgerSeq != std::nullopt)) + return; + + auto const arrInd{*ledgerSeq - ledgersPerShard - 1}; + if (!BEAST_EXPECT(saveLedger(*shardStore, *data.ledgers_[arrInd]))) + return; + + shardStore->setStored(data.ledgers_[arrInd]); + } + + auto const shardInfo{shardStore->getShardInfo()}; + BEAST_EXPECT(shardInfo->incompleteToString() == "2:10"); + BEAST_EXPECT( + shardInfo->incomplete().find(2) != shardInfo->incomplete().end()); + + auto const timeStamp{env.app().timeKeeper().now()}; + shardInfo->setMsgTimestamp(timeStamp); + BEAST_EXPECT(timeStamp == shardInfo->msgTimestamp()); + + // Check message + auto const msg{shardInfo->makeMessage(env.app())}; + Serializer s; + s.add32(HashPrefix::shardInfo); + + BEAST_EXPECT(msg.timestamp() != 0); + s.add32(msg.timestamp()); + + // Verify incomplete shard + { + BEAST_EXPECT(msg.incomplete_size() == 1); + + auto const& incomplete{msg.incomplete(0)}; + BEAST_EXPECT(incomplete.shardindex() == 2); + s.add32(incomplete.shardindex()); + + BEAST_EXPECT( + static_cast(incomplete.state()) == + ShardState::acquire); + s.add32(incomplete.state()); + + BEAST_EXPECT(incomplete.has_progress()); + BEAST_EXPECT(incomplete.progress() == 10); + s.add32(incomplete.progress()); + } + + // Verify finalized shard + BEAST_EXPECT(msg.has_finalized()); + BEAST_EXPECT(msg.finalized() == "1"); + s.addRaw(msg.finalized().data(), msg.finalized().size()); + + // Verify public key + auto slice{makeSlice(msg.publickey())}; + BEAST_EXPECT(publicKeyType(slice)); + + // Verify signature + BEAST_EXPECT(verify( + PublicKey(slice), s.slice(), makeSlice(msg.signature()), false)); + + BEAST_EXPECT(msg.peerchain_size() == 0); + } + public: DatabaseShard_test() : journal_("DatabaseShard_test", *this) { @@ -1531,20 +1654,20 @@ class DatabaseShard_test : public TestBase run() override { std::uint64_t const seedValue = 51; - testStandalone(); testCreateShard(seedValue); testReopenDatabase(seedValue + 10); - testGetCompleteShards(seedValue + 20); + testGetFinalShards(seedValue + 20); testPrepareShards(seedValue + 30); testImportShard(seedValue + 40); testCorruptedDatabase(seedValue + 50); testIllegalFinalKey(seedValue + 60); - testImport(seedValue + 70); + testImportNodeStore(seedValue + 70); testDeterministicShard(seedValue + 80); testImportWithHistoricalPaths(seedValue + 90); testPrepareWithHistoricalPaths(seedValue + 100); testOpenShardManagement(seedValue + 110); + testShardInfo(seedValue + 120); } }; diff --git a/src/test/nodestore/Database_test.cpp b/src/test/nodestore/Database_test.cpp index 67d3a6b3741..0f762ab3a82 100644 --- a/src/test/nodestore/Database_test.cpp +++ b/src/test/nodestore/Database_test.cpp @@ -598,9 +598,8 @@ class Database_test : public TestBase if (type == "memory") { - // Earliest ledger sequence tests + // Verify default earliest ledger sequence { - // Verify default earliest ledger sequence std::unique_ptr db = Manager::instance().make_Database( "test", @@ -673,6 +672,55 @@ class Database_test : public TestBase std::strcmp(e.what(), "earliest_seq set more than once") == 0); } + + // Verify default ledgers per shard + { + std::unique_ptr db = + Manager::instance().make_Database( + "test", + megabytes(4), + scheduler, + 2, + parent, + nodeParams, + journal_); + BEAST_EXPECT( + db->ledgersPerShard() == DEFAULT_LEDGERS_PER_SHARD); + } + + // Set an invalid ledgers per shard + try + { + nodeParams.set("ledgers_per_shard", "100"); + std::unique_ptr db = + Manager::instance().make_Database( + "test", + megabytes(4), + scheduler, + 2, + parent, + nodeParams, + journal_); + } + catch (std::runtime_error const& e) + { + BEAST_EXPECT( + std::strcmp(e.what(), "Invalid ledgers_per_shard") == 0); + } + + // Set a valid ledgers per shard + nodeParams.set("ledgers_per_shard", "256"); + std::unique_ptr db = Manager::instance().make_Database( + "test", + megabytes(4), + scheduler, + 2, + parent, + nodeParams, + journal_); + + // Verify database uses the ledgers per shard + BEAST_EXPECT(db->ledgersPerShard() == 256); } } diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index c413e59e2fb..dec1d879889 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -141,11 +141,6 @@ class PeerPartial : public Peer { } bool - hasShard(std::uint32_t shardIndex) const override - { - return false; - } - bool hasTxSet(uint256 const& hash) const override { return false; diff --git a/src/test/rpc/ShardArchiveHandler_test.cpp b/src/test/rpc/ShardArchiveHandler_test.cpp index d4452fc2959..ed007f496d9 100644 --- a/src/test/rpc/ShardArchiveHandler_test.cpp +++ b/src/test/rpc/ShardArchiveHandler_test.cpp @@ -166,13 +166,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "20"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "20"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); @@ -264,13 +269,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite { auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "20"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "20"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); @@ -361,19 +371,23 @@ class ShardArchiveHandler_test : public beast::unit_test::suite } auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "20"); - section.set("ledgers_per_shard", "256"); - section.set("shard_verification_retry_interval", "1"); - section.set("shard_verification_max_attempts", "10000"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "20"); + section.set("shard_verification_retry_interval", "1"); + section.set("shard_verification_max_attempts", "10000"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); - std::uint8_t const numberOfDownloads = 10; // Create some ledgers so that the ShardArchiveHandler @@ -429,13 +443,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "1"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "1"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); std::unique_ptr logs(new CaptureLogs(&capturedLogs)); @@ -503,13 +522,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "0"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "0"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); std::unique_ptr logs(new CaptureLogs(&capturedLogs)); @@ -586,13 +610,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "1"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "1"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); std::unique_ptr logs(new CaptureLogs(&capturedLogs)); @@ -614,7 +643,7 @@ class ShardArchiveHandler_test : public beast::unit_test::suite env.close(); } - env.app().getShardStore()->prepareShards({1}); + BEAST_EXPECT(env.app().getShardStore()->prepareShards({1})); auto handler = env.app().getShardArchiveHandler(); BEAST_EXPECT(handler);