From b136b2377c07dd739f01ac6b148d829ecac55ff5 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Tue, 26 Jan 2021 17:09:28 -0500 Subject: [PATCH] [FOLD] Address feedback --- Builds/CMake/RippledCore.cmake | 2 +- src/ripple/nodestore/Database.h | 92 ++++++++++++- src/ripple/nodestore/DatabaseShard.h | 55 +------- src/ripple/nodestore/ShardInfo.h | 52 +++++-- src/ripple/nodestore/Types.h | 19 +-- src/ripple/nodestore/impl/Database.cpp | 17 +++ .../nodestore/impl/DatabaseShardImp.cpp | 129 +++++++----------- src/ripple/nodestore/impl/DatabaseShardImp.h | 43 ------ src/ripple/nodestore/impl/Shard.cpp | 101 ++++++-------- src/ripple/nodestore/impl/Shard.h | 34 ++--- src/ripple/nodestore/impl/ShardInfo.cpp | 68 +++++---- src/ripple/overlay/impl/OverlayImpl.cpp | 22 +-- src/ripple/overlay/impl/PeerImp.cpp | 85 +++++------- src/ripple/proto/ripple.proto | 6 +- src/ripple/protocol/SystemParameters.h | 5 +- src/test/app/LedgerReplay_test.cpp | 5 - src/test/nodestore/DatabaseShard_test.cpp | 67 ++++----- src/test/nodestore/Database_test.cpp | 52 ++++++- src/test/rpc/ShardArchiveHandler_test.cpp | 119 ++++++++++------ 19 files changed, 519 insertions(+), 454 deletions(-) diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index e36218e58fd..11b49adb1de 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -522,7 +522,7 @@ target_sources (rippled PRIVATE src/ripple/nodestore/impl/ManagerImp.cpp src/ripple/nodestore/impl/NodeObject.cpp src/ripple/nodestore/impl/Shard.cpp - "../../src/ripple/nodestore/impl/ShardInfo.cpp" + src/ripple/nodestore/impl/ShardInfo.cpp src/ripple/nodestore/impl/TaskQueue.cpp #[===============================[ main sources: diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index da0677292ce..9c0c0ed152f 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -231,14 +231,79 @@ class Database : public Stoppable void onChildrenStopped() override; + /** @return The maximum number of ledgers stored in a shard + */ + [[nodiscard]] std::uint32_t + ledgersPerShard() const noexcept + { + return ledgersPerShard_; + } + /** @return The earliest ledger sequence allowed */ - std::uint32_t - earliestLedgerSeq() const + [[nodiscard]] std::uint32_t + earliestLedgerSeq() const noexcept { return earliestLedgerSeq_; } + /** @return The earliest shard index + */ + [[nodiscard]] std::uint32_t + earliestShardIndex() const noexcept + { + return earliestShardIndex_; + } + + /** Calculates the first ledger sequence for a given shard index + + @param shardIndex The shard index considered + @return The first ledger sequence pertaining to the shard index + */ + [[nodiscard]] std::uint32_t + firstLedgerSeq(std::uint32_t shardIndex) const noexcept + { + assert(shardIndex >= earliestShardIndex_); + if (shardIndex <= earliestShardIndex_) + return earliestLedgerSeq_; + return 1 + (shardIndex * ledgersPerShard_); + } + + /** Calculates the last ledger sequence for a given shard index + + @param shardIndex The shard index considered + @return The last ledger sequence pertaining to the shard index + */ + [[nodiscard]] std::uint32_t + lastLedgerSeq(std::uint32_t shardIndex) const noexcept + { + assert(shardIndex >= earliestShardIndex_); + return (shardIndex + 1) * ledgersPerShard_; + } + + /** Calculates the shard index for a given ledger sequence + + @param ledgerSeq ledger sequence + @return The shard index of the ledger sequence + */ + [[nodiscard]] std::uint32_t + seqToShardIndex(std::uint32_t ledgerSeq) const noexcept + { + assert(ledgerSeq >= earliestLedgerSeq_); + return (ledgerSeq - 1) / ledgersPerShard_; + } + + /** Calculates the maximum ledgers for a given shard index + + @param shardIndex The shard index considered + @return The maximum ledgers pertaining to the shard index + + @note The earliest shard may store less if the earliest ledger + sequence truncates its beginning + */ + [[nodiscard]] std::uint32_t + maxLedgers(std::uint32_t shardIndex) const noexcept; + protected: beast::Journal const j_; Scheduler& scheduler_; @@ -247,6 +312,25 @@ class Database : public Stoppable std::atomic fetchHitCount_{0}; std::atomic fetchSz_{0}; + // The default is DEFAULT_LEDGERS_PER_SHARD (16384) to match the XRP ledger + // network. Can be set through the configuration file using the + // 'ledgers_per_shard' field under the 'node_db' and 'shard_db' stanzas. + // If specified, the value must be a multiple of 256 and equally assigned + // in both stanzas. Only unit tests or alternate networks should change + // this value. + std::uint32_t const ledgersPerShard_; + + // The default is XRP_LEDGER_EARLIEST_SEQ (32570) to match the XRP ledger + // network's earliest allowed ledger sequence. Can be set through the + // configuration file using the 'earliest_seq' field under the 'node_db' + // and 'shard_db' stanzas. If specified, the value must be greater than zero + // and equally assigned in both stanzas. Only unit tests or alternate + // networks should change this value. + std::uint32_t const earliestLedgerSeq_; + + // The earliest shard index + std::uint32_t const earliestShardIndex_; + void stopReadThreads(); @@ -302,10 +386,6 @@ class Database : public Stoppable std::vector readThreads_; bool readShut_{false}; - // The default is 32570 to match the XRP ledger network's earliest - // allowed sequence. Alternate networks may set this value. - std::uint32_t const earliestLedgerSeq_; - virtual std::shared_ptr fetchNodeObject( uint256 const& hash, diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index e50092bcca1..5995704f36e 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -61,7 +61,7 @@ class DatabaseShard : public Database @return `true` if the database initialized without error */ - virtual [[nodiscard]] bool + [[nodiscard]] virtual bool init() = 0; /** Prepare to store a new ledger in the shard being acquired @@ -75,7 +75,7 @@ class DatabaseShard : public Database between requests. @implNote adds a new writable shard if necessary */ - virtual [[nodiscard]] boost::optional + [[nodiscard]] virtual boost::optional prepareLedger(std::uint32_t validLedgerSeq) = 0; /** Prepare one or more shard indexes to be imported into the database @@ -118,7 +118,7 @@ class DatabaseShard : public Database @param seq The sequence of the ledger @return The ledger if found, nullptr otherwise */ - virtual [[nodiscard]] std::shared_ptr + [[nodiscard]] virtual std::shared_ptr fetchLedger(uint256 const& hash, std::uint32_t seq) = 0; /** Notifies the database that the given ledger has been @@ -133,61 +133,16 @@ class DatabaseShard : public Database @return Information about shards held by this node */ - virtual [[nodiscard]] std::unique_ptr + [[nodiscard]] virtual std::unique_ptr getShardInfo() = 0; - /** @return The maximum number of ledgers stored in a shard - */ - virtual std::uint32_t - ledgersPerShard() const = 0; - - /** @return The earliest shard index - */ - virtual std::uint32_t - earliestShardIndex() const = 0; - - /** Calculates the shard index for a given ledger sequence - - @param seq ledger sequence - @return The shard index of the ledger sequence - */ - virtual std::uint32_t - seqToShardIndex(std::uint32_t seq) const = 0; - - /** Calculates the first ledger sequence for a given shard index - - @param shardIndex The shard index considered - @return The first ledger sequence pertaining to the shard index - */ - virtual std::uint32_t - firstLedgerSeq(std::uint32_t shardIndex) const = 0; - - /** Calculates the last ledger sequence for a given shard index - - @param shardIndex The shard index considered - @return The last ledger sequence pertaining to the shard index - */ - virtual std::uint32_t - lastLedgerSeq(std::uint32_t shardIndex) const = 0; - /** Returns the root database directory */ virtual boost::filesystem::path const& getRootDir() const = 0; - - /** The number of ledgers in a shard */ - static constexpr std::uint32_t ledgersPerShardDefault{16384u}; }; -constexpr std::uint32_t -seqToShardIndex( - std::uint32_t ledgerSeq, - std::uint32_t ledgersPerShard = DatabaseShard::ledgersPerShardDefault) -{ - return (ledgerSeq - 1) / ledgersPerShard; -} - -extern [[nodiscard]] std::unique_ptr +extern std::unique_ptr make_ShardStore( Application& app, Stoppable& parent, diff --git a/src/ripple/nodestore/ShardInfo.h b/src/ripple/nodestore/ShardInfo.h index 389a2d2be5e..d6929f746e6 100644 --- a/src/ripple/nodestore/ShardInfo.h +++ b/src/ripple/nodestore/ShardInfo.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED #define RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED +#include #include #include #include @@ -32,33 +33,62 @@ namespace NodeStore { */ class ShardInfo { -public: - struct Incomplete +private: + class Incomplete { + public: Incomplete() = delete; - Incomplete(State state_, float progress_) - : state(state_), progress(progress_) + Incomplete(ShardState state_, Shard::Progress const& progress) + : state(state_) + , percentProgress(calculatePercent(progress.soFar, progress.target)) { } - State const state; - float const progress; + ShardState const state; + std::uint32_t const percentProgress; }; +public: [[nodiscard]] std::string - finalToString() const; + finalizedToString() const; + + [[nodiscard]] bool + setFinalizedFromString(std::string const& str) + { + return from_string(finalized_, str); + } + + [[nodiscard]] RangeSet const& + finalized() const + { + return finalized_; + } [[nodiscard]] std::string incompleteToString() const; + [[nodiscard]] std::map const& + incomplete() const + { + return incomplete_; + } + + // Returns true if successful or false because of a duplicate index + bool + update( + std::uint32_t shardIndex, + ShardState state_, + Shard::Progress const& progress); + [[nodiscard]] protocol::TMPeerShardInfoV2 makeMessage(Application& app) const; - // Complete and verified immutable shards - RangeSet final; +private: + // Finalized immutable shards + RangeSet finalized_; - // Incomplete shards being acquired or verified - std::map incomplete; + // Incomplete shards being acquired or finalized + std::map incomplete_; }; } // namespace NodeStore diff --git a/src/ripple/nodestore/Types.h b/src/ripple/nodestore/Types.h index 6e6455652d8..781b0bf7a59 100644 --- a/src/ripple/nodestore/Types.h +++ b/src/ripple/nodestore/Types.h @@ -54,22 +54,17 @@ enum Status { /** A batch of NodeObjects to write at once. */ using Batch = std::vector>; +} // namespace NodeStore + /** Shard states. */ -enum class State { - acquire, // Being acquired - complete, // Backend contains all ledgers, requires finalizing - finalizing, // Being finalized - final, // Database verified, shard is immutable +enum class ShardState { + acquire, // Acquiring ledgers + complete, // Backend is ledger complete, database is unverified + finalizing, // Verifying database + finalized, // Database verified, shard is immutable queued // Queued to be finalized }; -static constexpr State acquire = State::acquire; -static constexpr State complete = State::complete; -static constexpr State finalizing = State::finalizing; -static constexpr State final = State::final; -static constexpr State queued = State::queued; - -} // namespace NodeStore } // namespace ripple #endif diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 579443ace8f..d9ae7db4d3a 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -39,9 +39,17 @@ Database::Database( : Stoppable(name, parent.getRoot()) , j_(journal) , scheduler_(scheduler) + , ledgersPerShard_(get( + config, + "ledgers_per_shard", + DEFAULT_LEDGERS_PER_SHARD)) , earliestLedgerSeq_( get(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ)) + , earliestShardIndex_(seqToShardIndex(earliestLedgerSeq_)) { + if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) + Throw("Invalid ledgers_per_shard"); + if (earliestLedgerSeq_ < 1) Throw("Invalid earliest_seq"); @@ -74,6 +82,15 @@ Database::onChildrenStopped() stopped(); } +std::uint32_t +Database::maxLedgers(std::uint32_t shardIndex) const noexcept +{ + if (shardIndex == earliestShardIndex_) + return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1; + + return ledgersPerShard_; +} + void Database::stopReadThreads() { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 0d2c174726e..253bfcde2cc 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -58,7 +58,6 @@ DatabaseShardImp::DatabaseShardImp( , app_(app) , parent_(parent) , taskQueue_(std::make_unique(*this)) - , earliestShardIndex_(seqToShardIndex(earliestLedgerSeq())) , avgShardFileSz_(ledgersPerShard_ * kilobytes(192ull)) , openFinalLimit_( app.config().getValueFor(SizedItem::openFinalLimit, boost::none)) @@ -148,12 +147,12 @@ DatabaseShardImp::init() // Ignore values below the earliest shard index auto const shardIndex{std::stoul(dirName)}; - if (shardIndex < earliestShardIndex()) + if (shardIndex < earliestShardIndex_) { JLOG(j_.debug()) << "shard " << shardIndex << " ignored, comes before earliest shard index " - << earliestShardIndex(); + << earliestShardIndex_; continue; } @@ -182,13 +181,13 @@ DatabaseShardImp::init() switch (shard->getState()) { - case final: + case ShardState::finalized: if (++openFinals > openFinalLimit_) shard->tryClose(); shards_.emplace(shardIndex, std::move(shard)); break; - case complete: + case ShardState::complete: finalizeShard( shards_.emplace(shardIndex, std::move(shard)) .first->second, @@ -196,7 +195,7 @@ DatabaseShardImp::init() boost::none); break; - case acquire: + case ShardState::acquire: if (acquireIndex_ != 0) { JLOG(j_.error()) @@ -341,18 +340,18 @@ DatabaseShardImp::prepareShards(std::vector const& shardIndexes) for (auto const shardIndex : shardIndexes) { - if (shardIndex < earliestShardIndex()) + if (shardIndex < earliestShardIndex_) { return fail( "comes before earliest shard index " + - std::to_string(earliestShardIndex()), + std::to_string(earliestShardIndex_), shardIndex); } // If we are synced to the network, check if the shard index is // greater or equal to the current or validated shard index. auto seqCheck = [&](std::uint32_t ledgerSeq) { - if (ledgerSeq >= earliestLedgerSeq() && + if (ledgerSeq >= earliestLedgerSeq_ && shardIndex >= seqToShardIndex(ledgerSeq)) { return fail("invalid index", shardIndex); @@ -527,7 +526,8 @@ DatabaseShardImp::importShard( auto shard{std::make_unique( app_, *this, shardIndex, dstDir.parent_path(), j_)}; - if (!shard->init(scheduler_, *ctx_) || shard->getState() != complete) + if (!shard->init(scheduler_, *ctx_) || + shard->getState() != ShardState::complete) { shard.reset(); renameDir(dstDir, srcDir); @@ -576,9 +576,9 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t ledgerSeq) // Ledger must be stored in a final or acquiring shard switch (shard->getState()) { - case final: + case ShardState::finalized: break; - case acquire: + case ShardState::acquire: if (shard->containsLedger(ledgerSeq)) break; [[fallthrough]]; @@ -718,26 +718,15 @@ DatabaseShardImp::getShardInfo() { if (auto const shard = weak.lock()) { - auto const state{shard->getState()}; - switch (state) - { - case final: - shardInfo->final.insert(shard->index()); - break; - - default: - shardInfo->incomplete.emplace( - shard->index(), - ShardInfo::Incomplete(state, shard->getProgress())); - break; - } + shardInfo->update( + shard->index(), shard->getState(), shard->getProgress()); } } for (auto const shardIndex : preparedIndexes) { - shardInfo->incomplete.emplace( - shardIndex, ShardInfo::Incomplete(queued, 0)); + shardInfo->update( + shardIndex, ShardState::queued, {0, maxLedgers(shardIndex)}); } return shardInfo; @@ -808,8 +797,7 @@ DatabaseShardImp::import(Database& source) std::shared_ptr ledger; std::uint32_t ledgerSeq; std::tie(ledger, ledgerSeq, std::ignore) = loadLedgerHelper( - "WHERE LedgerSeq >= " + - std::to_string(earliestLedgerSeq()) + + "WHERE LedgerSeq >= " + std::to_string(earliestLedgerSeq_) + " order by LedgerSeq " + (ascendSort ? "asc" : "desc") + " limit 1", app_, @@ -893,13 +881,9 @@ DatabaseShardImp::import(Database& source) // Verify SQLite ledgers are in the node store { auto const firstSeq{firstLedgerSeq(shardIndex)}; - auto const lastSeq{ - std::max(firstSeq, lastLedgerSeq(shardIndex))}; - auto const numLedgers{ - shardIndex == earliestShardIndex() ? lastSeq - firstSeq + 1 - : ledgersPerShard_}; + auto const lastSeq{lastLedgerSeq(shardIndex)}; auto ledgerHashes{getHashesByIndex(firstSeq, lastSeq, app_)}; - if (ledgerHashes.size() != numLedgers) + if (ledgerHashes.size() != maxLedgers(shardIndex)) continue; bool valid{true}; @@ -967,7 +951,7 @@ DatabaseShardImp::import(Database& source) using namespace boost::filesystem; bool success{false}; - if (lastLedgerHash && shard->getState() == complete) + if (lastLedgerHash && shard->getState() == ShardState::complete) { // Store shard final key Serializer s; @@ -1125,9 +1109,7 @@ DatabaseShardImp::sweep() { if (auto const shard{weak.lock()}; shard && shard->isOpen()) { - shard->sweep(); - - if (shard->getState() == final) + if (shard->getState() == ShardState::finalized) openFinals.emplace_back(std::move(shard)); } } @@ -1170,30 +1152,30 @@ DatabaseShardImp::initConfig(std::lock_guard const&) Config const& config{app_.config()}; Section const& section{config.section(ConfigSection::shardDatabase())}; - { - // The earliest ledger sequence defaults to XRP_LEDGER_EARLIEST_SEQ. - // A custom earliest ledger sequence can be set through the - // configuration file using the 'earliest_seq' field under the - // 'node_db' and 'shard_db' stanzas. If specified, this field must - // have a value greater than zero and be equally assigned in - // both stanzas. - - std::uint32_t shardDBEarliestSeq{0}; - get_if_exists( - section, "earliest_seq", shardDBEarliestSeq); + auto compare = [&](std::string const& name, std::uint32_t defaultValue) { + std::uint32_t shardDBValue{defaultValue}; + get_if_exists(section, name, shardDBValue); - std::uint32_t nodeDBEarliestSeq{0}; + std::uint32_t nodeDBValue{defaultValue}; get_if_exists( - config.section(ConfigSection::nodeDatabase()), - "earliest_seq", - nodeDBEarliestSeq); + config.section(ConfigSection::nodeDatabase()), name, nodeDBValue); - if (shardDBEarliestSeq != nodeDBEarliestSeq) - { - return fail( - "and [" + ConfigSection::nodeDatabase() + - "] define different 'earliest_seq' values"); - } + return shardDBValue == nodeDBValue; + }; + + // If ledgers_per_shard or earliest_seq are specified, + // they must be equally assigned in 'node_db' + if (!compare("ledgers_per_shard", DEFAULT_LEDGERS_PER_SHARD)) + { + return fail( + "and [" + ConfigSection::nodeDatabase() + "] define different '" + + "ledgers_per_shard" + "' values"); + } + if (!compare("earliest_seq", XRP_LEDGER_EARLIEST_SEQ)) + { + return fail( + "and [" + ConfigSection::nodeDatabase() + "] define different '" + + "earliest_seq" + "' values"); } using namespace boost::filesystem; @@ -1225,20 +1207,6 @@ DatabaseShardImp::initConfig(std::lock_guard const&) } } - if (section.exists("ledgers_per_shard")) - { - // To be set only in standalone for testing - if (!config.standalone()) - return fail("'ledgers_per_shard' only honored in stand alone"); - - ledgersPerShard_ = get(section, "ledgers_per_shard"); - if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) - return fail("'ledgers_per_shard' must be a multiple of 256"); - - earliestShardIndex_ = seqToShardIndex(earliestLedgerSeq()); - avgShardFileSz_ = ledgersPerShard_ * kilobytes(192); - } - // NuDB is the default and only supported permanent storage backend backendName_ = get(section, "type", "nudb"); if (!boost::iequals(backendName_, "NuDB")) @@ -1271,7 +1239,7 @@ DatabaseShardImp::findAcquireIndex( std::uint32_t validLedgerSeq, std::lock_guard const&) { - if (validLedgerSeq < earliestLedgerSeq()) + if (validLedgerSeq < earliestLedgerSeq_) return boost::none; auto const maxShardIndex{[this, validLedgerSeq]() { @@ -1280,7 +1248,7 @@ DatabaseShardImp::findAcquireIndex( --shardIndex; return shardIndex; }()}; - auto const maxNumShards{maxShardIndex - earliestShardIndex() + 1}; + auto const maxNumShards{maxShardIndex - earliestShardIndex_ + 1}; // Check if the shard store has all shards if (shards_.size() >= maxNumShards) @@ -1294,8 +1262,7 @@ DatabaseShardImp::findAcquireIndex( std::vector available; available.reserve(maxNumShards - shards_.size()); - for (auto shardIndex = earliestShardIndex(); - shardIndex <= maxShardIndex; + for (auto shardIndex = earliestShardIndex_; shardIndex <= maxShardIndex; ++shardIndex) { if (shards_.find(shardIndex) == shards_.end() && @@ -1320,7 +1287,7 @@ DatabaseShardImp::findAcquireIndex( // chances of running more than 30 times is less than 1 in a billion for (int i = 0; i < 40; ++i) { - auto const shardIndex{rand_int(earliestShardIndex(), maxShardIndex)}; + auto const shardIndex{rand_int(earliestShardIndex_, maxShardIndex)}; if (shards_.find(shardIndex) == shards_.end() && preparedIndexes_.find(shardIndex) == preparedIndexes_.end()) { @@ -1534,7 +1501,7 @@ DatabaseShardImp::setStoredInShard( return false; } - if (shard->getState() == complete) + if (shard->getState() == ShardState::complete) { std::lock_guard lock(mutex_); if (auto const it{shards_.find(shard->index())}; it != shards_.end()) @@ -1584,7 +1551,7 @@ DatabaseShardImp::shardBoundaryIndex() const { auto const validIndex = app_.getLedgerMaster().getValidLedgerIndex(); - if (validIndex < earliestLedgerSeq()) + if (validIndex < earliestLedgerSeq_) return 0; // Shards with an index earlier than the recent shard boundary index diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 9df79df5ce3..16a8fab321e 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -76,41 +76,6 @@ class DatabaseShardImp : public DatabaseShard std::unique_ptr getShardInfo() override; - std::uint32_t - ledgersPerShard() const override - { - return ledgersPerShard_; - } - - std::uint32_t - earliestShardIndex() const override - { - return earliestShardIndex_; - } - - std::uint32_t - seqToShardIndex(std::uint32_t ledgerSeq) const override - { - assert(ledgerSeq >= earliestLedgerSeq()); - return NodeStore::seqToShardIndex(ledgerSeq, ledgersPerShard_); - } - - std::uint32_t - firstLedgerSeq(std::uint32_t shardIndex) const override - { - assert(shardIndex >= earliestShardIndex_); - if (shardIndex <= earliestShardIndex_) - return earliestLedgerSeq(); - return 1 + (shardIndex * ledgersPerShard_); - } - - std::uint32_t - lastLedgerSeq(std::uint32_t shardIndex) const override - { - assert(shardIndex >= earliestShardIndex_); - return (shardIndex + 1) * ledgersPerShard_; - } - boost::filesystem::path const& getRootDir() const override { @@ -208,14 +173,6 @@ class DatabaseShardImp : public DatabaseShard // Storage space utilized by the shard store (in bytes) std::uint64_t fileSz_{0}; - // Each shard stores 16384 ledgers. The earliest shard may store - // less if the earliest ledger sequence truncates its beginning. - // The value should only be altered for unit tests. - std::uint32_t ledgersPerShard_ = ledgersPerShardDefault; - - // The earliest shard index - std::uint32_t earliestShardIndex_; - // Average storage space required by a shard (in bytes) std::uint64_t avgShardFileSz_; diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 94642bb6259..1e57e716dc3 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -53,9 +53,7 @@ Shard::Shard( , index_(index) , firstSeq_(db.firstLedgerSeq(index)) , lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index))) - , maxLedgers_( - index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1 - : db.ledgersPerShard()) + , maxLedgers_(db.maxLedgers(index)) , dir_((dir.empty() ? db.getRootDir() : dir) / std::to_string(index_)) { } @@ -142,7 +140,7 @@ bool Shard::tryClose() { // Keep database open if being acquired or finalized - if (state_ != final) + if (state_ != ShardState::finalized) return false; std::lock_guard lock(mutex_); @@ -185,7 +183,7 @@ Shard::tryClose() boost::optional Shard::prepare() { - if (state_ != acquire) + if (state_ != ShardState::acquire) { JLOG(j_.warn()) << "shard " << index_ << " prepare called when not acquiring"; @@ -208,7 +206,7 @@ Shard::prepare() bool Shard::storeNodeObject(std::shared_ptr const& nodeObject) { - if (state_ != acquire) + if (state_ != ShardState::acquire) { // The import node store case is an exception if (nodeObject->getHash() != finalKey) @@ -292,7 +290,7 @@ Shard::storeLedger( std::shared_ptr const& next) { StoreLedgerResult result; - if (state_ != acquire) + if (state_ != ShardState::acquire) { // Ignore residual calls from InboundLedgers JLOG(j_.trace()) << "shard " << index_ << ". Not acquiring"; @@ -414,7 +412,7 @@ Shard::storeLedger( bool Shard::setLedgerStored(std::shared_ptr const& ledger) { - if (state_ != acquire) + if (state_ != ShardState::acquire) { // Ignore residual calls from InboundLedgers JLOG(j_.trace()) << "shard " << index_ << " not acquiring"; @@ -492,14 +490,9 @@ Shard::setLedgerStored(std::shared_ptr const& ledger) } // Update progress - auto const numStored{boost::icl::length(acquireInfo_->storedSeqs)}; - if (numStored >= maxLedgers_) - { - state_ = complete; - progress_ = 0.0f; - } - else - progress_ = static_cast(numStored) / maxLedgers_; + progress_ = boost::icl::length(acquireInfo_->storedSeqs); + if (progress_ == maxLedgers_) + state_ = ShardState::complete; setFileStats(lock); JLOG(j_.trace()) << "shard " << index_ << " stored ledger sequence " @@ -512,7 +505,7 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const { if (ledgerSeq < firstSeq_ || ledgerSeq > lastSeq_) return false; - if (state_ != acquire) + if (state_ != ShardState::acquire) return true; std::lock_guard lock(mutex_); @@ -525,12 +518,6 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const return boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq); } -void -Shard::sweep() -{ - // nothing to do -} - std::chrono::steady_clock::time_point Shard::getLastUse() const { @@ -539,14 +526,14 @@ Shard::getLastUse() const } std::pair -Shard::getFileInfo() const noexcept +Shard::getFileInfo() const { std::lock_guard lock(mutex_); return {fileSz_, fdRequired_}; } std::int32_t -Shard::getWriteLoad() noexcept +Shard::getWriteLoad() { auto const scopedCount{makeBackendCount()}; if (!scopedCount) @@ -555,7 +542,7 @@ Shard::getWriteLoad() noexcept } bool -Shard::isLegacy() const noexcept +Shard::isLegacy() const { std::lock_guard lock(mutex_); return legacy_; @@ -575,7 +562,8 @@ Shard::finalize( << (ledgerSeq == 0 ? "" : ". Ledger sequence " + std::to_string(ledgerSeq)); - state_ = finalizing; + state_ = ShardState::finalizing; + progress_ = 0; return false; }; @@ -585,8 +573,8 @@ Shard::finalize( try { - state_ = finalizing; - progress_ = 0.0f; + state_ = ShardState::finalizing; + progress_ = 0; /* TODO MP @@ -727,11 +715,11 @@ Shard::finalize( hash = ledger->info().parentHash; next = std::move(ledger); - --ledgerSeq; // Update progress - auto const numStored{maxLedgers_ - (ledgerSeq - firstSeq_)}; - progress_ = static_cast(numStored) / maxLedgers_; + progress_ = maxLedgers_ - (ledgerSeq - firstSeq_); + + --ledgerSeq; fullBelowCache->reset(); treeNodeCache->reset(); @@ -795,7 +783,7 @@ Shard::finalize( } lastAccess_ = std::chrono::steady_clock::now(); - state_ = final; + state_ = ShardState::finalized; if (!initSQLite(lock)) return fail("failed to initialize SQLite databases"); @@ -809,9 +797,6 @@ Shard::finalize( ". Error: " + e.what()); } - state_ = final; - progress_ = 0.0f; - return true; } @@ -827,8 +812,8 @@ Shard::open(std::lock_guard const& lock) txSQLiteDB_.reset(); acquireInfo_.reset(); - state_ = acquire; - progress_ = 0.0f; + state_ = ShardState::acquire; + progress_ = 0; if (!preexist) remove_all(dir_); @@ -840,27 +825,26 @@ Shard::open(std::lock_guard const& lock) return false; }; auto createAcquireInfo = [this, &config]() { - acquireInfo_ = std::make_unique(); - DatabaseCon::Setup setup; setup.startUp = config.standalone() ? config.LOAD : config.START_UP; setup.standAlone = config.standalone(); setup.dataDir = dir_; setup.useGlobalPragma = true; + acquireInfo_ = std::make_unique(); acquireInfo_->SQLiteDB = std::make_unique( setup, AcquireShardDBName, AcquireShardDBPragma, AcquireShardDBInit, DatabaseCon::CheckpointerSetup{&app_.getJobQueue(), &app_.logs()}); - state_ = acquire; + + state_ = ShardState::acquire; + progress_ = 0; }; try { - progress_ = 0.0f; - // Open or create the NuDB key/value store preexist = exists(dir_); backend_->open(!preexist); @@ -909,17 +893,14 @@ Shard::open(std::lock_guard const& lock) } // Check if backend is complete - auto const numStored{boost::icl::length(storedSeqs)}; - if (numStored == maxLedgers_) - state_ = complete; - else - progress_ = static_cast(numStored) / maxLedgers_; + progress_ = boost::icl::length(storedSeqs); + if (progress_ == maxLedgers_) + state_ = ShardState::complete; } } else { - // A shard that is final or its backend is complete - // and ready to be finalized + // A shard with a finalized or complete state std::shared_ptr nodeObject; if (backend_->fetch(finalKey.data(), &nodeObject) != Status::ok) { @@ -942,10 +923,12 @@ Shard::open(std::lock_guard const& lock) if (exists(dir_ / LgrDBName) && exists(dir_ / TxDBName)) { lastAccess_ = std::chrono::steady_clock::now(); - state_ = final; + state_ = ShardState::finalized; } else - state_ = complete; + state_ = ShardState::complete; + + progress_ = maxLedgers_; } } catch (std::exception const& e) @@ -971,7 +954,7 @@ Shard::initSQLite(std::lock_guard const&) setup.startUp = config.standalone() ? config.LOAD : config.START_UP; setup.standAlone = config.standalone(); setup.dataDir = dir_; - setup.useGlobalPragma = (state_ != complete); + setup.useGlobalPragma = (state_ != ShardState::complete); return setup; }(); @@ -985,9 +968,9 @@ Shard::initSQLite(std::lock_guard const&) switch (state_) { - case complete: - case finalizing: - case final: + case ShardState::complete: + case ShardState::finalizing: + case ShardState::finalized: lgrSQLiteDB_ = std::make_unique( setup, LgrDBName, FinalShardDBPragma, LgrDBInit); lgrSQLiteDB_->getSession() << boost::str( @@ -1003,8 +986,8 @@ Shard::initSQLite(std::lock_guard const&) SizedItem::txnDBCache, boost::none))); break; - // case acquire: - // case queued: + // case ShardState::acquire: + // case ShardState::queued: default: // Incomplete shards use a Write Ahead Log for performance lgrSQLiteDB_ = std::make_unique( @@ -1343,7 +1326,7 @@ Shard::makeBackendCount() if (!open(lock)) return {nullptr}; } - else if (state_ == final) + else if (state_ == ShardState::finalized) lastAccess_ = std::chrono::steady_clock::now(); return Shard::Count(&backendCount_); diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 641169c3f15..aec6703779f 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -140,9 +140,6 @@ class Shard final [[nodiscard]] bool containsLedger(std::uint32_t ledgerSeq) const; - void - sweep(); - [[nodiscard]] std::uint32_t index() const noexcept { @@ -162,30 +159,35 @@ class Shard final utilized and the second item is the number of file descriptors required. */ [[nodiscard]] std::pair - getFileInfo() const noexcept; + getFileInfo() const; - [[nodiscard]] State + [[nodiscard]] ShardState getState() const noexcept { return state_; } - /** Returns a value 0 - 1.0 signifying how complete the current state - of the shard is. This excludes the final state, where 0 is returned. + /** Returns a struct signifying how complete + the current state of the shard is. */ - [[nodiscard]] float + struct Progress + { + std::uint32_t const soFar; + std::uint32_t const target; + }; + [[nodiscard]] Progress getProgress() const noexcept { - return progress_; + return {progress_, maxLedgers_}; } [[nodiscard]] std::int32_t - getWriteLoad() noexcept; + getWriteLoad(); /** Returns `true` if shard is older, without final key data */ [[nodiscard]] bool - isLegacy() const noexcept; + isLegacy() const; /** Finalize shard by walking its ledgers and verifying each Merkle tree. @@ -299,7 +301,7 @@ class Shard final std::unique_ptr txSQLiteDB_; // Tracking information used only when acquiring a shard from the network. - // If the shard is final, this member will be null. + // If the shard is finalized, this member will be null. std::unique_ptr acquireInfo_; // Older shard without an acquire database or final key @@ -310,15 +312,15 @@ class Shard final std::atomic stop_{false}; // State of the shard - std::atomic state_{State::acquire}; + std::atomic state_{ShardState::acquire}; - // A value 0 - 1.0 signifying how complete the current state of the shard is - std::atomic progress_{0.0f}; + // Number of ledgers processed for the current shard state + std::atomic progress_{0}; // Determines if the shard directory should be removed in the destructor std::atomic removeOnDestroy_{false}; - // The time of the last access of a shard that has a final state + // The time of the last access of a shard with a finalized state std::chrono::steady_clock::time_point lastAccess_; // Open shard databases diff --git a/src/ripple/nodestore/impl/ShardInfo.cpp b/src/ripple/nodestore/impl/ShardInfo.cpp index 294cf37b682..98a95f6d138 100644 --- a/src/ripple/nodestore/impl/ShardInfo.cpp +++ b/src/ripple/nodestore/impl/ShardInfo.cpp @@ -17,7 +17,9 @@ */ //============================================================================== +#include #include +#include #include @@ -25,10 +27,10 @@ namespace ripple { namespace NodeStore { std::string -ShardInfo::finalToString() const +ShardInfo::finalizedToString() const { - if (!final.empty()) - return ripple::to_string(final); + if (!finalized_.empty()) + return ripple::to_string(finalized_); return {}; } @@ -36,14 +38,12 @@ std::string ShardInfo::incompleteToString() const { std::string result; - if (!incomplete.empty()) + if (!incomplete_.empty()) { - for (auto const& [shardIndex, incomplete_] : incomplete) + for (auto const& [shardIndex, incomplete] : incomplete_) { result += std::to_string(shardIndex) + ":" + - std::to_string(static_cast( - incomplete_.progress * 100)) + - ","; + std::to_string(incomplete.percentProgress) + ","; } result.pop_back(); } @@ -51,6 +51,24 @@ ShardInfo::incompleteToString() const return result; } +bool +ShardInfo::update( + std::uint32_t shardIndex, + ShardState state, + Shard::Progress const& progress) +{ + if (state == ShardState::finalized) + { + if (boost::icl::contains(finalized_, shardIndex)) + return false; + + finalized_.insert(shardIndex); + return true; + } + + return incomplete_.emplace(shardIndex, Incomplete(state, progress)).second; +} + protocol::TMPeerShardInfoV2 ShardInfo::makeMessage(Application& app) const { @@ -58,48 +76,42 @@ ShardInfo::makeMessage(Application& app) const Serializer s; s.add32(HashPrefix::shardInfo); - if (!incomplete.empty()) + if (!incomplete_.empty()) { - message.mutable_incomplete()->Reserve(incomplete.size()); - for (auto const& [shardIndex, incomplete_] : incomplete) + message.mutable_incomplete()->Reserve(incomplete_.size()); + for (auto const& [shardIndex, incomplete] : incomplete_) { auto tmIncomplete{message.add_incomplete()}; tmIncomplete->set_shardindex(shardIndex); s.add32(shardIndex); - auto const state{static_cast(incomplete_.state)}; + auto const state{static_cast(incomplete.state)}; tmIncomplete->set_state(state); s.add32(state); - // Report an integer within a range of 1 to 99 - auto const progress{std::min( - 99u, static_cast(incomplete_.progress * 100))}; - if (progress >= 1) + // Set progress if greater than zero + if (incomplete.percentProgress > 0) { - tmIncomplete->set_progress(progress); - s.add32(progress); + tmIncomplete->set_progress(incomplete.percentProgress); + s.add32(incomplete.percentProgress); } } } - if (!final.empty()) + if (!finalized_.empty()) { - auto const final_{ripple::to_string(final)}; - message.set_final(final_); - s.addRaw(final_.data(), final_.size()); + auto const str{ripple::to_string(finalized_)}; + message.set_finalized(str); + s.addRaw(str.data(), str.size()); } // Set the public key auto const& publicKey{app.nodeIdentity().first}; message.set_publickey(publicKey.data(), publicKey.size()); - // Create a digest using final and incomplete shards - auto const digest{sha512Half(s.slice())}; - - // Encrypt the digest using the node private key - auto const signature{ - signDigest(publicKey, app.nodeIdentity().second, digest)}; + // Create a digital signature using the node private key + auto const signature{sign(publicKey, app.nodeIdentity().second, s.slice())}; // Set the digital signature message.set_signature(signature.data(), signature.size()); diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index a7c2f3cf731..e041ac3cfa2 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -739,9 +739,9 @@ OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays) toBase58(TokenType::NodePublic, app_.nodeIdentity().first); auto const shardInfo{shardStore->getShardInfo()}; - if (!shardInfo->final.empty()) - jv[jss::complete_shards] = shardInfo->finalToString(); - if (!shardInfo->incomplete.empty()) + if (!shardInfo->finalized().empty()) + jv[jss::complete_shards] = shardInfo->finalizedToString(); + if (!shardInfo->incomplete().empty()) jv[jss::incomplete_shards] = shardInfo->incompleteToString(); } @@ -762,8 +762,8 @@ OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays) // Request peer shard info protocol::TMGetPeerShardInfoV2 tmGPS; tmGPS.set_relays(relays); - foreach(send_always(std::make_shared( - tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2))); + foreach(send_always( + std::make_shared(tmGPS, protocol::mtGET_PEER_SHARD_INFO_V2))); if (csCV_.wait_for(lock, seconds(60)) == std::cv_status::timeout) { @@ -792,9 +792,9 @@ OverlayImpl::crawlShards(bool includePublicKey, std::uint32_t relays) toBase58(TokenType::NodePublic, publicKey); } - if (!shardInfo.final.empty()) - pv[jss::complete_shards] = shardInfo.finalToString(); - if (!shardInfo.incomplete.empty()) + if (!shardInfo.finalized().empty()) + pv[jss::complete_shards] = shardInfo.finalizedToString(); + if (!shardInfo.incomplete().empty()) pv[jss::incomplete_shards] = shardInfo.incompleteToString(); } } @@ -875,9 +875,9 @@ OverlayImpl::getOverlayInfo() if (it != peerShardInfos.end()) { auto const& shardInfo{it->second}; - if (!shardInfo.final.empty()) - pv[jss::complete_shards] = shardInfo.finalToString(); - if (!shardInfo.incomplete.empty()) + if (!shardInfo.finalized().empty()) + pv[jss::complete_shards] = shardInfo.finalizedToString(); + if (!shardInfo.incomplete().empty()) pv[jss::incomplete_shards] = shardInfo.incompleteToString(); } }); diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index fde47b99c3e..eeb90638969 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -476,8 +476,8 @@ PeerImp::hasLedger(uint256 const& hash, std::uint32_t seq) const auto const it{shardInfos_.find(publicKey_)}; if (it != shardInfos_.end()) { - auto const shardIndex{NodeStore::seqToShardIndex(seq)}; - return boost::icl::contains(it->second.final, shardIndex); + auto const shardIndex{app_.getNodeStore().seqToShardIndex(seq)}; + return boost::icl::contains(it->second.finalized(), shardIndex); } } return false; @@ -1197,22 +1197,13 @@ void PeerImp::onMessage(std::shared_ptr const& m) { // Find the earliest and latest shard indexes + auto const& db{app_.getNodeStore()}; + auto const earliestShardIndex{db.earliestShardIndex()}; auto const curLedgerSeq{app_.getLedgerMaster().getCurrentLedgerIndex()}; - std::uint32_t earliestIndex; - boost::optional latestIndex; - if (auto shardStore = app_.getShardStore()) - { - earliestIndex = shardStore->earliestShardIndex(); - if (curLedgerSeq >= shardStore->earliestLedgerSeq()) - latestIndex = shardStore->seqToShardIndex(curLedgerSeq); - } - else - { - auto const earliestLedgerSeq{app_.getNodeStore().earliestLedgerSeq()}; - earliestIndex = NodeStore::seqToShardIndex(earliestLedgerSeq); - if (curLedgerSeq >= earliestLedgerSeq) - latestIndex = NodeStore::seqToShardIndex(curLedgerSeq); - } + boost::optional latestShardIndex; + + if (curLedgerSeq >= db.earliestLedgerSeq()) + latestShardIndex = db.seqToShardIndex(curLedgerSeq); auto badData = [&](std::string msg) { fee_ = Resource::feeBadData; @@ -1238,25 +1229,25 @@ PeerImp::onMessage(std::shared_ptr const& m) auto const shardIndex{incomplete.shardindex()}; // Verify shard index - if (shardIndex < earliestIndex || - (latestIndex && shardIndex > latestIndex)) + if (shardIndex < earliestShardIndex || + (latestShardIndex && shardIndex > latestShardIndex)) { return badData("Invalid incomplete shard index"); } s.add32(shardIndex); // Verify state - auto const state{static_cast(incomplete.state())}; + auto const state{static_cast(incomplete.state())}; switch (state) { // Incomplete states - case NodeStore::acquire: - case NodeStore::complete: - case NodeStore::finalizing: - case NodeStore::queued: + case ShardState::acquire: + case ShardState::complete: + case ShardState::finalizing: + case ShardState::queued: break; - // case NodeStore::final: + // case ShardState::finalized: default: return badData("Invalid incomplete shard state"); }; @@ -1267,42 +1258,40 @@ PeerImp::onMessage(std::shared_ptr const& m) if (incomplete.has_progress()) { progress = incomplete.progress(); - if (progress < 1 || progress > 99) + if (progress < 1 || progress > 100) return badData("Invalid incomplete shard progress"); s.add32(progress); } // Verify each incomplete shard is unique - if (!shardInfo.incomplete - .emplace( - shardIndex, - NodeStore::ShardInfo::Incomplete( - state, static_cast(progress) / 100)) - .second) + if (!shardInfo.update( + shardIndex, state, {progress, db.maxLedgers(shardIndex)})) { return badData("Invalid duplicate incomplete shards"); } } } - // Verify final shards - if (m->has_final()) + // Verify finalized shards + if (m->has_finalized()) { - auto const& final{m->final()}; - if (final.empty()) - return badData("Invalid final shards"); + auto const& str{m->finalized()}; + if (str.empty()) + return badData("Invalid finalized shards"); - if (!from_string(shardInfo.final, final)) - return badData("Invalid final shard indexes"); + if (!shardInfo.setFinalizedFromString(str)) + return badData("Invalid finalized shard indexes"); - if (boost::icl::length(shardInfo.final) == 0 || - boost::icl::first(shardInfo.final) < earliestIndex || - (latestIndex && boost::icl::last(shardInfo.final) > latestIndex)) + auto const& finalized{shardInfo.finalized()}; + if (boost::icl::length(finalized) == 0 || + boost::icl::first(finalized) < earliestShardIndex || + (latestShardIndex && + boost::icl::last(finalized) > latestShardIndex)) { - return badData("Invalid final shard indexes"); + return badData("Invalid finalized shard indexes"); } - s.addRaw(final.data(), final.size()); + s.addRaw(str.data(), str.size()); } // Verify public key @@ -1367,10 +1356,10 @@ PeerImp::onMessage(std::shared_ptr const& m) JLOG(p_journal_.trace()) << "Consumed TMPeerShardInfoV2 originating from public key " - << toBase58(TokenType::NodePublic, publicKey) << " final shards[" - << ripple::to_string(shardInfo.final) << "] incomplete shards[" - << (shardInfo.incomplete.empty() ? "empty" - : shardInfo.incompleteToString()) + << toBase58(TokenType::NodePublic, publicKey) << " finalized shards[" + << ripple::to_string(shardInfo.finalized()) << "] incomplete shards[" + << (shardInfo.incomplete().empty() ? "empty" + : shardInfo.incompleteToString()) << "]"; // Consume the message diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 13baf4a5179..c890de94f35 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -129,15 +129,15 @@ message TMPeerShardInfoV2 required uint32 shardIndex = 1; required uint32 state = 2; - // State completion percent, 1 - 99 + // State completion percent, 1 - 100 optional uint32 progress = 3; } // Incomplete shards being acquired or verified repeated TMIncomplete incomplete = 1; - // Complete and verified immutable shards (RangeSet) - optional string final = 2; + // Verified immutable shards (RangeSet) + optional string finalized = 2; // Public key of node that authored the shard info required bytes publicKey = 3; diff --git a/src/ripple/protocol/SystemParameters.h b/src/ripple/protocol/SystemParameters.h index 2a59de656d6..0620f5f66ca 100644 --- a/src/ripple/protocol/SystemParameters.h +++ b/src/ripple/protocol/SystemParameters.h @@ -58,7 +58,10 @@ systemCurrencyCode() } /** The XRP ledger network's earliest allowed sequence */ -static std::uint32_t constexpr XRP_LEDGER_EARLIEST_SEQ{32570}; +static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ{32570u}; + +/** The number of ledgers in a shard */ +static constexpr std::uint32_t DEFAULT_LEDGERS_PER_SHARD{16384u}; /** The minimum amount of support an amendment should have. diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index 1b9f5ab7128..1d75054dc2b 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -270,11 +270,6 @@ class TestPeer : public Peer { } bool - hasShard(std::uint32_t shardIndex) const override - { - return false; - } - bool hasTxSet(uint256 const& hash) const override { return false; diff --git a/src/test/nodestore/DatabaseShard_test.cpp b/src/test/nodestore/DatabaseShard_test.cpp index a08cf658b7f..dadaac7804b 100644 --- a/src/test/nodestore/DatabaseShard_test.cpp +++ b/src/test/nodestore/DatabaseShard_test.cpp @@ -452,14 +452,18 @@ class DatabaseShard_test : public TestBase std::to_string(earliestSeq)); // Node store configuration - cfg->overwrite( - ConfigSection::nodeDatabase(), - "earliest_seq", - std::to_string(earliestSeq)); cfg->overwrite( ConfigSection::nodeDatabase(), "path", nodeDir.empty() ? defNodeDir.path() : nodeDir); + cfg->overwrite( + ConfigSection::nodeDatabase(), + "ledgers_per_shard", + std::to_string(ledgersPerShard)); + cfg->overwrite( + ConfigSection::nodeDatabase(), + "earliest_seq", + std::to_string(earliestSeq)); return cfg; }); } @@ -471,7 +475,8 @@ class DatabaseShard_test : public TestBase std::chrono::seconds timeout = shardStoreTimeout) { auto const end{std::chrono::system_clock::now() + timeout}; - while (!boost::icl::contains(db.getShardInfo()->final, shardIndex)) + while ( + !boost::icl::contains(db.getShardInfo()->finalized(), shardIndex)) { if (!BEAST_EXPECT(std::chrono::system_clock::now() < end)) return {}; @@ -537,7 +542,6 @@ class DatabaseShard_test : public TestBase make_ShardStore(env.app(), parent, scheduler, 2, journal_); BEAST_EXPECT(db); - BEAST_EXPECT(db->ledgersPerShard() == db->ledgersPerShardDefault); BEAST_EXPECT(db->init()); BEAST_EXPECT(db->ledgersPerShard() == ledgersPerShard); BEAST_EXPECT(db->seqToShardIndex(ledgersPerShard + 1) == 1); @@ -629,7 +633,7 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); for (auto i = 0; i < nTestShards; ++i) { @@ -641,8 +645,8 @@ class DatabaseShard_test : public TestBase return; } - BEAST_EXPECT( - boost::icl::contains(db->getShardInfo()->final, *shardIndex)); + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), *shardIndex)); } } @@ -692,7 +696,7 @@ class DatabaseShard_test : public TestBase BEAST_EXPECT(db->getPreShards() == bitmask2Rangeset(bitMask)); // create shards which are not prepared for import - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); std::uint64_t bitMask2 = 0; for (auto i = 0; i < nTestShards; ++i) @@ -705,8 +709,8 @@ class DatabaseShard_test : public TestBase return; } - BEAST_EXPECT( - boost::icl::contains(db->getShardInfo()->final, *shardIndex)); + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), *shardIndex)); bitMask2 |= 1ll << *shardIndex; BEAST_EXPECT((bitMask & bitMask2) == 0); @@ -829,7 +833,7 @@ class DatabaseShard_test : public TestBase for (std::uint32_t shardIndex = 1; shardIndex <= 1; ++shardIndex) waitShard(*db, shardIndex); - BEAST_EXPECT(boost::icl::contains(db->getShardInfo()->final, 1)); + BEAST_EXPECT(boost::icl::contains(db->getShardInfo()->finalized(), 1)); for (std::uint32_t i = 0; i < 1 * ledgersPerShard; ++i) checkLedger(data, *db, *data.ledgers_[i]); @@ -890,8 +894,8 @@ class DatabaseShard_test : public TestBase if (i == 2) { waitShard(*db, shardIndex); - BEAST_EXPECT( - boost::icl::contains(db->getShardInfo()->final, 1)); + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), 1)); } else { @@ -906,7 +910,7 @@ class DatabaseShard_test : public TestBase std::this_thread::yield(); } - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); } } @@ -922,14 +926,14 @@ class DatabaseShard_test : public TestBase if (i == 2) { waitShard(*db, 1); - BEAST_EXPECT( - boost::icl::contains(db->getShardInfo()->final, 1)); + BEAST_EXPECT(boost::icl::contains( + db->getShardInfo()->finalized(), 1)); for (std::uint32_t j = 0; j < ledgersPerShard; ++j) checkLedger(data, *db, *data.ledgers_[j]); } else - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); } } } @@ -956,13 +960,13 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 0; i < 2 * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); db->import(ndb); for (std::uint32_t i = 1; i <= 2; ++i) waitShard(*db, i); - auto const finalShards{std::move(db->getShardInfo()->final)}; + auto const finalShards{std::move(db->getShardInfo()->finalized())}; for (std::uint32_t shardIndex : {1, 2}) BEAST_EXPECT(boost::icl::contains(finalShards, shardIndex)); } @@ -978,7 +982,7 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 1; i <= 2; ++i) waitShard(*db, i); - auto const finalShards{std::move(db->getShardInfo()->final)}; + auto const finalShards{std::move(db->getShardInfo()->finalized())}; for (std::uint32_t shardIndex : {1, 2}) BEAST_EXPECT(boost::icl::contains(finalShards, shardIndex)); @@ -1031,13 +1035,13 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); db->import(ndb); for (std::uint32_t i = 1; i <= ledgerCount; ++i) waitShard(*db, i); - auto const final{std::move(db->getShardInfo()->final)}; + auto const final{std::move(db->getShardInfo()->finalized())}; for (std::uint32_t shardIndex : {1, 2, 3, 4}) BEAST_EXPECT(boost::icl::contains(final, shardIndex)); @@ -1091,13 +1095,13 @@ class DatabaseShard_test : public TestBase for (std::uint32_t i = 0; i < ledgerCount * ledgersPerShard; ++i) BEAST_EXPECT(saveLedger(ndb, *data.ledgers_[i])); - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); db->import(ndb); for (std::uint32_t i = 1; i <= ledgerCount; ++i) waitShard(*db, i); - auto const finalShards{std::move(db->getShardInfo()->final)}; + auto const finalShards{std::move(db->getShardInfo()->finalized())}; for (std::uint32_t shardIndex : {1, 2, 3, 4}) BEAST_EXPECT(boost::icl::contains(finalShards, shardIndex)); @@ -1159,7 +1163,7 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - BEAST_EXPECT(db->getShardInfo()->final.empty()); + BEAST_EXPECT(db->getShardInfo()->finalized().empty()); // Add ten shards to the Shard Database for (auto i = 0; i < ledgerCount; ++i) @@ -1173,7 +1177,7 @@ class DatabaseShard_test : public TestBase } BEAST_EXPECT(boost::icl::contains( - db->getShardInfo()->final, *shardIndex)); + db->getShardInfo()->finalized(), *shardIndex)); } auto mainPathCount = std::distance( @@ -1248,10 +1252,9 @@ class DatabaseShard_test : public TestBase { return; } - auto aa = to_string(db->getShardInfo()->final); // TODO BEAST_EXPECT(boost::icl::contains( - db->getShardInfo()->final, *shardIndex)); + db->getShardInfo()->finalized(), *shardIndex)); } mainPathCount = std::distance( @@ -1323,7 +1326,7 @@ class DatabaseShard_test : public TestBase if (!BEAST_EXPECT(data.makeLedgers(env))) return; - BEAST_EXPECT(shardStore->getShardInfo()->final.empty()); + BEAST_EXPECT(shardStore->getShardInfo()->finalized().empty()); int oldestShardIndex{-1}; for (auto i = 0; i < numShards; ++i) @@ -1336,7 +1339,7 @@ class DatabaseShard_test : public TestBase } BEAST_EXPECT(boost::icl::contains( - shardStore->getShardInfo()->final, *shardIndex)); + shardStore->getShardInfo()->finalized(), *shardIndex)); if (oldestShardIndex == -1) oldestShardIndex = *shardIndex; diff --git a/src/test/nodestore/Database_test.cpp b/src/test/nodestore/Database_test.cpp index 67d3a6b3741..0f762ab3a82 100644 --- a/src/test/nodestore/Database_test.cpp +++ b/src/test/nodestore/Database_test.cpp @@ -598,9 +598,8 @@ class Database_test : public TestBase if (type == "memory") { - // Earliest ledger sequence tests + // Verify default earliest ledger sequence { - // Verify default earliest ledger sequence std::unique_ptr db = Manager::instance().make_Database( "test", @@ -673,6 +672,55 @@ class Database_test : public TestBase std::strcmp(e.what(), "earliest_seq set more than once") == 0); } + + // Verify default ledgers per shard + { + std::unique_ptr db = + Manager::instance().make_Database( + "test", + megabytes(4), + scheduler, + 2, + parent, + nodeParams, + journal_); + BEAST_EXPECT( + db->ledgersPerShard() == DEFAULT_LEDGERS_PER_SHARD); + } + + // Set an invalid ledgers per shard + try + { + nodeParams.set("ledgers_per_shard", "100"); + std::unique_ptr db = + Manager::instance().make_Database( + "test", + megabytes(4), + scheduler, + 2, + parent, + nodeParams, + journal_); + } + catch (std::runtime_error const& e) + { + BEAST_EXPECT( + std::strcmp(e.what(), "Invalid ledgers_per_shard") == 0); + } + + // Set a valid ledgers per shard + nodeParams.set("ledgers_per_shard", "256"); + std::unique_ptr db = Manager::instance().make_Database( + "test", + megabytes(4), + scheduler, + 2, + parent, + nodeParams, + journal_); + + // Verify database uses the ledgers per shard + BEAST_EXPECT(db->ledgersPerShard() == 256); } } diff --git a/src/test/rpc/ShardArchiveHandler_test.cpp b/src/test/rpc/ShardArchiveHandler_test.cpp index d4452fc2959..b6c4897d520 100644 --- a/src/test/rpc/ShardArchiveHandler_test.cpp +++ b/src/test/rpc/ShardArchiveHandler_test.cpp @@ -166,13 +166,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "20"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "20"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); @@ -264,13 +269,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite { auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "20"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "20"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); @@ -361,19 +371,23 @@ class ShardArchiveHandler_test : public beast::unit_test::suite } auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "20"); - section.set("ledgers_per_shard", "256"); - section.set("shard_verification_retry_interval", "1"); - section.set("shard_verification_max_attempts", "10000"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "20"); + section.set("shard_verification_retry_interval", "1"); + section.set("shard_verification_max_attempts", "10000"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); jtx::Env env(*this, std::move(c)); - std::uint8_t const numberOfDownloads = 10; // Create some ledgers so that the ShardArchiveHandler @@ -429,13 +443,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "1"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "1"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); std::unique_ptr logs(new CaptureLogs(&capturedLogs)); @@ -503,13 +522,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "0"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "0"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); std::unique_ptr logs(new CaptureLogs(&capturedLogs)); @@ -586,13 +610,18 @@ class ShardArchiveHandler_test : public beast::unit_test::suite beast::temp_dir tempDir; auto c = jtx::envconfig(); - auto& section = c->section(ConfigSection::shardDatabase()); - section.set("path", tempDir.path()); - section.set("max_historical_shards", "1"); - section.set("ledgers_per_shard", "256"); - section.set("earliest_seq", "257"); - auto& sectionNode = c->section(ConfigSection::nodeDatabase()); - sectionNode.set("earliest_seq", "257"); + { + auto& section{c->section(ConfigSection::shardDatabase())}; + section.set("path", tempDir.path()); + section.set("max_historical_shards", "1"); + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } + { + auto& section{c->section(ConfigSection::nodeDatabase())}; + section.set("ledgers_per_shard", "256"); + section.set("earliest_seq", "257"); + } c->setupControl(true, true, true); std::unique_ptr logs(new CaptureLogs(&capturedLogs));