diff --git a/Builds/CMake/RippleConfig.cmake b/Builds/CMake/RippleConfig.cmake index 1091e741e74..1dc75cb9093 100644 --- a/Builds/CMake/RippleConfig.cmake +++ b/Builds/CMake/RippleConfig.cmake @@ -21,7 +21,6 @@ find_dependency (Boost 1.70 filesystem program_options regex - serialization system thread) #[=========================================================[ diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 6e7feb69af0..f39522002e5 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -514,6 +514,7 @@ target_sources (rippled PRIVATE src/ripple/nodestore/impl/ManagerImp.cpp src/ripple/nodestore/impl/NodeObject.cpp src/ripple/nodestore/impl/Shard.cpp + src/ripple/nodestore/impl/TaskQueue.cpp #[===============================[ main sources: subdir: overlay diff --git a/Builds/CMake/deps/Boost.cmake b/Builds/CMake/deps/Boost.cmake index e3e8d92d85e..bdff36909cc 100644 --- a/Builds/CMake/deps/Boost.cmake +++ b/Builds/CMake/deps/Boost.cmake @@ -47,7 +47,6 @@ find_package (Boost 1.70 REQUIRED filesystem program_options regex - serialization system thread) @@ -69,7 +68,6 @@ target_link_libraries (ripple_boost Boost::filesystem Boost::program_options Boost::regex - Boost::serialization Boost::system Boost::thread) if (Boost_COMPILER) diff --git a/Builds/containers/shared/install_boost.sh b/Builds/containers/shared/install_boost.sh index 51d6524d785..ea26220e627 100755 --- a/Builds/containers/shared/install_boost.sh +++ b/Builds/containers/shared/install_boost.sh @@ -39,7 +39,6 @@ else BLDARGS+=(--with-filesystem) BLDARGS+=(--with-program_options) BLDARGS+=(--with-regex) - BLDARGS+=(--with-serialization) BLDARGS+=(--with-system) BLDARGS+=(--with-atomic) BLDARGS+=(--with-thread) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index fcb0961ade0..93042ce5ebc 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -345,9 +345,9 @@ class ApplicationImp // These are Stoppable-related std::unique_ptr m_jobQueue; std::unique_ptr m_nodeStore; - std::unique_ptr shardStore_; detail::AppFamily family_; - std::unique_ptr sFamily_; + std::unique_ptr shardStore_; + std::unique_ptr shardFamily_; // VFALCO TODO Make OrderBookDB abstract OrderBookDB m_orderBookDB; std::unique_ptr m_pathRequests; @@ -463,18 +463,18 @@ class ApplicationImp m_collectorManager->group ("jobq"), m_nodeStoreScheduler, logs_->journal("JobQueue"), *logs_, *perfLog_)) - , m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4)) + , m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4)) + + , family_ (*this, *m_nodeStore, *m_collectorManager) // The shard store is optional and make_ShardStore can return null. - , shardStore_(make_ShardStore( + , shardStore_ (make_ShardStore ( *this, *m_jobQueue, m_nodeStoreScheduler, 4, logs_->journal("ShardStore"))) - , family_ (*this, *m_nodeStore, *m_collectorManager) - , m_orderBookDB (*this, *m_jobQueue) , m_pathRequests (std::make_unique ( @@ -558,14 +558,6 @@ class ApplicationImp logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service()) , grpcServer_(std::make_unique(*this)) { - if (shardStore_) - { - sFamily_ = std::make_unique( - *this, - *shardStore_, - *m_collectorManager); - } - add (m_resourceManager.get ()); // @@ -626,7 +618,7 @@ class ApplicationImp Family* shardFamily() override { - return sFamily_.get(); + return shardFamily_.get(); } TimeKeeper& @@ -943,7 +935,7 @@ class ApplicationImp } bool - initNodeStoreDBs() + initNodeStore() { if (config_->doImport) { @@ -961,12 +953,12 @@ class ApplicationImp JLOG(j.warn()) << "Starting node import from '" << source->getName() << - "' to '" << getNodeStore().getName() << "'."; + "' to '" << m_nodeStore->getName() << "'."; using namespace std::chrono; auto const start = steady_clock::now(); - getNodeStore().import(*source); + m_nodeStore->import(*source); auto const elapsed = duration_cast (steady_clock::now() - start); @@ -990,14 +982,6 @@ class ApplicationImp family().treecache().setTargetAge( seconds{config_->getValueFor(SizedItem::treeCacheAge)}); - if (sFamily_) - { - sFamily_->treecache().setTargetSize( - config_->getValueFor(SizedItem::treeCacheSize)); - sFamily_->treecache().setTargetAge( - seconds{config_->getValueFor(SizedItem::treeCacheAge)}); - } - return true; } @@ -1252,8 +1236,8 @@ class ApplicationImp // have listeners register for "onSweep ()" notification. family().fullbelow().sweep(); - if (sFamily_) - sFamily_->fullbelow().sweep(); + if (shardFamily_) + shardFamily_->fullbelow().sweep(); getMasterTransaction().sweep(); getNodeStore().sweep(); if (shardStore_) @@ -1264,8 +1248,8 @@ class ApplicationImp getInboundLedgers().sweep(); m_acceptedLedgerCache.sweep(); family().treecache().sweep(); - if (sFamily_) - sFamily_->treecache().sweep(); + if (shardFamily_) + shardFamily_->treecache().sweep(); cachedSLEs_.expire(); // Set timer to do another sweep later. @@ -1350,9 +1334,26 @@ bool ApplicationImp::setup() if (!config_->standalone()) timeKeeper_->run(config_->SNTP_SERVERS); - if (!initSQLiteDBs() || !initNodeStoreDBs()) + if (!initSQLiteDBs() || !initNodeStore()) return false; + if (shardStore_) + { + shardFamily_ = std::make_unique( + *this, + *shardStore_, + *m_collectorManager); + + using namespace std::chrono; + shardFamily_->treecache().setTargetSize( + config_->getValueFor(SizedItem::treeCacheSize)); + shardFamily_->treecache().setTargetAge( + seconds{config_->getValueFor(SizedItem::treeCacheAge)}); + + if (!shardStore_->init()) + return false; + } + if (!peerReservations_->load(getWalletDB())) { JLOG(m_journal.fatal()) << "Cannot find peer reservations!"; diff --git a/src/ripple/app/main/DBInit.h b/src/ripple/app/main/DBInit.h index b632d168bf3..3b6d8b05833 100644 --- a/src/ripple/app/main/DBInit.h +++ b/src/ripple/app/main/DBInit.h @@ -116,9 +116,30 @@ std::array TxDBInit {{ //////////////////////////////////////////////////////////////////////////////// +// Temporary database used with an incomplete shard that is being acquired +static constexpr auto AcquireDBName {"acquire.db"}; + +static constexpr +std::array AcquireDBPragma {{ + "PRAGMA synchronous=NORMAL;", + "PRAGMA journal_mode=WAL;", + "PRAGMA journal_size_limit=1582080;" +}}; + +static constexpr +std::array AcquireDBInit {{ + "CREATE TABLE IF NOT EXISTS Shard ( \ + ShardIndex INTEGER PRIMARY KEY, \ + LastLedgerHash CHARACTER(64), \ + StoredLedgerSeqs BLOB \ + );" +}}; + +//////////////////////////////////////////////////////////////////////////////// + // Pragma for Ledger and Transaction databases with complete shards static constexpr -std::array CompleteShardDBPragma {{ +std::array CompltDBPragma {{ "PRAGMA synchronous=OFF;", "PRAGMA journal_mode=OFF;" }}; diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp index 21267db99d5..cfc824915d0 100644 --- a/src/ripple/app/main/Main.cpp +++ b/src/ripple/app/main/Main.cpp @@ -142,7 +142,7 @@ void printHelp (const po::options_description& desc) " connect []\n" " consensus_info\n" " deposit_authorized []\n" - " download_shard [[ ]] \n" + " download_shard [[ ]]\n" " feature [ [accept|reject]]\n" " fetch_info [clear]\n" " gateway_balances [] [ [ ]]\n" diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 7e32ff0f890..36cb0b2417a 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -449,7 +449,7 @@ SHAMapStoreImp::run() std::string nextArchiveDir = dbRotating_->getWritableBackend()->getName(); lastRotated = validatedSeq; - std::unique_ptr oldBackend; + std::shared_ptr oldBackend; { std::lock_guard lock (dbRotating_->peekMutex()); diff --git a/src/ripple/basics/RangeSet.h b/src/ripple/basics/RangeSet.h index 13f58c94ad9..4e00a4627ed 100644 --- a/src/ripple/basics/RangeSet.h +++ b/src/ripple/basics/RangeSet.h @@ -20,11 +20,14 @@ #ifndef RIPPLE_BASICS_RANGESET_H_INCLUDED #define RIPPLE_BASICS_RANGESET_H_INCLUDED -#include -#include +#include + +#include #include #include -#include +#include + +#include namespace ripple { @@ -86,8 +89,8 @@ std::string to_string(ClosedInterval const & ci) /** Convert the given RangeSet to a styled string. - The styled string represention is the set of disjoint intervals joined by - commas. The string "empty" is returned if the set is empty. + The styled string representation is the set of disjoint intervals joined + by commas. The string "empty" is returned if the set is empty. @param rs The rangeset to convert @return The styled string @@ -109,6 +112,67 @@ std::string to_string(RangeSet const & rs) return res; } +/** Convert the given styled string to a RangeSet. + + The styled string representation is the set + of disjoint intervals joined by commas. + + @param rs The set to be populated + @param s The styled string to convert + @return True on successfully converting styled string +*/ +template +bool +from_string(RangeSet& rs, std::string const& s) +{ + std::vector intervals; + std::vector tokens; + bool result {true}; + + boost::split(tokens, s, boost::algorithm::is_any_of(",")); + for (auto const& t : tokens) + { + boost::split(intervals, t, boost::algorithm::is_any_of("-")); + switch (intervals.size()) + { + case 1: + { + T front; + if (!beast::lexicalCastChecked(front, intervals.front())) + result = false; + else + rs.insert(front); + break; + } + case 2: + { + T front; + if (!beast::lexicalCastChecked(front, intervals.front())) + result = false; + else + { + T back; + if (!beast::lexicalCastChecked(back, intervals.back())) + result = false; + else + rs.insert(range(front, back)); + } + break; + } + default: + result = false; + } + + if (!result) + break; + intervals.clear(); + } + + if (!result) + rs.clear(); + return result; +} + /** Find the largest value not in the set that is less than a given value. @param rs The set of interest @@ -129,75 +193,8 @@ prevMissing(RangeSet const & rs, T t, T minVal = 0) return boost::none; return boost::icl::last(tgt); } -} // namespace ripple - - -// The boost serialization documents recommended putting free-function helpers -// in the boost serialization namespace -namespace boost { -namespace serialization { -template -void -save(Archive& ar, - ripple::ClosedInterval const& ci, - const unsigned int version) -{ - auto l = ci.lower(); - auto u = ci.upper(); - ar << l << u; -} - -template -void -load(Archive& ar, ripple::ClosedInterval& ci, const unsigned int version) -{ - T low, up; - ar >> low >> up; - ci = ripple::ClosedInterval{low, up}; -} - -template -void -serialize(Archive& ar, - ripple::ClosedInterval& ci, - const unsigned int version) -{ - split_free(ar, ci, version); -} - -template -void -save(Archive& ar, ripple::RangeSet const& rs, const unsigned int version) -{ - auto s = rs.iterative_size(); - ar << s; - for (auto const& r : rs) - ar << r; -} - -template -void -load(Archive& ar, ripple::RangeSet& rs, const unsigned int version) -{ - rs.clear(); - std::size_t intervals; - ar >> intervals; - for (std::size_t i = 0; i < intervals; ++i) - { - ripple::ClosedInterval ci; - ar >> ci; - rs.insert(ci); - } -} +} // namespace ripple -template -void -serialize(Archive& ar, ripple::RangeSet& rs, const unsigned int version) -{ - split_free(ar, rs, version); -} -} // serialization -} // boost #endif diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index c418bc67a03..3cf796f06e1 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -31,7 +31,7 @@ JobQueue::JobQueue (beast::insight::Collector::ptr const& collector, , m_lastJob (0) , m_invalidJobData (JobTypes::instance().getInvalid (), collector, logs) , m_processCount (0) - , m_workers (*this, perfLog, "JobQueue", 0) + , m_workers (*this, &perfLog, "JobQueue", 0) , m_cancelCallback (std::bind (&Stoppable::isStopping, this)) , perfLog_ (perfLog) , m_collector (collector) diff --git a/src/ripple/core/impl/Workers.cpp b/src/ripple/core/impl/Workers.cpp index ca456de5728..f04f94e4b84 100644 --- a/src/ripple/core/impl/Workers.cpp +++ b/src/ripple/core/impl/Workers.cpp @@ -26,7 +26,7 @@ namespace ripple { Workers::Workers ( Callback& callback, - perf::PerfLog& perfLog, + perf::PerfLog* perfLog, std::string const& threadNames, int numberOfThreads) : m_callback (callback) @@ -63,7 +63,8 @@ void Workers::setNumberOfThreads (int numberOfThreads) static int instance {0}; if (m_numberOfThreads != numberOfThreads) { - perfLog_.resizeJobs(numberOfThreads); + if (perfLog_) + perfLog_->resizeJobs(numberOfThreads); if (numberOfThreads > m_numberOfThreads) { diff --git a/src/ripple/core/impl/Workers.h b/src/ripple/core/impl/Workers.h index 9721ae9e6e2..3a811ce899c 100644 --- a/src/ripple/core/impl/Workers.h +++ b/src/ripple/core/impl/Workers.h @@ -69,7 +69,7 @@ class Workers @param threadNames The name given to each created worker thread. */ explicit Workers (Callback& callback, - perf::PerfLog& perfLog, + perf::PerfLog* perfLog, std::string const& threadNames = "Worker", int numberOfThreads = static_cast(std::thread::hardware_concurrency())); @@ -166,7 +166,7 @@ class Workers private: Callback& m_callback; - perf::PerfLog& perfLog_; + perf::PerfLog* perfLog_; std::string m_threadNames; // The name to give each thread std::condition_variable m_cv; // signaled when all threads paused std::mutex m_mut; diff --git a/src/ripple/net/impl/RPCCall.cpp b/src/ripple/net/impl/RPCCall.cpp index 64f2941653f..bca8fefb5b7 100644 --- a/src/ripple/net/impl/RPCCall.cpp +++ b/src/ripple/net/impl/RPCCall.cpp @@ -186,7 +186,6 @@ class RPCParser ++i; else if (!boost::iequals(jvParams[--sz].asString(), "novalidate")) return rpcError(rpcINVALID_PARAMS); - jvResult[jss::validate] = false; } // Create the 'shards' array diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 1dde5f4df47..57459aabf06 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -149,7 +149,7 @@ class Database : public Stoppable */ virtual bool - copyLedger(std::shared_ptr const& ledger) = 0; + copyLedger(std::shared_ptr const& srcLedger) = 0; /** Wait for all currently pending async reads to complete. */ @@ -240,7 +240,7 @@ class Database : public Stoppable std::shared_ptr> const& nCache); std::shared_ptr - fetchInternal(uint256 const& hash, Backend& srcBackend); + fetchInternal(uint256 const& hash, std::shared_ptr backend); void importInternal(Backend& dstBackend, Database& srcDB); @@ -251,10 +251,12 @@ class Database : public Stoppable KeyCache& nCache, bool isAsync); bool - copyLedger(Backend& dstBackend, Ledger const& srcLedger, - std::shared_ptr> const& pCache, - std::shared_ptr> const& nCache, - std::shared_ptr const& srcNext); + copyLedger( + Ledger const& srcLedger, + std::shared_ptr dstBackend, + std::shared_ptr> dstPCache, + std::shared_ptr> dstNCache, + std::shared_ptr next); private: std::atomic storeCount_ {0}; diff --git a/src/ripple/nodestore/DatabaseRotating.h b/src/ripple/nodestore/DatabaseRotating.h index 75606be187e..9e661054b0f 100644 --- a/src/ripple/nodestore/DatabaseRotating.h +++ b/src/ripple/nodestore/DatabaseRotating.h @@ -50,12 +50,12 @@ class DatabaseRotating : public Database virtual std::mutex& peekMutex() const = 0; virtual - std::unique_ptr const& + std::shared_ptr const& getWritableBackend() const = 0; virtual - std::unique_ptr - rotateBackends(std::unique_ptr newBackend) = 0; + std::shared_ptr + rotateBackends(std::shared_ptr newBackend) = 0; }; } diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index a6bd7dd283f..0e86664ca8c 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -109,14 +109,14 @@ class DatabaseShard : public Database @param shardIndex Shard index to import @param srcDir The directory to import from - @param validate If true validate shard ledger data @return true If the shard was successfully imported @implNote if successful, srcDir is moved to the database directory */ virtual bool - importShard(std::uint32_t shardIndex, - boost::filesystem::path const& srcDir, bool validate) = 0; + importShard( + std::uint32_t shardIndex, + boost::filesystem::path const& srcDir) = 0; /** Fetch a ledger from the shard store @@ -137,15 +137,6 @@ class DatabaseShard : public Database void setStored(std::shared_ptr const& ledger) = 0; - /** Query if a ledger with the given sequence is stored - - @param seq The ledger sequence to check if stored - @return `true` if the ledger is stored - */ - virtual - bool - contains(std::uint32_t seq) = 0; - /** Query which complete shards are stored @return the indexes of complete shards diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index f6a3c3785b0..5c760f6e1b7 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -115,13 +115,13 @@ Database::asyncFetch(uint256 const& hash, std::uint32_t seq, } std::shared_ptr -Database::fetchInternal(uint256 const& hash, Backend& srcBackend) +Database::fetchInternal(uint256 const& hash, std::shared_ptr backend) { std::shared_ptr nObj; Status status; try { - status = srcBackend.fetch(hash.begin(), &nObj); + status = backend->fetch(hash.begin(), &nObj); } catch (std::exception const& e) { @@ -226,12 +226,14 @@ Database::doFetch(uint256 const& hash, std::uint32_t seq, } bool -Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger, - std::shared_ptr> const& pCache, - std::shared_ptr> const& nCache, - std::shared_ptr const& srcNext) +Database::copyLedger( + Ledger const& srcLedger, + std::shared_ptr dstBackend, + std::shared_ptr> dstPCache, + std::shared_ptr> dstNCache, + std::shared_ptr next) { - assert(static_cast(pCache) == static_cast(nCache)); + assert(static_cast(dstPCache) == static_cast(dstNCache)); if (srcLedger.info().hash.isZero() || srcLedger.info().accountHash.isZero()) { @@ -259,43 +261,49 @@ Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger, { assert(nObj->getHash() == sha512Hash(makeSlice(nObj->getData()))); - if (pCache && nCache) + if (dstPCache && dstNCache) { - pCache->canonicalize(nObj->getHash(), nObj, true); - nCache->erase(nObj->getHash()); + dstPCache->canonicalize(nObj->getHash(), nObj, true); + dstNCache->erase(nObj->getHash()); storeStats(nObj->getData().size()); } } #else - if (pCache && nCache) + if (dstPCache && dstNCache) for (auto& nObj : batch) { - pCache->canonicalize(nObj->getHash(), nObj, true); - nCache->erase(nObj->getHash()); + dstPCache->canonicalize(nObj->getHash(), nObj, true); + dstNCache->erase(nObj->getHash()); storeStats(nObj->getData().size()); } #endif - dstBackend.storeBatch(batch); + dstBackend->storeBatch(batch); batch.clear(); batch.reserve(batchWritePreallocationSize); }; bool error = false; - auto f = [&](SHAMapAbstractNode& node) { + auto visit = [&](SHAMapAbstractNode& node) + { if (auto nObj = srcDB.fetch( node.getNodeHash().as_uint256(), srcLedger.info().seq)) { batch.emplace_back(std::move(nObj)); - if (batch.size() >= batchWritePreallocationSize) - storeBatch(); + if (batch.size() < batchWritePreallocationSize) + return true; + + storeBatch(); + + if (!isStopping()) + return true; } - else - error = true; - return !error; + + error = true; + return false; }; // Store ledger header { - Serializer s(1024); + Serializer s(128); s.add32(HashPrefix::ledgerMaster); addRaw(srcLedger.info(), s); auto nObj = NodeObject::createObject(hotLEDGER, @@ -313,14 +321,14 @@ Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger, " state map invalid"; return false; } - if (srcNext && srcNext->info().parentHash == srcLedger.info().hash) + if (next && next->info().parentHash == srcLedger.info().hash) { - auto have = srcNext->stateMap().snapShot(false); + auto have = next->stateMap().snapShot(false); srcLedger.stateMap().snapShot( - false)->visitDifferences(&(*have), f); + false)->visitDifferences(&(*have), visit); } else - srcLedger.stateMap().snapShot(false)->visitNodes(f); + srcLedger.stateMap().snapShot(false)->visitNodes(visit); if (error) return false; } @@ -335,7 +343,7 @@ Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger, " transaction map invalid"; return false; } - srcLedger.txMap().snapShot(false)->visitNodes(f); + srcLedger.txMap().snapShot(false)->visitNodes(visit); if (error) return false; } diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.h b/src/ripple/nodestore/impl/DatabaseNodeImp.h index 4e62f9ace43..0d9a5a164ce 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.h +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.h @@ -38,7 +38,7 @@ class DatabaseNodeImp : public Database Scheduler& scheduler, int readThreads, Stoppable& parent, - std::unique_ptr backend, + std::shared_ptr backend, Section const& config, beast::Journal j) : Database(name, parent, scheduler, readThreads, config, j) @@ -91,10 +91,10 @@ class DatabaseNodeImp : public Database std::shared_ptr& object) override; bool - copyLedger(std::shared_ptr const& ledger) override + copyLedger(std::shared_ptr const& srcLedger) override { return Database::copyLedger( - *backend_, *ledger, pCache_, nCache_, nullptr); + *srcLedger, backend_, pCache_, nCache_, nullptr); } int @@ -123,12 +123,12 @@ class DatabaseNodeImp : public Database std::shared_ptr> nCache_; // Persistent key/value storage - std::unique_ptr backend_; + std::shared_ptr backend_; std::shared_ptr fetchFrom(uint256 const& hash, std::uint32_t seq) override { - return fetchInternal(hash, *backend_); + return fetchInternal(hash, backend_); } void diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index 76b2b4ec59d..b1fd28cc772 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -29,8 +29,8 @@ DatabaseRotatingImp::DatabaseRotatingImp( Scheduler& scheduler, int readThreads, Stoppable& parent, - std::unique_ptr writableBackend, - std::unique_ptr archiveBackend, + std::shared_ptr writableBackend, + std::shared_ptr archiveBackend, Section const& config, beast::Journal j) : DatabaseRotating(name, parent, scheduler, readThreads, config, j) @@ -49,9 +49,8 @@ DatabaseRotatingImp::DatabaseRotatingImp( } // Make sure to call it already locked! -std::unique_ptr -DatabaseRotatingImp::rotateBackends( - std::unique_ptr newBackend) +std::shared_ptr +DatabaseRotatingImp::rotateBackends(std::shared_ptr newBackend) { auto oldBackend {std::move(archiveBackend_)}; archiveBackend_ = std::move(writableBackend_); @@ -106,10 +105,10 @@ std::shared_ptr DatabaseRotatingImp::fetchFrom(uint256 const& hash, std::uint32_t seq) { Backends b = getBackends(); - auto nObj = fetchInternal(hash, *b.writableBackend); + auto nObj = fetchInternal(hash, b.writableBackend); if (! nObj) { - nObj = fetchInternal(hash, *b.archiveBackend); + nObj = fetchInternal(hash, b.archiveBackend); if (nObj) { getWritableBackend()->store(nObj); diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.h b/src/ripple/nodestore/impl/DatabaseRotatingImp.h index e925de5d89d..7c24e6a899b 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.h +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.h @@ -37,8 +37,8 @@ class DatabaseRotatingImp : public DatabaseRotating Scheduler& scheduler, int readThreads, Stoppable& parent, - std::unique_ptr writableBackend, - std::unique_ptr archiveBackend, + std::shared_ptr writableBackend, + std::shared_ptr archiveBackend, Section const& config, beast::Journal j); @@ -48,15 +48,15 @@ class DatabaseRotatingImp : public DatabaseRotating stopThreads(); } - std::unique_ptr const& + std::shared_ptr const& getWritableBackend() const override { std::lock_guard lock (rotateMutex_); return writableBackend_; } - std::unique_ptr - rotateBackends(std::unique_ptr newBackend) override; + std::shared_ptr + rotateBackends(std::shared_ptr newBackend) override; std::mutex& peekMutex() const override { @@ -92,10 +92,10 @@ class DatabaseRotatingImp : public DatabaseRotating std::shared_ptr& object) override; bool - copyLedger(std::shared_ptr const& ledger) override + copyLedger(std::shared_ptr const& srcLedger) override { return Database::copyLedger( - *getWritableBackend(), *ledger, pCache_, nCache_, nullptr); + *srcLedger, getWritableBackend(), pCache_, nCache_, nullptr); } int @@ -126,13 +126,13 @@ class DatabaseRotatingImp : public DatabaseRotating // Negative cache std::shared_ptr> nCache_; - std::unique_ptr writableBackend_; - std::unique_ptr archiveBackend_; + std::shared_ptr writableBackend_; + std::shared_ptr archiveBackend_; mutable std::mutex rotateMutex_; struct Backends { - std::unique_ptr const& writableBackend; - std::unique_ptr const& archiveBackend; + std::shared_ptr const& writableBackend; + std::shared_ptr const& archiveBackend; }; Backends getBackends() const diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 0d28c9cb40e..18ec124f881 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -20,12 +20,12 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -50,23 +50,21 @@ DatabaseShardImp::DatabaseShardImp( app.config().section(ConfigSection::shardDatabase()), j) , app_(app) + , parent_(parent) + , taskQueue_(std::make_unique()) , earliestShardIndex_(seqToShardIndex(earliestSeq())) , avgShardFileSz_(ledgersPerShard_ * kilobytes(192)) { } - DatabaseShardImp::~DatabaseShardImp() { - // Stop threads before data members are destroyed - stopThreads(); + if (!isStopping()) + onStop(); // Close backend databases before destroying the context - std::lock_guard lock(m_); - complete_.clear(); - if (incomplete_) - incomplete_.reset(); - preShards_.clear(); + std::lock_guard lock(mutex_); + shards_.clear(); ctx_.reset(); } @@ -74,8 +72,7 @@ bool DatabaseShardImp::init() { using namespace boost::filesystem; - - std::lock_guard lock(m_); + using namespace boost::beast::detail; auto fail = [j = j_](std::string const& msg) { JLOG(j.error()) << @@ -83,176 +80,199 @@ DatabaseShardImp::init() return false; }; - if (init_) - return fail("already initialized"); + { + std::lock_guard lock(mutex_); + + if (init_) + return fail("already initialized"); - Config const& config {app_.config()}; - Section const& section {config.section(ConfigSection::shardDatabase())}; - if (section.empty()) - return fail("missing configuration"); + Config const& config {app_.config()}; + Section const& section {config.section(ConfigSection::shardDatabase())}; - { - // Node and shard stores must use same earliest ledger sequence - std::uint32_t seq; - if (get_if_exists( - config.section(ConfigSection::nodeDatabase()), - "earliest_seq", - seq)) { - std::uint32_t seq2; - if (get_if_exists(section, "earliest_seq", seq2) && - seq != seq2) + // Node and shard stores must use same earliest ledger sequence + std::uint32_t seq; + if (get_if_exists( + config.section(ConfigSection::nodeDatabase()), + "earliest_seq", + seq)) { - return fail("and [" + ConfigSection::shardDatabase() + - "] both define 'earliest_seq'"); + std::uint32_t seq2; + if (get_if_exists( + section, + "earliest_seq", seq2) && + seq != seq2) + { + return fail("and [" + ConfigSection::shardDatabase() + + "] both define 'earliest_seq'"); + } } } - } - if (!get_if_exists(section, "path", dir_)) - return fail("'path' missing"); + if (!get_if_exists(section, "path", dir_)) + return fail("'path' missing"); - if (boost::filesystem::exists(dir_)) - { - if (!boost::filesystem::is_directory(dir_)) - return fail("'path' must be a directory"); - } - else - boost::filesystem::create_directories(dir_); - - { - std::uint64_t sz; - if (!get_if_exists(section, "max_size_gb", sz)) - return fail("'max_size_gb' missing"); - - if ((sz << 30) < sz) - return fail("'max_size_gb' overflow"); - - // Minimum storage space required (in gigabytes) - if (sz < 10) - return fail("'max_size_gb' must be at least 10"); + if (exists(dir_)) + { + if (!is_directory(dir_)) + return fail("'path' must be a directory"); + } + else + create_directories(dir_); - // Convert to bytes - maxFileSz_ = sz << 30; - } + { + std::uint64_t sz; + if (!get_if_exists(section, "max_size_gb", sz)) + return fail("'max_size_gb' missing"); - if (section.exists("ledgers_per_shard")) - { - // To be set only in standalone for testing - if (!config.standalone()) - return fail("'ledgers_per_shard' only honored in stand alone"); + if ((sz << 30) < sz) + return fail("'max_size_gb' overflow"); - ledgersPerShard_ = get(section, "ledgers_per_shard"); - if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) - return fail("'ledgers_per_shard' must be a multiple of 256"); - } + // Minimum storage space required (in gigabytes) + if (sz < 10) + return fail("'max_size_gb' must be at least 10"); - // NuDB is the default and only supported permanent storage backend - // "Memory" and "none" types are supported for tests - backendName_ = get(section, "type", "nudb"); - if (!boost::iequals(backendName_, "NuDB") && - !boost::iequals(backendName_, "Memory") && - !boost::iequals(backendName_, "none")) - { - return fail("'type' value unsupported"); - } + // Convert to bytes + maxFileSz_ = sz << 30; + } - // Check if backend uses permanent storage - if (auto factory = Manager::instance().find(backendName_)) - { - auto backend {factory->createInstance( - NodeObject::keyBytes, section, scheduler_, j_)}; - backed_ = backend->backed(); - if (!backed_) + if (section.exists("ledgers_per_shard")) { - setFileStats(lock); - init_ = true; - return true; + // To be set only in standalone for testing + if (!config.standalone()) + return fail("'ledgers_per_shard' only honored in stand alone"); + + ledgersPerShard_ = get(section, "ledgers_per_shard"); + if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) + return fail("'ledgers_per_shard' must be a multiple of 256"); } - } - else - return fail(backendName_ + " backend unsupported"); - try - { - ctx_ = std::make_unique(); - ctx_->start(); + // NuDB is the default and only supported permanent storage backend + backendName_ = get(section, "type", "nudb"); + if (!boost::iequals(backendName_, "NuDB")) + return fail("'type' value unsupported"); - // Find shards - for (auto const& d : directory_iterator(dir_)) + try { - if (!is_directory(d)) - continue; + ctx_ = std::make_unique(); + ctx_->start(); - // Validate shard directory name is numeric - auto dirName = d.path().stem().string(); - if (!std::all_of( - dirName.begin(), - dirName.end(), - [](auto c) { - return ::isdigit(static_cast(c)); - })) + // Find shards + for (auto const& d : directory_iterator(dir_)) { - continue; - } + if (!is_directory(d)) + continue; - auto const shardIndex {std::stoul(dirName)}; - if (shardIndex < earliestShardIndex()) - { - return fail("shard " + std::to_string(shardIndex) + - " comes before earliest shard index " + - std::to_string(earliestShardIndex())); - } + // Check shard directory name is numeric + auto dirName = d.path().stem().string(); + if (!std::all_of( + dirName.begin(), + dirName.end(), + [](auto c) { + return ::isdigit(static_cast(c)); + })) + { + continue; + } - // Check if a previous import failed - if (is_regular_file( - dir_ / std::to_string(shardIndex) / importMarker_)) - { - JLOG(j_.warn()) << - "shard " << shardIndex << - " previously failed import, removing"; - remove_all(dir_ / std::to_string(shardIndex)); - continue; - } + auto const shardIndex {std::stoul(dirName)}; + if (shardIndex < earliestShardIndex()) + { + return fail("shard " + std::to_string(shardIndex) + + " comes before earliest shard index " + + std::to_string(earliestShardIndex())); + } - auto shard {std::make_unique(app_, *this, shardIndex, j_)}; - if (!shard->open(scheduler_, *ctx_)) - return false; + auto const shardDir {dir_ / std::to_string(shardIndex)}; - if (shard->complete()) - complete_.emplace(shard->index(), std::move(shard)); - else - { - if (incomplete_) - return fail("more than one control file found"); - incomplete_ = std::move(shard); + // Check if a previous import failed + if (is_regular_file(shardDir / importMarker_)) + { + JLOG(j_.warn()) << + "shard " << shardIndex << + " previously failed import, removing"; + remove_all(shardDir); + continue; + } + + auto shard {std::make_unique( + app_, + *this, + shardIndex, + j_)}; + if (!shard->open(scheduler_, *ctx_)) + { + if (!shard->isLegacy()) + return false; + + // Remove legacy shard + JLOG(j_.warn()) << + "shard " << shardIndex << + " incompatible legacy shard, removing"; + remove_all(shardDir); + continue; + } + + if (shard->isFinal()) + { + shards_.emplace( + shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::final)); + } + else if (shard->isBackendComplete()) + { + auto const result {shards_.emplace( + shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::none))}; + finalizeShard(result.first->second, true, lock); + } + else + { + if (acquireIndex_ != 0) + return fail("more than one shard being acquired"); + + shards_.emplace( + shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::acquire)); + acquireIndex_ = shardIndex; + } } } - } - catch (std::exception const& e) - { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + + updateStatus(lock); + setParent(parent_); + init_ = true; } - setFileStats(lock); - updateStatus(lock); - init_ = true; + setFileStats(); return true; } boost::optional DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) { - std::lock_guard lock(m_); - assert(init_); + boost::optional shardIndex; - if (incomplete_) - return incomplete_->prepare(); - if (!canAdd_) - return boost::none; - if (backed_) { + std::lock_guard lock(mutex_); + assert(init_); + + if (acquireIndex_ != 0) + { + if (auto it {shards_.find(acquireIndex_)}; it != shards_.end()) + return it->second.shard->prepare(); + assert(false); + return boost::none; + } + + if (!canAdd_) + return boost::none; + // Check available storage space if (fileSz_ + avgShardFileSz_ > maxFileSz_) { @@ -266,39 +286,45 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) canAdd_ = false; return boost::none; } + + shardIndex = findAcquireIndex(validLedgerSeq, lock); } - auto const shardIndex {findShardIndexToAdd(validLedgerSeq, lock)}; if (!shardIndex) { JLOG(j_.debug()) << "no new shards to add"; - canAdd_ = false; + { + std::lock_guard lock(mutex_); + canAdd_ = false; + } return boost::none; } - // With every new shard, clear family caches - app_.shardFamily()->reset(); - incomplete_ = std::make_unique(app_, *this, *shardIndex, j_); - if (!incomplete_->open(scheduler_, *ctx_)) - { - incomplete_.reset(); + auto shard {std::make_unique(app_, *this, *shardIndex, j_)}; + if (!shard->open(scheduler_, *ctx_)) return boost::none; - } - return incomplete_->prepare(); + auto const seq {shard->prepare()}; + { + std::lock_guard lock(mutex_); + shards_.emplace( + *shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::acquire)); + acquireIndex_ = *shardIndex; + } + return seq; } bool DatabaseShardImp::prepareShard(std::uint32_t shardIndex) { - std::lock_guard lock(m_); - assert(init_); - auto fail = [j = j_, shardIndex](std::string const& msg) { JLOG(j.error()) << "shard " << shardIndex << " " << msg; return false; }; + std::lock_guard lock(mutex_); + assert(init_); if (!canAdd_) return fail("cannot be stored at this time"); @@ -324,50 +350,35 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) return false; } - if (complete_.find(shardIndex) != complete_.end()) - { - JLOG(j_.debug()) << "shard " << shardIndex << " is already stored"; - return false; - } - if (incomplete_ && incomplete_->index() == shardIndex) - { - JLOG(j_.debug()) << "shard " << shardIndex << " is being acquired"; - return false; - } - if (preShards_.find(shardIndex) != preShards_.end()) + if (shards_.find(shardIndex) != shards_.end()) { JLOG(j_.debug()) << - "shard " << shardIndex << " is already prepared for import"; + "shard " << shardIndex << + " is already stored or queued for import"; return false; } - // Check limit and space requirements - if (backed_) - { - std::uint64_t const sz { - (preShards_.size() + 1 + (incomplete_ ? 1 : 0)) * avgShardFileSz_}; - if (fileSz_ + sz > maxFileSz_) - { - JLOG(j_.debug()) << - "shard " << shardIndex << " exceeds the maximum storage size"; - return false; - } - if (sz > available()) - return fail("insufficient storage space available"); - } + // Check available storage space + if (fileSz_ + avgShardFileSz_ > maxFileSz_) + return fail("maximum storage size reached"); + if (avgShardFileSz_ > available()) + return fail("insufficient storage space available"); - // Add to shards prepared - preShards_.emplace(shardIndex, nullptr); + shards_.emplace(shardIndex, ShardInfo(nullptr, ShardInfo::State::import)); return true; } void DatabaseShardImp::removePreShard(std::uint32_t shardIndex) { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - preShards_.erase(shardIndex); + if (auto const it {shards_.find(shardIndex)}; + it != shards_.end() && it->second.state == ShardInfo::State::import) + { + shards_.erase(it); + } } std::string @@ -375,27 +386,32 @@ DatabaseShardImp::getPreShards() { RangeSet rs; { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - if (preShards_.empty()) - return {}; - for (auto const& ps : preShards_) - rs.insert(ps.first); + for (auto const& e : shards_) + if (e.second.state == ShardInfo::State::import) + rs.insert(e.first); } + + if (rs.empty()) + return {}; + return to_string(rs); }; bool -DatabaseShardImp::importShard(std::uint32_t shardIndex, - boost::filesystem::path const& srcDir, bool validate) +DatabaseShardImp::importShard( + std::uint32_t shardIndex, + boost::filesystem::path const& srcDir) { using namespace boost::filesystem; try { if (!is_directory(srcDir) || is_empty(srcDir)) { - JLOG(j_.error()) << "invalid source directory " << srcDir.string(); + JLOG(j_.error()) << + "invalid source directory " << srcDir.string(); return false; } } @@ -406,7 +422,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, return false; } - auto move = [&](path const& src, path const& dst) + auto renameDir = [&](path const& src, path const& dst) { try { @@ -421,86 +437,88 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, return true; }; - std::unique_lock lock(m_); - assert(init_); - - // Check shard is prepared - auto it {preShards_.find(shardIndex)}; - if(it == preShards_.end()) + path dstDir; { - JLOG(j_.error()) << "shard " << shardIndex << " is an invalid index"; - return false; + std::lock_guard lock(mutex_); + assert(init_); + + // Check shard is prepared + if (auto const it {shards_.find(shardIndex)}; + it == shards_.end() || + it->second.shard || + it->second.state != ShardInfo::State::import) + { + JLOG(j_.error()) << + "shard " << shardIndex << " failed to import"; + return false; + } + + dstDir = dir_ / std::to_string(shardIndex); } - // Move source directory to the shard database directory - auto const dstDir {dir_ / std::to_string(shardIndex)}; - if (!move(srcDir, dstDir)) + // Rename source directory to the shard database directory + if (!renameDir(srcDir, dstDir)) return false; // Create the new shard auto shard {std::make_unique(app_, *this, shardIndex, j_)}; - auto fail = [&](std::string const& msg) + if (!shard->open(scheduler_, *ctx_) || !shard->isBackendComplete()) { - if (!msg.empty()) - { - JLOG(j_.error()) << "shard " << shardIndex << " " << msg; - } + JLOG(j_.error()) << + "shard " << shardIndex << " failed to import"; shard.reset(); - move(dstDir, srcDir); + renameDir(dstDir, srcDir); return false; - }; - - if (!shard->open(scheduler_, *ctx_)) - return fail({}); - if (!shard->complete()) - return fail("is incomplete"); - - try - { - // Verify database integrity - shard->getBackend()->verify(); - } - catch (std::exception const& e) - { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); } - // Validate shard ledgers - if (validate) + std::lock_guard lock(mutex_); + auto const it {shards_.find(shardIndex)}; + if (it == shards_.end() || + it->second.shard || + it->second.state != ShardInfo::State::import) { - // Shard validation requires releasing the lock - // so the database can fetch data from it - it->second = shard.get(); - lock.unlock(); - auto const valid {shard->validate()}; - lock.lock(); - if (!valid) - { - it = preShards_.find(shardIndex); - if(it != preShards_.end()) - it->second = nullptr; - return fail("failed validation"); - } + JLOG(j_.error()) << + "shard " << shardIndex << " failed to import"; + return false; } - // Add the shard - complete_.emplace(shardIndex, std::move(shard)); - preShards_.erase(shardIndex); - - std::lock_guard lockg(*lock.release(), std::adopt_lock); - setFileStats(lockg); - updateStatus(lockg); + it->second.shard = std::move(shard); + finalizeShard(it->second, true, lock); return true; } std::shared_ptr DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq) { - if (!contains(seq)) - return {}; + auto const shardIndex {seqToShardIndex(seq)}; + { + ShardInfo shardInfo; + { + std::lock_guard lock(mutex_); + assert(init_); + + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + shardInfo = it->second; + else + return {}; + } - auto nObj = fetch(hash, seq); + // Check if the ledger is stored in a final shard + // or in the shard being acquired + switch (shardInfo.state) + { + case ShardInfo::State::final: + break; + case ShardInfo::State::acquire: + if (shardInfo.shard->contains(seq)) + break; + [[fallthrough]]; + default: + return {}; + } + } + + auto nObj {fetch(hash, seq)}; if (!nObj) return {}; @@ -549,69 +567,79 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq) void DatabaseShardImp::setStored(std::shared_ptr const& ledger) { - auto const shardIndex {seqToShardIndex(ledger->info().seq)}; - auto fail = [j = j_, shardIndex](std::string const& msg) - { - JLOG(j.error()) << "shard " << shardIndex << " " << msg; - }; + auto const seq {ledger->info().seq}; + auto const shardIndex {seqToShardIndex(seq)}; if (ledger->info().hash.isZero()) { - return fail("encountered a zero ledger hash on sequence " + - std::to_string(ledger->info().seq)); + JLOG(j_.error()) << + "shard " << shardIndex << + " zero ledger hash on sequence " << std::to_string(seq); + return; } if (ledger->info().accountHash.isZero()) { - return fail("encountered a zero account hash on sequence " + - std::to_string(ledger->info().seq)); + JLOG(j_.error()) << + "shard " << shardIndex << + " zero account hash on sequence " << std::to_string(seq); + return; } - std::lock_guard lock(m_); - assert(init_); - - if (!incomplete_ || shardIndex != incomplete_->index()) + std::shared_ptr shard; { - return fail("ledger sequence " + std::to_string(ledger->info().seq) + - " is not being acquired"); + std::lock_guard lock(mutex_); + assert(init_); + + if (shardIndex != acquireIndex_) + { + JLOG(j_.trace()) << + "shard " << shardIndex << " is not being acquired"; + return; + } + + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + shard = it->second.shard; + else + { + JLOG(j_.error()) << + "shard " << shardIndex << " is not being acquired"; + return; + } } - if (!incomplete_->setStored(ledger)) + + if (!shard->store(ledger)) return; - if (incomplete_->complete()) + + auto const complete {shard->isBackendComplete()}; { - complete_.emplace(incomplete_->index(), std::move(incomplete_)); - incomplete_.reset(); - updateStatus(lock); + std::lock_guard lock(mutex_); + if (complete) + { + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + { + if (shardIndex == acquireIndex_) + acquireIndex_ = 0; - // Update peers with new shard index - protocol::TMPeerShardInfo message; - PublicKey const& publicKey {app_.nodeIdentity().first}; - message.set_nodepubkey(publicKey.data(), publicKey.size()); - message.set_shardindexes(std::to_string(shardIndex)); - app_.overlay().foreach(send_always( - std::make_shared(message, protocol::mtPEER_SHARD_INFO))); + if (it->second.state != ShardInfo::State::finalize) + finalizeShard(it->second, false, lock); + } + else + { + JLOG(j_.debug()) << + "shard " << shardIndex << + " is no longer being acquired"; + return; + } + } } - setFileStats(lock); -} - -bool -DatabaseShardImp::contains(std::uint32_t seq) -{ - auto const shardIndex {seqToShardIndex(seq)}; - std::lock_guard lock(m_); - assert(init_); - - if (complete_.find(shardIndex) != complete_.end()) - return true; - if (incomplete_ && incomplete_->index() == shardIndex) - return incomplete_->contains(seq); - return false; + setFileStats(); } std::string DatabaseShardImp::getCompleteShards() { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); return status_; @@ -620,36 +648,49 @@ DatabaseShardImp::getCompleteShards() void DatabaseShardImp::validate() { - std::vector> completeShards; + std::vector> wptrShards; { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - if (complete_.empty()) - { - JLOG(j_.error()) << "no shards found to validate"; + // Only shards with a state of final should be validated + for (auto& e : shards_) + if (e.second.state == ShardInfo::State::final) + wptrShards.push_back(e.second.shard); + + if (wptrShards.empty()) return; - } JLOG(j_.debug()) << "Validating shards " << status_; - - completeShards.reserve(complete_.size()); - for (auto const& shard : complete_) - completeShards.push_back(shard.second); } - // Verify each complete stored shard - for (auto const& shard : completeShards) - shard->validate(); + for (auto const& wptr : wptrShards) + { + if (auto shard {wptr.lock()}; shard) + shard->finalize(true); + } app_.shardFamily()->reset(); } +void +DatabaseShardImp::onStop() +{ + { + std::lock_guard lock(mutex_); + for (auto const& e : shards_) + if (e.second.shard) + e.second.shard->stop(); + } + + Database::onStop(); +} + void DatabaseShardImp::import(Database& source) { { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); // Only the application local node store can be imported @@ -729,10 +770,10 @@ DatabaseShardImp::import(Database& source) } // Skip if already stored - if (complete_.find(shardIndex) != complete_.end() || - (incomplete_ && incomplete_->index() == shardIndex)) + if (shards_.find(shardIndex) != shards_.end()) { - JLOG(j_.debug()) << "shard " << shardIndex << " already exists"; + JLOG(j_.debug()) << + "shard " << shardIndex << " already exists"; continue; } @@ -781,8 +822,7 @@ DatabaseShardImp::import(Database& source) JLOG(j_.error()) << "shard " << shardIndex << " is unable to create temp marker file"; - shard.reset(); - removeAll(shardDir, j_); + remove_all(shardDir); continue; } ofs.close(); @@ -790,38 +830,58 @@ DatabaseShardImp::import(Database& source) // Copy the ledgers from node store while (auto seq = shard->prepare()) { - auto ledger = loadByIndex(*seq, app_, false); - if (!ledger || ledger->info().seq != seq || - !Database::copyLedger(*shard->getBackend(), *ledger, - nullptr, nullptr, shard->lastStored())) + auto ledger {loadByIndex(*seq, app_, false)}; + if (!ledger || ledger->info().seq != seq) break; - if (!shard->setStored(ledger)) + std::shared_ptr backend; + std::shared_ptr lastStored; + std::tie(backend, std::ignore, std::ignore, lastStored) = + shard->getBackendAll(); + + if (!Database::copyLedger( + *ledger, + backend, + nullptr, + nullptr, + lastStored)) + { break; - if (shard->complete()) + } + + if (!shard->store(ledger)) + break; + if (shard->isBackendComplete()) { JLOG(j_.debug()) << "shard " << shardIndex << " was successfully imported"; - removeAll(markerFile, j_); + try + { + boost::filesystem::remove_all(markerFile); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "shard " << shardIndex << + " failed to import." << + " exception " << e.what() << + " in function " << __func__; + } break; } } - if (!shard->complete()) + if (!shard->isBackendComplete()) { JLOG(j_.error()) << "shard " << shardIndex << " failed to import"; - shard.reset(); - removeAll(shardDir, j_); + remove_all(shardDir); } - else - setFileStats(lock); } // Re initialize the shard store init_ = false; - complete_.clear(); - incomplete_.reset(); + shards_.clear(); } if (!init()) @@ -831,63 +891,85 @@ DatabaseShardImp::import(Database& source) std::int32_t DatabaseShardImp::getWriteLoad() const { - std::int32_t wl {0}; + std::shared_ptr shard; { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - for (auto const& e : complete_) - wl += e.second->getBackend()->getWriteLoad(); - if (incomplete_) - wl += incomplete_->getBackend()->getWriteLoad(); + if (auto const it {shards_.find(acquireIndex_)}; it != shards_.end()) + shard = it->second.shard; + else + return 0; } - return wl; + + return shard->getBackend()->getWriteLoad(); } void -DatabaseShardImp::store(NodeObjectType type, - Blob&& data, uint256 const& hash, std::uint32_t seq) +DatabaseShardImp::store( + NodeObjectType type, + Blob&& data, + uint256 const& hash, + std::uint32_t seq) { #if RIPPLE_VERIFY_NODEOBJECT_KEYS assert(hash == sha512Hash(makeSlice(data))); #endif - std::shared_ptr nObj; + auto const shardIndex {seqToShardIndex(seq)}; + std::shared_ptr shard; { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - if (!incomplete_ || shardIndex != incomplete_->index()) + if (shardIndex != acquireIndex_) + { + JLOG(j_.trace()) << + "shard " << shardIndex << " is not being acquired"; + return; + } + + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + shard = it->second.shard; + else { - JLOG(j_.warn()) << - "shard " << shardIndex << - " ledger sequence " << seq << - " is not being acquired"; + JLOG(j_.error()) << + "shard " << shardIndex << " is not being acquired"; return; } - nObj = NodeObject::createObject( - type, std::move(data), hash); - incomplete_->pCache()->canonicalize(hash, nObj, true); - incomplete_->getBackend()->store(nObj); - incomplete_->nCache()->erase(hash); } + + std::shared_ptr backend; + std::shared_ptr pCache; + std::shared_ptr nCache; + std::tie(backend, pCache, nCache, std::ignore) = + shard->getBackendAll(); + + auto nObj {NodeObject::createObject(type, std::move(data), hash)}; + + pCache->canonicalize(hash, nObj, true); + backend->store(nObj); + nCache->erase(hash); + storeStats(nObj->getData().size()); } std::shared_ptr DatabaseShardImp::fetch(uint256 const& hash, std::uint32_t seq) { - auto cache {selectCache(seq)}; + auto cache {getCache(seq)}; if (cache.first) return doFetch(hash, seq, *cache.first, *cache.second, false); return {}; } bool -DatabaseShardImp::asyncFetch(uint256 const& hash, - std::uint32_t seq, std::shared_ptr& object) +DatabaseShardImp::asyncFetch( + uint256 const& hash, + std::uint32_t seq, + std::shared_ptr& object) { - auto cache {selectCache(seq)}; + auto cache {getCache(seq)}; if (cache.first) { // See if the object is in cache @@ -901,38 +983,73 @@ DatabaseShardImp::asyncFetch(uint256 const& hash, } bool -DatabaseShardImp::copyLedger(std::shared_ptr const& ledger) +DatabaseShardImp::copyLedger(std::shared_ptr const& srcLedger) { - auto const shardIndex {seqToShardIndex(ledger->info().seq)}; - std::lock_guard lock(m_); - assert(init_); + if (isStopping()) + return false; - if (!incomplete_ || shardIndex != incomplete_->index()) + auto const seq {srcLedger->info().seq}; + auto const shardIndex {seqToShardIndex(seq)}; + std::shared_ptr shard; { - JLOG(j_.warn()) << - "shard " << shardIndex << - " source ledger sequence " << ledger->info().seq << - " is not being acquired"; - return false; + std::lock_guard lock(mutex_); + assert(init_); + + if (shardIndex != acquireIndex_) + { + JLOG(j_.trace()) << + "shard " << shardIndex << " is not being acquired"; + return false; + } + + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + shard = it->second.shard; + else + { + JLOG(j_.error()) << + "shard " << shardIndex << " is not being acquired"; + return false; + } } - if (!Database::copyLedger(*incomplete_->getBackend(), *ledger, - incomplete_->pCache(), incomplete_->nCache(), - incomplete_->lastStored())) + if (shard->contains(seq)) { + JLOG(j_.trace()) << + "shard " << shardIndex << " ledger already stored"; return false; } - if (!incomplete_->setStored(ledger)) + auto [backend, pCache, nCache, lastStored] = shard->getBackendAll(); + if (!Database::copyLedger(*srcLedger, backend, pCache, nCache, lastStored)) return false; - if (incomplete_->complete()) + + if (!shard->store(srcLedger)) + return false; + + auto const complete {shard->isBackendComplete()}; { - complete_.emplace(incomplete_->index(), std::move(incomplete_)); - incomplete_.reset(); - updateStatus(lock); + std::lock_guard lock(mutex_); + if (complete) + { + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + { + if (shardIndex == acquireIndex_) + acquireIndex_ = 0; + + if (it->second.state != ShardInfo::State::finalize) + finalizeShard(it->second, false, lock); + } + else + { + JLOG(j_.debug()) << + "shard " << shardIndex << + " is no longer being acquired"; + return false; + } + } } - setFileStats(lock); + setFileStats(); return true; } @@ -940,85 +1057,91 @@ int DatabaseShardImp::getDesiredAsyncReadCount(std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; + std::shared_ptr shard; { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - auto it = complete_.find(shardIndex); - if (it != complete_.end()) - return it->second->pCache()->getTargetSize() / asyncDivider; - if (incomplete_ && incomplete_->index() == shardIndex) - return incomplete_->pCache()->getTargetSize() / asyncDivider; + if (auto const it {shards_.find(shardIndex)}; + it != shards_.end() && + (it->second.state == ShardInfo::State::final || + it->second.state == ShardInfo::State::acquire)) + { + shard = it->second.shard; + } + else + return 0; } - return cacheTargetSize / asyncDivider; + + return shard->pCache()->getTargetSize() / asyncDivider; } float DatabaseShardImp::getCacheHitRate() { - float sz, f {0}; + std::shared_ptr shard; { - std::lock_guard lock(m_); + std::lock_guard lock(mutex_); assert(init_); - sz = complete_.size(); - for (auto const& e : complete_) - f += e.second->pCache()->getHitRate(); - if (incomplete_) - { - f += incomplete_->pCache()->getHitRate(); - ++sz; - } + if (auto const it {shards_.find(acquireIndex_)}; it != shards_.end()) + shard = it->second.shard; + else + return 0; } - return f / std::max(1.0f, sz); + + return shard->pCache()->getHitRate(); } void DatabaseShardImp::sweep() { - std::lock_guard lock(m_); - assert(init_); + std::vector> wptrShards; + { + std::lock_guard lock(mutex_); + assert(init_); - for (auto const& e : complete_) - e.second->sweep(); + for (auto const& e : shards_) + if (e.second.state == ShardInfo::State::final || + e.second.state == ShardInfo::State::acquire) + { + wptrShards.push_back(e.second.shard); + } + } - if (incomplete_) - incomplete_->sweep(); + for (auto const& wptr : wptrShards) + { + if (auto shard {wptr.lock()}; shard) + shard->sweep(); + } } std::shared_ptr DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; - std::unique_lock lock(m_); - assert(init_); + std::shared_ptr shard; { - auto it = complete_.find(shardIndex); - if (it != complete_.end()) + std::lock_guard lock(mutex_); + assert(init_); + + if (auto const it {shards_.find(shardIndex)}; + it != shards_.end() && + it->second.shard) { - lock.unlock(); - return fetchInternal(hash, *it->second->getBackend()); + shard = it->second.shard; } - } - if (incomplete_ && incomplete_->index() == shardIndex) - { - lock.unlock(); - return fetchInternal(hash, *incomplete_->getBackend()); + else + return {}; } - // Used to validate import shards - auto it = preShards_.find(shardIndex); - if (it != preShards_.end() && it->second) - { - lock.unlock(); - return fetchInternal(hash, *it->second->getBackend()); - } - return {}; + return fetchInternal(hash, shard->getBackend()); } boost::optional -DatabaseShardImp::findShardIndexToAdd( - std::uint32_t validLedgerSeq, std::lock_guard&) +DatabaseShardImp::findAcquireIndex( + std::uint32_t validLedgerSeq, + std::lock_guard&) { auto const maxShardIndex {[this, validLedgerSeq]() { @@ -1027,31 +1150,25 @@ DatabaseShardImp::findShardIndexToAdd( --shardIndex; return shardIndex; }()}; - auto const numShards {complete_.size() + - (incomplete_ ? 1 : 0) + preShards_.size()}; // Check if the shard store has all shards - if (numShards >= maxShardIndex) + if (shards_.size() >= maxShardIndex) return boost::none; if (maxShardIndex < 1024 || - static_cast(numShards) / maxShardIndex > 0.5f) + static_cast(shards_.size()) / maxShardIndex > 0.5f) { // Small or mostly full index space to sample // Find the available indexes and select one at random std::vector available; - available.reserve(maxShardIndex - numShards + 1); + available.reserve(maxShardIndex - shards_.size() + 1); for (auto shardIndex = earliestShardIndex(); shardIndex <= maxShardIndex; ++shardIndex) { - if (complete_.find(shardIndex) == complete_.end() && - (!incomplete_ || incomplete_->index() != shardIndex) && - preShards_.find(shardIndex) == preShards_.end()) - { + if (shards_.find(shardIndex) == shards_.end()) available.push_back(shardIndex); - } } if (available.empty()) @@ -1070,12 +1187,8 @@ DatabaseShardImp::findShardIndexToAdd( for (int i = 0; i < 40; ++i) { auto const shardIndex {rand_int(earliestShardIndex(), maxShardIndex)}; - if (complete_.find(shardIndex) == complete_.end() && - (!incomplete_ || incomplete_->index() != shardIndex) && - preShards_.find(shardIndex) == preShards_.end()) - { + if (shards_.find(shardIndex) == shards_.end()) return shardIndex; - } } assert(false); @@ -1083,33 +1196,134 @@ DatabaseShardImp::findShardIndexToAdd( } void -DatabaseShardImp::setFileStats(std::lock_guard&) +DatabaseShardImp::finalizeShard( + ShardInfo& shardInfo, + bool writeSQLite, + std::lock_guard&) { - fileSz_ = 0; - fdRequired_ = 0; - if (!complete_.empty()) + assert(shardInfo.shard); + assert(shardInfo.shard->index() != acquireIndex_); + assert(shardInfo.shard->isBackendComplete()); + assert(shardInfo.state != ShardInfo::State::finalize); + + auto const shardIndex {shardInfo.shard->index()}; + + shardInfo.state = ShardInfo::State::finalize; + taskQueue_->addTask([this, shardIndex, writeSQLite]() mutable { - for (auto const& e : complete_) + if (isStopping()) + return; + + std::shared_ptr shard; { - fileSz_ += e.second->fileSize(); - fdRequired_ += e.second->fdRequired(); + std::lock_guard lock(mutex_); + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + shard = it->second.shard; + else + { + JLOG(j_.error()) << + "Unable to finalize shard " << shardIndex; + return; + } } - avgShardFileSz_ = fileSz_ / complete_.size(); - } - else - avgShardFileSz_ = 0; - if (incomplete_) + if (!shard->finalize(writeSQLite)) + { + if (isStopping()) + return; + + // Finalize failed, remove shard + { + std::lock_guard lock(mutex_); + shards_.erase(shardIndex); + + using namespace boost::filesystem; + path const dir {shard->getDir()}; + shard.reset(); + try + { + remove_all(dir); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << "exception " << e.what(); + } + + updateStatus(lock); + } + + setFileStats(); + return; + } + + if (isStopping()) + return; + + { + std::lock_guard lock(mutex_); + auto const it {shards_.find(shardIndex)}; + if (it == shards_.end()) + return; + it->second.state = ShardInfo::State::final; + updateStatus(lock); + } + + setFileStats(); + + // Update peers with new shard index + if (!app_.config().standalone() && + app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED) + { + protocol::TMPeerShardInfo message; + PublicKey const& publicKey {app_.nodeIdentity().first}; + message.set_nodepubkey(publicKey.data(), publicKey.size()); + message.set_shardindexes(std::to_string(shardIndex)); + app_.overlay().foreach(send_always( + std::make_shared( + message, + protocol::mtPEER_SHARD_INFO))); + } + }); +} + +void +DatabaseShardImp::setFileStats() +{ + std::vector> wptrShards; { - fileSz_ += incomplete_->fileSize(); - fdRequired_ += incomplete_->fdRequired(); + std::lock_guard lock(mutex_); + assert(init_); + + fileSz_ = 0; + fdRequired_ = 0; + avgShardFileSz_ = 0; + + if (shards_.empty()) + return; + + for (auto const& e : shards_) + if (e.second.shard) + wptrShards.push_back(e.second.shard); } - if (!backed_) - return; + std::uint64_t sumSz {0}; + std::uint32_t sumFd {0}; + std::uint32_t numShards {0}; + for (auto const& wptr : wptrShards) + { + if (auto shard {wptr.lock()}; shard) + { + auto[sz, fd] = shard->fileInfo(); + sumSz += sz; + sumFd += fd; + ++numShards; + } + } - // Require at least 15 file descriptors - fdRequired_ = std::max(fdRequired_, 15); + std::lock_guard lock(mutex_); + fileSz_ = sumSz; + fdRequired_ = sumFd; + avgShardFileSz_ = fileSz_ / numShards; if (fileSz_ >= maxFileSz_) { @@ -1126,11 +1340,12 @@ DatabaseShardImp::setFileStats(std::lock_guard&) void DatabaseShardImp::updateStatus(std::lock_guard&) { - if (!complete_.empty()) + if (!shards_.empty()) { RangeSet rs; - for (auto const& e : complete_) - rs.insert(e.second->index()); + for (auto const& e : shards_) + if (e.second.state == ShardInfo::State::final) + rs.insert(e.second.shard->index()); status_ = to_string(rs); } else @@ -1138,32 +1353,28 @@ DatabaseShardImp::updateStatus(std::lock_guard&) } std::pair, std::shared_ptr> -DatabaseShardImp::selectCache(std::uint32_t seq) +DatabaseShardImp::getCache(std::uint32_t seq) { auto const shardIndex {seqToShardIndex(seq)}; - std::lock_guard lock(m_); - assert(init_); - + std::shared_ptr shard; { - auto it = complete_.find(shardIndex); - if (it != complete_.end()) + std::lock_guard lock(mutex_); + assert(init_); + + if (auto const it {shards_.find(shardIndex)}; + it != shards_.end() && it->second.shard) { - return std::make_pair(it->second->pCache(), - it->second->nCache()); + shard = it->second.shard; } + else + return {}; } - if (incomplete_ && incomplete_->index() == shardIndex) - { - return std::make_pair(incomplete_->pCache(), - incomplete_->nCache()); - } + std::shared_ptr pCache; + std::shared_ptr nCache; + std::tie(std::ignore, pCache, nCache, std::ignore) = shard->getBackendAll(); - // Used to validate import shards - auto it = preShards_.find(shardIndex); - if (it != preShards_.end() && it->second) - return std::make_pair(it->second->pCache(), it->second->nCache()); - return {}; + return std::make_pair(pCache, nCache); } std::uint64_t @@ -1175,8 +1386,8 @@ DatabaseShardImp::available() const } catch (std::exception const& e) { - JLOG(j_.error()) << "exception " << e.what() << - " in function " << __func__; + JLOG(j_.error()) << + "exception " << e.what() << " in function " << __func__; return 0; } } @@ -1197,19 +1408,13 @@ make_ShardStore( if (section.empty()) return nullptr; - auto shardStore = std::make_unique( + return std::make_unique( app, parent, "ShardStore", scheduler, readThreads, j); - if (shardStore->init()) - shardStore->setParent(parent); - else - shardStore.reset(); - - return shardStore; } } // NodeStore diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 9d90e4d5baa..d279d92bf3a 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -22,10 +22,13 @@ #include #include +#include namespace ripple { namespace NodeStore { +class TaskQueue; + class DatabaseShardImp : public DatabaseShard { public: @@ -61,8 +64,9 @@ class DatabaseShardImp : public DatabaseShard getPreShards() override; bool - importShard(std::uint32_t shardIndex, - boost::filesystem::path const& srcDir, bool validate) override; + importShard( + std::uint32_t shardIndex, + boost::filesystem::path const& srcDir) override; std::shared_ptr fetchLedger(uint256 const& hash, std::uint32_t seq) override; @@ -70,9 +74,6 @@ class DatabaseShardImp : public DatabaseShard void setStored(std::shared_ptr const& ledger) override; - bool - contains(std::uint32_t seq) override; - std::string getCompleteShards() override; @@ -126,6 +127,9 @@ class DatabaseShardImp : public DatabaseShard return backendName_; } + void + onStop() override; + /** Import the application local node store @param source The application node store. @@ -137,18 +141,23 @@ class DatabaseShardImp : public DatabaseShard getWriteLoad() const override; void - store(NodeObjectType type, Blob&& data, - uint256 const& hash, std::uint32_t seq) override; + store( + NodeObjectType type, + Blob&& data, + uint256 const& hash, + std::uint32_t seq) override; std::shared_ptr fetch(uint256 const& hash, std::uint32_t seq) override; bool - asyncFetch(uint256 const& hash, std::uint32_t seq, + asyncFetch( + uint256 const& hash, + std::uint32_t seq, std::shared_ptr& object) override; bool - copyLedger(std::shared_ptr const& ledger) override; + copyLedger(std::shared_ptr const& srcLedger) override; int getDesiredAsyncReadCount(std::uint32_t seq) override; @@ -163,21 +172,43 @@ class DatabaseShardImp : public DatabaseShard sweep() override; private: + struct ShardInfo + { + enum class State + { + none, + final, // Immutable, complete and validated + acquire, // Being acquired + import, // Being imported + finalize // Being finalized + }; + + ShardInfo() = default; + ShardInfo(std::shared_ptr shard_, State state_) + : shard(std::move(shard_)) + , state(state_) + {} + + std::shared_ptr shard; + State state {State::none}; + }; + Application& app_; - mutable std::mutex m_; + Stoppable& parent_; + mutable std::mutex mutex_; bool init_ {false}; // The context shared with all shard backend databases std::unique_ptr ctx_; - // Complete shards - std::map> complete_; + // Queue of background tasks to be performed + std::unique_ptr taskQueue_; - // A shard being acquired from the peer network - std::unique_ptr incomplete_; + // Shards held by this server + std::map shards_; - // Shards prepared for import - std::map preShards_; + // Shard index being acquired from the peer network + std::uint32_t acquireIndex_ {0}; // The shard store root directory boost::filesystem::path dir_; @@ -188,9 +219,6 @@ class DatabaseShardImp : public DatabaseShard // Complete shard indexes std::string status_; - // If backend type uses permanent storage - bool backed_; - // The name associated with the backend used with the shard store std::string backendName_; @@ -223,17 +251,25 @@ class DatabaseShardImp : public DatabaseShard Throw("Shard store import not supported"); } - // Finds a random shard index that is not stored + // Randomly select a shard index not stored // Lock must be held boost::optional - findShardIndexToAdd( + findAcquireIndex( std::uint32_t validLedgerSeq, std::lock_guard&); - // Set storage and file descriptor usage stats + // Queue a task to finalize a shard by validating its databases // Lock must be held void - setFileStats(std::lock_guard&); + finalizeShard( + ShardInfo& shardInfo, + bool writeSQLite, + std::lock_guard&); + + // Set storage and file descriptor usage stats + // Lock must NOT be held + void + setFileStats(); // Update status string // Lock must be held @@ -241,7 +277,7 @@ class DatabaseShardImp : public DatabaseShard updateStatus(std::lock_guard&); std::pair, std::shared_ptr> - selectCache(std::uint32_t seq); + getCache(std::uint32_t seq); // Returns available storage space std::uint64_t diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 2b685a661c7..20c3b821ef3 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -26,15 +26,13 @@ #include #include -#include -#include #include -#include - namespace ripple { namespace NodeStore { +uint256 const Shard::finalKey_; + Shard::Shard( Application& app, DatabaseShard const& db, @@ -47,7 +45,6 @@ Shard::Shard( , maxLedgers_(index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1 : db.ledgersPerShard()) , dir_(db.getRootDir() / std::to_string(index_)) - , control_(dir_ / controlFileName) , j_(j) { if (index_ < db.earliestShardIndex()) @@ -57,27 +54,29 @@ Shard::Shard( bool Shard::open(Scheduler& scheduler, nudb::context& ctx) { - using namespace boost::filesystem; - std::lock_guard lock(mutex_); + std::lock_guard lock {mutex_}; assert(!backend_); Config const& config {app_.config()}; - Section section {config.section(ConfigSection::shardDatabase())}; - std::string const type (get(section, "type", "nudb")); - auto factory {Manager::instance().find(type)}; - if (!factory) { - JLOG(j_.error()) << - "shard " << index_ << - " failed to create backend type " << type; - return false; - } + Section section {config.section(ConfigSection::shardDatabase())}; + std::string const type {get(section, "type", "nudb")}; + auto factory {Manager::instance().find(type)}; + if (!factory) + { + JLOG(j_.error()) << + "shard " << index_ << + " failed to create backend type " << type; + return false; + } - section.set("path", dir_.string()); - backend_ = factory->createInstance( - NodeObject::keyBytes, section, scheduler, ctx, j_); + section.set("path", dir_.string()); + backend_ = factory->createInstance( + NodeObject::keyBytes, section, scheduler, ctx, j_); + } - auto const preexist {exists(dir_)}; + using namespace boost::filesystem; + auto preexist {false}; auto fail = [this, preexist](std::string const& msg) { pCache_.reset(); @@ -89,64 +88,111 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) lastStored_.reset(); if (!preexist) - removeAll(dir_, j_); + remove_all(dir_); if (!msg.empty()) { - JLOG(j_.error()) << - "shard " << index_ << " " << msg; + JLOG(j_.fatal()) << "shard " << index_ << " " << msg; } return false; }; + auto openIncompltDB = [this, &config]() + { + DatabaseCon::Setup setup; + setup.startUp = config.START_UP; + setup.standAlone = config.standalone(); + setup.dataDir = dir_; + + acquireSQLiteDB_ = std::make_unique( + setup, + AcquireDBName, + AcquireDBPragma, + AcquireDBInit); + acquireSQLiteDB_->setupCheckpointing( + &app_.getJobQueue(), + app_.logs()); + }; + try { - // Open/Create the NuDB key/value store for node objects + // Open or create the NuDB key/value store + preexist = exists(dir_); backend_->open(!preexist); - if (!backend_->backed()) - return true; - if (!preexist) { - // New shard, create a control file - if (!saveControl(lock)) - return fail({}); + // A new shard + openIncompltDB(); + acquireSQLiteDB_->getSession() << + "INSERT INTO Shard (ShardIndex) " + "VALUES (:shardIndex);" + , soci::use(index_); } - else if (is_regular_file(control_)) + else if (exists(dir_ / AcquireDBName)) { - // Incomplete shard, inspect control file - std::ifstream ifs(control_.string()); - if (!ifs.is_open()) - return fail("failed to open control file"); + // An incomplete shard, being acquired + openIncompltDB(); - boost::archive::text_iarchive ar(ifs); - ar & storedSeqs_; - if (!storedSeqs_.empty()) + boost::optional index; + soci::blob sociBlob(acquireSQLiteDB_->getSession()); + soci::indicator blobPresent; + + acquireSQLiteDB_->getSession() << + "SELECT ShardIndex, StoredLedgerSeqs " + "FROM Shard " + "WHERE ShardIndex = :index;" + , soci::into(index) + , soci::into(sociBlob, blobPresent) + , soci::use(index_); + + if (!index || index != index_) + return fail("invalid acquire SQLite database"); + + if (blobPresent == soci::i_ok) { + std::string s; + if (convert(sociBlob, s); !from_string(storedSeqs_, s)) + return fail("invalid StoredLedgerSeqs"); + if (boost::icl::first(storedSeqs_) < firstSeq_ || boost::icl::last(storedSeqs_) > lastSeq_) { - return fail("has an invalid control file"); + return fail("invalid StoredLedgerSeqs"); } - if (boost::icl::length(storedSeqs_) >= maxLedgers_) + if (boost::icl::length(storedSeqs_) == maxLedgers_) { - JLOG(j_.warn()) << - "shard " << index_ << - " has a control file for complete shard"; - setComplete(lock); + storedSeqs_.clear(); + backendComplete_ = true; } } } else - setComplete(lock); - - if (!complete_) { - setCache(lock); - if (!initSQLite(lock) ||!setFileStats(lock)) - return fail({}); + // A finalized shard or has all ledgers stored in the backend + std::shared_ptr nObj; + if (backend_->fetch(finalKey_.begin(), &nObj) != Status::ok) + { + legacy_ = true; + return fail("incompatible, missing backend final key"); + } + + // Check final key's value + SerialIter sIt(nObj->getData().data(), nObj->getData().size()); + if (sIt.get32() != version_) + return fail("invalid version"); + + if (sIt.get32() != firstSeq_ || sIt.get32() != lastSeq_) + return fail("out of range ledger sequences"); + + if (sIt.get256().isZero()) + return fail("invalid last ledger hash"); + + if (exists(dir_ / LgrDBName) && exists(dir_ / TxDBName)) + final_ = true; + + backendComplete_ = true; } } catch (std::exception const& e) @@ -155,56 +201,66 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) e.what() + " in function " + __func__); } + setBackendCache(lock); + if (!initSQLite(lock) || !setFileStats(lock)) + return fail({}); + return true; } +boost::optional +Shard::prepare() +{ + std::lock_guard lock(mutex_); + assert(backend_); + + if (storedSeqs_.empty()) + return lastSeq_; + return prevMissing(storedSeqs_, 1 + lastSeq_, firstSeq_); +} + bool -Shard::setStored(std::shared_ptr const& ledger) +Shard::store(std::shared_ptr const& ledger) { + auto const seq {ledger->info().seq}; + std::lock_guard lock(mutex_); - assert(backend_ && !complete_); + assert(backend_); - if (boost::icl::contains(storedSeqs_, ledger->info().seq)) + if (backendComplete_ || boost::icl::contains(storedSeqs_, seq)) { JLOG(j_.debug()) << "shard " << index_ << - " has ledger sequence " << ledger->info().seq << " already stored"; + " has ledger sequence " << seq << + " already stored"; return false; } - if (!setSQLiteStored(ledger, lock)) + storedSeqs_.insert(seq); + lastStored_ = ledger; + + if (!storeSQLite(ledger, lock)) return false; - // Check if the shard is complete - if (boost::icl::length(storedSeqs_) >= maxLedgers_ - 1) - setComplete(lock); - else + if (boost::icl::length(storedSeqs_) >= maxLedgers_) { - storedSeqs_.insert(ledger->info().seq); - if (backend_->backed() && !saveControl(lock)) + storedSeqs_.clear(); + lastStored_.reset(); + backendComplete_ = true; + setBackendCache(lock); + + if (!initSQLite(lock) || !setFileStats(lock)) return false; } JLOG(j_.debug()) << "shard " << index_ << - " stored ledger sequence " << ledger->info().seq << - (complete_ ? " and is complete" : ""); + " stored ledger sequence " << seq << + (backendComplete_ ? " . All ledgers stored" : ""); - lastStored_ = ledger; return true; } -boost::optional -Shard::prepare() -{ - std::lock_guard lock(mutex_); - assert(backend_ && !complete_); - - if (storedSeqs_.empty()) - return lastSeq_; - return prevMissing(storedSeqs_, 1 + lastSeq_, firstSeq_); -} - bool Shard::contains(std::uint32_t seq) const { @@ -214,7 +270,7 @@ Shard::contains(std::uint32_t seq) const std::lock_guard lock(mutex_); assert(backend_); - return complete_ || boost::icl::contains(storedSeqs_, seq); + return backendComplete_ || boost::icl::contains(storedSeqs_, seq); } void @@ -227,7 +283,20 @@ Shard::sweep() nCache_->sweep(); } -std::shared_ptr const& +std::tuple< + std::shared_ptr, + std::shared_ptr, + std::shared_ptr, + std::shared_ptr> +Shard::getBackendAll() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return {backend_, pCache_, nCache_, lastStored_}; +} + +std::shared_ptr Shard::getBackend() const { std::lock_guard lock(mutex_); @@ -237,12 +306,12 @@ Shard::getBackend() const } bool -Shard::complete() const +Shard::isBackendComplete() const { std::lock_guard lock(mutex_); assert(backend_); - return complete_; + return backendComplete_; } std::shared_ptr @@ -263,94 +332,165 @@ Shard::nCache() const return nCache_; } -std::uint64_t -Shard::fileSize() const +std::pair +Shard::fileInfo() const { std::lock_guard lock(mutex_); assert(backend_); - return fileSz_; + return {fileSz_, fdRequired_}; } -std::uint32_t -Shard::fdRequired() const +std::shared_ptr +Shard::lastStored() const { std::lock_guard lock(mutex_); assert(backend_); - return fdRequired_; + return lastStored_; } -std::shared_ptr -Shard::lastStored() const +bool +Shard::isFinal() const { std::lock_guard lock(mutex_); assert(backend_); - return lastStored_; + return final_; +} + +bool +Shard::isLegacy() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return legacy_; } bool -Shard::validate() const +Shard::finalize(const bool writeSQLite) { + assert(backend_); + uint256 hash; std::uint32_t seq {0}; auto fail = [j = j_, index = index_, &hash, &seq](std::string const& msg) { - JLOG(j.error()) << + JLOG(j.fatal()) << "shard " << index << ". " << msg << (hash.isZero() ? "" : ". Ledger hash " + to_string(hash)) << (seq == 0 ? "" : ". Ledger sequence " + std::to_string(seq)); return false; }; - std::shared_ptr ledger; - // Find the hash of the last ledger in this shard + try { - std::tie(ledger, seq, hash) = loadLedgerHelper( - "WHERE LedgerSeq >= " + std::to_string(lastSeq_) + - " order by LedgerSeq desc limit 1", app_, false); - if (!ledger) - return fail("Unable to validate due to lacking lookup data"); - - if (seq != lastSeq_) + std::unique_lock lock(mutex_); + if (!backendComplete_) + return fail("incomplete"); + + /* + TODO MP + A lock is required when calling the NuDB verify function. Because + this can be a time consuming process, the server may desync. + Until this function is modified to work on an open database, we + are unable to use it from rippled. + + // Verify backend integrity + backend_->verify(); + */ + + // Check if a final key has been stored + lock.unlock(); + if (std::shared_ptr nObj; + backend_->fetch(finalKey_.begin(), &nObj) == Status::ok) { - boost::optional h; + // Check final key's value + SerialIter sIt(nObj->getData().data(), nObj->getData().size()); + if (sIt.get32() != version_) + return fail("invalid version"); - ledger->setImmutable(app_.config()); - try - { - h = hashOfSeq(*ledger, lastSeq_, j_); - } - catch (std::exception const& e) + if (sIt.get32() != firstSeq_ || sIt.get32() != lastSeq_) + return fail("out of range ledger sequences"); + + if (hash = sIt.get256(); hash.isZero()) + return fail("invalid last ledger hash"); + } + else + { + // In the absence of a final key, an acquiredSQLite database + // must be present in order to validate the shard + lock.lock(); + if (!acquireSQLiteDB_) + return fail("missing acquire SQLite database"); + + boost::optional index; + boost::optional sHash; + soci::blob sociBlob(acquireSQLiteDB_->getSession()); + soci::indicator blobPresent; + acquireSQLiteDB_->getSession() << + "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs " + "FROM Shard " + "WHERE ShardIndex = :index;" + , soci::into(index) + , soci::into(sHash) + , soci::into(sociBlob, blobPresent) + , soci::use(index_); + + lock.unlock(); + if (!index || index != index_) + return fail("missing or invalid ShardIndex"); + + if (!sHash) + return fail("missing LastLedgerHash"); + + if (hash.SetHexExact(*sHash); hash.isZero()) + return fail("invalid LastLedgerHash"); + + if (blobPresent != soci::i_ok) + return fail("missing StoredLedgerSeqs"); + + std::string s; + convert(sociBlob, s); + + lock.lock(); + if (!from_string(storedSeqs_, s)) + return fail("invalid StoredLedgerSeqs"); + + if (boost::icl::first(storedSeqs_) != firstSeq_ || + boost::icl::last(storedSeqs_) != lastSeq_) { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); + return fail("invalid StoredLedgerSeqs"); } - - if (!h) - return fail("Missing hash for last ledger sequence"); - hash = *h; - seq = lastSeq_; } } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } - // Validate every ledger stored in this shard + // Validate every ledger stored in the backend + std::shared_ptr ledger; std::shared_ptr next; + auto const lastLedgerHash {hash}; + + seq = lastSeq_; while (seq >= firstSeq_) { auto nObj = valFetch(hash); if (!nObj) - return fail("Invalid ledger"); + return fail("invalid ledger"); ledger = std::make_shared( InboundLedger::deserializeHeader(makeSlice(nObj->getData()), true), app_.config(), *app_.shardFamily()); if (ledger->info().seq != seq) - return fail("Invalid ledger header sequence"); + return fail("invalid ledger header sequence"); if (ledger->info().hash != hash) - return fail("Invalid ledger header hash"); + return fail("invalid ledger header hash"); ledger->stateMap().setLedgerSeq(seq); ledger->txMap().setLedgerSeq(seq); @@ -358,96 +498,133 @@ Shard::validate() const if (!ledger->stateMap().fetchRoot( SHAMapHash {ledger->info().accountHash}, nullptr)) { - return fail("Missing root STATE node"); + return fail("missing root STATE node"); } if (ledger->info().txHash.isNonZero() && !ledger->txMap().fetchRoot( SHAMapHash {ledger->info().txHash}, nullptr)) { - return fail("Missing root TXN node"); + return fail("missing root TXN node"); } if (!valLedger(ledger, next)) return false; + if (writeSQLite) + { + std::lock_guard lock(mutex_); + if (!storeSQLite(ledger, lock)) + return false; + } + hash = ledger->info().parentHash; --seq; next = ledger; } - { - std::lock_guard lock(mutex_); - pCache_->reset(); - nCache_->reset(); - } - JLOG(j_.debug()) << "shard " << index_ << " is valid"; - return true; -} -bool -Shard::setComplete(std::lock_guard const& lock) -{ - // Remove the control file if one exists + /* + TODO MP + SQLite VACUUM blocks all database access while processing. + Depending on the file size, that can take a while. Until we find + a non-blocking way of doing this, we cannot enable vacuum as + it can desync a server. + try { - using namespace boost::filesystem; - if (is_regular_file(control_)) - remove_all(control_); + // VACUUM the SQLite databases + auto const tmpDir {dir_ / "tmp_vacuum"}; + create_directory(tmpDir); + auto vacuum = [&tmpDir](std::unique_ptr& sqliteDB) + { + auto& session {sqliteDB->getSession()}; + session << "PRAGMA synchronous=OFF;"; + session << "PRAGMA journal_mode=OFF;"; + session << "PRAGMA temp_store_directory='" << + tmpDir.string() << "';"; + session << "VACUUM;"; + }; + vacuum(lgrSQLiteDB_); + vacuum(txSQLiteDB_); + remove_all(tmpDir); } catch (std::exception const& e) { - JLOG(j_.error()) << - "shard " << index_ << - " exception " << e.what() << - " in function " << __func__; - return false; + return fail(std::string("exception ") + + e.what() + " in function " + __func__); } + */ + + // Store final key's value, may already be stored + Serializer s; + s.add32(version_); + s.add32(firstSeq_); + s.add32(lastSeq_); + s.add256(lastLedgerHash); + auto nObj {NodeObject::createObject( + hotUNKNOWN, + std::move(s.modData()), + finalKey_)}; + try + { + backend_->store(nObj); - storedSeqs_.clear(); - complete_ = true; + std::lock_guard lock(mutex_); + final_ = true; - setCache(lock); - return initSQLite(lock) && setFileStats(lock); + // Remove the acquire SQLite database if present + if (acquireSQLiteDB_) + acquireSQLiteDB_.reset(); + remove_all(dir_ / AcquireDBName); + + return initSQLite(lock) && setFileStats(lock); + } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } } void -Shard::setCache(std::lock_guard const&) +Shard::setBackendCache(std::lock_guard const&) { - // complete shards use the smallest cache and + // Complete shards use the smallest cache and // fastest expiration to reduce memory consumption. - // The incomplete shard is set according to configuration. + // An incomplete shard is set according to configuration. + + Config const& config {app_.config()}; if (!pCache_) { auto const name {"shard " + std::to_string(index_)}; - auto const sz = app_.config().getValueFor(SizedItem::nodeCacheSize, - complete_ ? boost::optional(0) : boost::none); - auto const age = std::chrono::seconds{ - app_.config().getValueFor(SizedItem::nodeCacheAge, - complete_ ? boost::optional(0) : boost::none)}; + auto const sz {config.getValueFor( + SizedItem::nodeCacheSize, + backendComplete_ ? boost::optional(0) : boost::none)}; + auto const age {std::chrono::seconds{config.getValueFor( + SizedItem::nodeCacheAge, + backendComplete_ ? boost::optional(0) : boost::none)}}; pCache_ = std::make_shared(name, sz, age, stopwatch(), j_); nCache_ = std::make_shared(name, stopwatch(), sz, age); } else { - auto const sz = app_.config().getValueFor( - SizedItem::nodeCacheSize, 0); + auto const sz {config.getValueFor(SizedItem::nodeCacheSize, 0)}; pCache_->setTargetSize(sz); nCache_->setTargetSize(sz); - auto const age = std::chrono::seconds{ - app_.config().getValueFor( - SizedItem::nodeCacheAge, 0)}; + auto const age {std::chrono::seconds{ + config.getValueFor(SizedItem::nodeCacheAge, 0)}}; pCache_->setTargetAge(age); nCache_->setTargetAge(age); } } bool -Shard::initSQLite(std::lock_guard const&) +Shard::initSQLite(std::lock_guard const&) { Config const& config {app_.config()}; DatabaseCon::Setup setup; @@ -457,61 +634,40 @@ Shard::initSQLite(std::lock_guard const&) try { - if (complete_) - { - // Remove WAL files if they exist - using namespace boost::filesystem; - for (auto const& d : directory_iterator(dir_)) - { - if (is_regular_file(d) && - boost::iends_with(extension(d), "-wal")) - { - // Closing the session forces a checkpoint - if (!lgrSQLiteDB_) - { - lgrSQLiteDB_ = std::make_unique ( - setup, - LgrDBName, - LgrDBPragma, - LgrDBInit); - } - lgrSQLiteDB_->getSession().close(); + if (lgrSQLiteDB_) + lgrSQLiteDB_.reset(); - if (!txSQLiteDB_) - { - txSQLiteDB_ = std::make_unique ( - setup, - TxDBName, - TxDBPragma, - TxDBInit); - } - txSQLiteDB_->getSession().close(); - break; - } - } + if (txSQLiteDB_) + txSQLiteDB_.reset(); - lgrSQLiteDB_ = std::make_unique ( + if (backendComplete_) + { + lgrSQLiteDB_ = std::make_unique( setup, LgrDBName, - CompleteShardDBPragma, + CompltDBPragma, LgrDBInit); lgrSQLiteDB_->getSession() << boost::str(boost::format("PRAGMA cache_size=-%d;") % - kilobytes(config.getValueFor(SizedItem::lgrDBCache, boost::none))); + kilobytes(config.getValueFor( + SizedItem::lgrDBCache, + boost::none))); - txSQLiteDB_ = std::make_unique ( + txSQLiteDB_ = std::make_unique( setup, TxDBName, - CompleteShardDBPragma, + CompltDBPragma, TxDBInit); txSQLiteDB_->getSession() << boost::str(boost::format("PRAGMA cache_size=-%d;") % - kilobytes(config.getValueFor(SizedItem::txnDBCache, boost::none))); + kilobytes(config.getValueFor( + SizedItem::txnDBCache, + boost::none))); } else { // The incomplete shard uses a Write Ahead Log for performance - lgrSQLiteDB_ = std::make_unique ( + lgrSQLiteDB_ = std::make_unique( setup, LgrDBName, LgrDBPragma, @@ -521,7 +677,7 @@ Shard::initSQLite(std::lock_guard const&) kilobytes(config.getValueFor(SizedItem::lgrDBCache))); lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs()); - txSQLiteDB_ = std::make_unique ( + txSQLiteDB_ = std::make_unique( setup, TxDBName, TxDBPragma, @@ -534,7 +690,7 @@ Shard::initSQLite(std::lock_guard const&) } catch (std::exception const& e) { - JLOG(j_.error()) << + JLOG(j_.fatal()) << "shard " << index_ << " exception " << e.what() << " in function " << __func__; @@ -544,25 +700,26 @@ Shard::initSQLite(std::lock_guard const&) } bool -Shard::setSQLiteStored( +Shard::storeSQLite( std::shared_ptr const& ledger, - std::lock_guard const&) + std::lock_guard const&) { auto const seq {ledger->info().seq}; - assert(backend_ && !complete_); - assert(!boost::icl::contains(storedSeqs_, seq)); try { + // Update the transactions database { auto& session {txSQLiteDB_->getSession()}; soci::transaction tr(session); session << - "DELETE FROM Transactions WHERE LedgerSeq = :seq;" + "DELETE FROM Transactions " + "WHERE LedgerSeq = :seq;" , soci::use(seq); session << - "DELETE FROM AccountTransactions WHERE LedgerSeq = :seq;" + "DELETE FROM AccountTransactions " + "WHERE LedgerSeq = :seq;" , soci::use(seq); if (ledger->info().txHash.isNonZero()) @@ -585,18 +742,20 @@ Shard::setSQLiteStored( txID, ledger->seq(), *item.second)}; session << - "DELETE FROM AccountTransactions WHERE TransID = :txID;" + "DELETE FROM AccountTransactions " + "WHERE TransID = :txID;" , soci::use(sTxID); auto const& accounts = txMeta->getAffectedAccounts(j_); if (!accounts.empty()) { - auto const s(boost::str(boost::format( + auto const sTxnSeq {std::to_string(txMeta->getIndex())}; + auto const s {boost::str(boost::format( "('%s','%s',%s,%s)") % sTxID % "%s" % sSeq - % std::to_string(txMeta->getIndex()))); + % sTxnSeq)}; std::string sql; sql.reserve((accounts.size() + 1) * 128); sql = "INSERT INTO AccountTransactions " @@ -638,37 +797,81 @@ Shard::setSQLiteStored( tr.commit (); } - auto& session {lgrSQLiteDB_->getSession()}; - soci::transaction tr(session); - - session << - "DELETE FROM Ledgers WHERE LedgerSeq = :seq;" - , soci::use(seq); - session << - "INSERT OR REPLACE INTO Ledgers (" - "LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime," - "PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash," - "TransSetHash)" - "VALUES (" - ":ledgerHash, :ledgerSeq, :prevHash, :totalCoins, :closingTime," - ":prevClosingTime, :closeTimeRes, :closeFlags, :accountSetHash," - ":transSetHash);", - soci::use(to_string(ledger->info().hash)), - soci::use(seq), - soci::use(to_string(ledger->info().parentHash)), - soci::use(to_string(ledger->info().drops)), - soci::use(ledger->info().closeTime.time_since_epoch().count()), - soci::use(ledger->info().parentCloseTime.time_since_epoch().count()), - soci::use(ledger->info().closeTimeResolution.count()), - soci::use(ledger->info().closeFlags), - soci::use(to_string(ledger->info().accountHash)), - soci::use(to_string(ledger->info().txHash)); - - tr.commit(); + auto const sHash {to_string(ledger->info().hash)}; + + // Update the ledger database + { + auto& session {lgrSQLiteDB_->getSession()}; + soci::transaction tr(session); + + auto const sParentHash {to_string(ledger->info().parentHash)}; + auto const sDrops {to_string(ledger->info().drops)}; + auto const sAccountHash {to_string(ledger->info().accountHash)}; + auto const sTxHash {to_string(ledger->info().txHash)}; + + session << + "DELETE FROM Ledgers " + "WHERE LedgerSeq = :seq;" + , soci::use(seq); + session << + "INSERT OR REPLACE INTO Ledgers (" + "LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime," + "PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash," + "TransSetHash)" + "VALUES (" + ":ledgerHash, :ledgerSeq, :prevHash, :totalCoins," + ":closingTime, :prevClosingTime, :closeTimeRes," + ":closeFlags, :accountSetHash, :transSetHash);", + soci::use(sHash), + soci::use(seq), + soci::use(sParentHash), + soci::use(sDrops), + soci::use(ledger->info().closeTime.time_since_epoch().count()), + soci::use( + ledger->info().parentCloseTime.time_since_epoch().count()), + soci::use(ledger->info().closeTimeResolution.count()), + soci::use(ledger->info().closeFlags), + soci::use(sAccountHash), + soci::use(sTxHash); + + tr.commit(); + } + + // Update the acquire database if present + if (acquireSQLiteDB_) + { + auto& session {acquireSQLiteDB_->getSession()}; + soci::blob sociBlob(session); + + if (!storedSeqs_.empty()) + convert(to_string(storedSeqs_), sociBlob); + + if (ledger->info().seq == lastSeq_) + { + // Store shard's last ledger hash + session << + "UPDATE Shard " + "SET LastLedgerHash = :lastLedgerHash," + "StoredLedgerSeqs = :storedLedgerSeqs " + "WHERE ShardIndex = :shardIndex;" + , soci::use(sHash) + , soci::use(sociBlob) + , soci::use(index_); + } + else + { + session << + "UPDATE Shard " + "SET StoredLedgerSeqs = :storedLedgerSeqs " + "WHERE ShardIndex = :shardIndex;" + , soci::use(sociBlob) + , soci::use(index_); + } + } } catch (std::exception const& e) { - JLOG(j_.error()) << + JLOG(j_.fatal()) << "shard " << index_ << " exception " << e.what() << " in function " << __func__; @@ -678,60 +881,41 @@ Shard::setSQLiteStored( } bool -Shard::setFileStats(std::lock_guard const&) +Shard::setFileStats(std::lock_guard const&) { fileSz_ = 0; fdRequired_ = 0; - if (backend_->backed()) + try { - try + using namespace boost::filesystem; + for (auto const& d : directory_iterator(dir_)) { - using namespace boost::filesystem; - for (auto const& d : directory_iterator(dir_)) + if (is_regular_file(d)) { - if (is_regular_file(d)) - { - fileSz_ += file_size(d); - ++fdRequired_; - } + fileSz_ += file_size(d); + ++fdRequired_; } } - catch (std::exception const& e) - { - JLOG(j_.error()) << - "shard " << index_ << - " exception " << e.what() << - " in function " << __func__; - return false; - } } - return true; -} - -bool -Shard::saveControl(std::lock_guard const&) -{ - std::ofstream ofs {control_.string(), std::ios::trunc}; - if (!ofs.is_open()) + catch (std::exception const& e) { JLOG(j_.fatal()) << - "shard " << index_ << " is unable to save control file"; + "shard " << index_ << + " exception " << e.what() << + " in function " << __func__; return false; } - - boost::archive::text_oarchive ar(ofs); - ar & storedSeqs_; return true; } bool Shard::valLedger( std::shared_ptr const& ledger, - std::shared_ptr const& next) const + std::shared_ptr const& next) { auto fail = [j = j_, index = index_, &ledger](std::string const& msg) { - JLOG(j.error()) << + JLOG(j.fatal()) << "shard " << index << ". " << msg << (ledger->info().hash.isZero() ? "" : ". Ledger header hash " + @@ -750,6 +934,8 @@ Shard::valLedger( bool error {false}; auto visit = [this, &error](SHAMapAbstractNode& node) { + if (stop_) + return false; if (!valFetch(node.getNodeHash().as_uint256())) error = true; return !error; @@ -773,6 +959,8 @@ Shard::valLedger( return fail(std::string("exception ") + e.what() + " in function " + __func__); } + if (stop_) + return false; if (error) return fail("Invalid state map"); } @@ -792,37 +980,34 @@ Shard::valLedger( return fail(std::string("exception ") + e.what() + " in function " + __func__); } + if (stop_) + return false; if (error) return fail("Invalid transaction map"); } + return true; }; std::shared_ptr -Shard::valFetch(uint256 const& hash) const +Shard::valFetch(uint256 const& hash) { std::shared_ptr nObj; auto fail = [j = j_, index = index_, &hash, &nObj](std::string const& msg) { - JLOG(j.error()) << + JLOG(j.fatal()) << "shard " << index << ". " << msg << ". Node object hash " << to_string(hash); nObj.reset(); return nObj; }; - Status status; try { - { - std::lock_guard lock(mutex_); - status = backend_->fetch(hash.begin(), &nObj); - } - - switch (status) + switch (backend_->fetch(hash.begin(), &nObj)) { case ok: - break; + return nObj; case notFound: return fail("Missing node object"); case dataCorrupt: @@ -836,7 +1021,6 @@ Shard::valFetch(uint256 const& hash) const return fail(std::string("exception ") + e.what() + " in function " + __func__); } - return nObj; } } // NodeStore diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 687de6c20c8..223d11b3ee6 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -30,29 +30,12 @@ #include #include +#include +#include + namespace ripple { namespace NodeStore { -// Removes a path in its entirety -inline static -bool -removeAll( - boost::filesystem::path const& path, - beast::Journal const& j) -{ - try - { - boost::filesystem::remove_all(path); - } - catch (std::exception const& e) - { - JLOG(j.error()) << - "exception: " << e.what(); - return false; - } - return true; -} - using PCache = TaggedCache; using NCache = KeyCache; class DatabaseShard; @@ -65,7 +48,7 @@ class DatabaseShard; Public functions can be called concurrently from any thread. */ -class Shard +class Shard final { public: Shard( @@ -77,12 +60,12 @@ class Shard bool open(Scheduler& scheduler, nudb::context& ctx); - bool - setStored(std::shared_ptr const& ledger); - boost::optional prepare(); + bool + store(std::shared_ptr const& ledger); + bool contains(std::uint32_t seq) const; @@ -90,16 +73,25 @@ class Shard sweep(); std::uint32_t - index() const - { - return index_; - } + index() const {return index_;} - std::shared_ptr const& + boost::filesystem::path const& + getDir() const {return dir_;} + + std::tuple< + std::shared_ptr, + std::shared_ptr, + std::shared_ptr, + std::shared_ptr> + getBackendAll() const; + + std::shared_ptr getBackend() const; + /** Returns `true` if all ledgers shard have been stored in the backend + */ bool - complete() const; + isBackendComplete() const; std::shared_ptr pCache() const; @@ -107,23 +99,44 @@ class Shard std::shared_ptr nCache() const; - std::uint64_t - fileSize() const; - - std::uint32_t - fdRequired() const; + std::pair + fileInfo() const; std::shared_ptr lastStored() const; + /** Returns `true` if the shard is complete, validated, and immutable. + */ + bool + isFinal() const; + + /** Returns `true` if the shard is older, without final key data + */ + bool + isLegacy() const; + + /** Finalize shard by walking its ledgers and verifying each Merkle tree. + + @param writeSQLite If true, SQLite entries will be rewritten using + verified backend data. + */ bool - validate() const; + finalize(const bool writeSQLite); + + void + stop() {stop_ = true;} private: - static constexpr auto controlFileName = "control.txt"; + // Current shard version + static constexpr std::uint32_t version_ {1}; + + // The finalKey_ is a hard coded value of zero. It is used to store + // finalizing shard data to the backend. The data contains a version, + // last ledger's hash, and the first and last ledger sequences. + static uint256 const finalKey_; Application& app_; - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; // Shard Index std::uint32_t const index_; @@ -135,8 +148,7 @@ class Shard std::uint32_t const lastSeq_; // The maximum number of ledgers this shard can store - // The earliest shard may store less ledgers than - // subsequent shards + // The earliest shard may store less ledgers than subsequent shards std::uint32_t const maxLedgers_; // Database positive cache @@ -148,9 +160,6 @@ class Shard // Path to database files boost::filesystem::path const dir_; - // Path to control file - boost::filesystem::path const control_; - // Storage space utilized by the shard std::uint64_t fileSz_; @@ -160,6 +169,9 @@ class Shard // NuDB key/value store for node objects std::shared_ptr backend_; + // Temporary SQLite database used while acquiring a shard + std::unique_ptr acquireSQLiteDB_; + // Ledger SQLite database used for indexes std::unique_ptr lgrSQLiteDB_; @@ -168,58 +180,56 @@ class Shard beast::Journal const j_; - // True if shard has its entire ledger range stored - bool complete_ {false}; + // True if backend has stored all ledgers pertaining to the shard + bool backendComplete_ {false}; - // Sequences of ledgers stored with an incomplete shard + // Tracks ledger sequences stored in the backend when building a shard RangeSet storedSeqs_; // Used as an optimization for visitDifferences std::shared_ptr lastStored_; - // Marks shard immutable - // Lock over mutex_ required - bool - setComplete(std::lock_guard const& lock); + // Older shard without an acquire database or final key + // Eventually there will be no need for this and should be removed + bool legacy_ {false}; + + // True if the backend has a final key stored + bool final_ {false}; + + // Determines if the shard needs to stop processing for shutdown + std::atomic stop_ {false}; // Set the backend cache // Lock over mutex_ required void - setCache(std::lock_guard const& lock); + setBackendCache(std::lock_guard const& lock); // Open/Create SQLite databases // Lock over mutex_ required bool - initSQLite(std::lock_guard const& lock); + initSQLite(std::lock_guard const& lock); - // Write SQLite entries for a ledger stored in this shard's backend + // Write SQLite entries for this ledger // Lock over mutex_ required bool - setSQLiteStored( + storeSQLite( std::shared_ptr const& ledger, - std::lock_guard const& lock); + std::lock_guard const& lock); // Set storage and file descriptor usage stats // Lock over mutex_ required bool - setFileStats(std::lock_guard const& lock); - - // Save the control file for an incomplete shard - // Lock over mutex_ required - bool - saveControl(std::lock_guard const& lock); + setFileStats(std::lock_guard const& lock); - // Validate this ledger by walking its SHAMaps - // and verifying each merkle tree + // Validate this ledger by walking its SHAMaps and verifying Merkle trees bool valLedger( std::shared_ptr const& ledger, - std::shared_ptr const& next) const; + std::shared_ptr const& next); - // Fetches from the backend and will log - // errors based on status codes + // Fetches from backend and log errors based on status codes std::shared_ptr - valFetch(uint256 const& hash) const; + valFetch(uint256 const& hash); }; } // namespace NodeStore diff --git a/src/ripple/nodestore/impl/TaskQueue.cpp b/src/ripple/nodestore/impl/TaskQueue.cpp new file mode 100644 index 00000000000..8aac7e6faf8 --- /dev/null +++ b/src/ripple/nodestore/impl/TaskQueue.cpp @@ -0,0 +1,58 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2019 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include + +#include + +namespace ripple { +namespace NodeStore { + +TaskQueue::TaskQueue() + : workers_(*this, nullptr, "Shard store taskQueue", 1) +{ +} + +void +TaskQueue::addTask(std::function task) +{ + std::lock_guard lock {mutex_}; + + tasks_.emplace(std::move(task)); + workers_.addTask(); +} + +void +TaskQueue::processTask(int instance) +{ + std::function task; + + { + std::lock_guard lock {mutex_}; + assert(!tasks_.empty()); + + task = std::move(tasks_.front()); + tasks_.pop(); + } + + task(); +} + +} // NodeStore +} // ripple diff --git a/src/ripple/nodestore/impl/TaskQueue.h b/src/ripple/nodestore/impl/TaskQueue.h new file mode 100644 index 00000000000..309a418aa81 --- /dev/null +++ b/src/ripple/nodestore/impl/TaskQueue.h @@ -0,0 +1,55 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2019 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_NODESTORE_TASKQUEUE_H_INCLUDED +#define RIPPLE_NODESTORE_TASKQUEUE_H_INCLUDED + +#include + +#include +#include + +namespace ripple { +namespace NodeStore { + +class TaskQueue : private Workers::Callback +{ +public: + TaskQueue(); + + /** Adds a task to the queue + + @param task std::function with signature void() + */ + void + addTask(std::function task); + +private: + std::mutex mutex_; + Workers workers_; + std::queue> tasks_; + + void + processTask(int instance) override; +}; + +} // NodeStore +} // ripple + +#endif diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 122e3dd197c..ff1cb451c6f 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1259,6 +1259,9 @@ PeerImp::onMessage(std::shared_ptr const& m) // Parse the shard indexes received in the shard info RangeSet shardIndexes; { + if (!from_string(shardIndexes, m->shardindexes())) + return badData("Invalid shard indexes"); + std::uint32_t earliestShard; boost::optional latestShard; { @@ -1279,58 +1282,10 @@ PeerImp::onMessage(std::shared_ptr const& m) } } - auto getIndex = [this, &earliestShard, &latestShard] - (std::string const& s) -> boost::optional + if (boost::icl::first(shardIndexes) < earliestShard || + (latestShard && boost::icl::last(shardIndexes) > latestShard)) { - std::uint32_t shardIndex; - if (!beast::lexicalCastChecked(shardIndex, s)) - { - fee_ = Resource::feeBadData; - return boost::none; - } - if (shardIndex < earliestShard || - (latestShard && shardIndex > latestShard)) - { - fee_ = Resource::feeBadData; - JLOG(p_journal_.error()) << - "Invalid shard index " << shardIndex; - return boost::none; - } - return shardIndex; - }; - - std::vector tokens; - boost::split(tokens, m->shardindexes(), - boost::algorithm::is_any_of(",")); - std::vector indexes; - for (auto const& t : tokens) - { - indexes.clear(); - boost::split(indexes, t, boost::algorithm::is_any_of("-")); - switch (indexes.size()) - { - case 1: - { - auto const first {getIndex(indexes.front())}; - if (!first) - return; - shardIndexes.insert(*first); - break; - } - case 2: - { - auto const first {getIndex(indexes.front())}; - if (!first) - return; - auto const second {getIndex(indexes.back())}; - if (!second) - return; - shardIndexes.insert(range(*first, *second)); - break; - } - default: - return badData("Invalid shard indexes"); - } + return badData("Invalid shard indexes"); } } @@ -1340,7 +1295,7 @@ PeerImp::onMessage(std::shared_ptr const& m) { if (m->endpoint() != "0") { - auto result = + auto result = beast::IP::Endpoint::from_string_checked(m->endpoint()); if (!result) return badData("Invalid incoming endpoint: " + m->endpoint()); diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index e14b15e28de..46608ed4d5a 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -554,7 +554,6 @@ JSS ( url_password ); // in: Subscribe JSS ( url_username ); // in: Subscribe JSS ( urlgravatar ); // JSS ( username ); // in: Subscribe -JSS ( validate ); // in: DownloadShard JSS ( validated ); // out: NetworkOPs, RPCHelpers, AccountTx* // Tx JSS ( validator_list_expires ); // out: NetworkOps, ValidatorList diff --git a/src/ripple/rpc/ShardArchiveHandler.h b/src/ripple/rpc/ShardArchiveHandler.h index ed7bc2a5cea..f1026603a8d 100644 --- a/src/ripple/rpc/ShardArchiveHandler.h +++ b/src/ripple/rpc/ShardArchiveHandler.h @@ -41,8 +41,7 @@ class ShardArchiveHandler ShardArchiveHandler& operator= (ShardArchiveHandler&&) = delete; ShardArchiveHandler& operator= (ShardArchiveHandler const&) = delete; - /** @param validate if shard data should be verified with network. */ - ShardArchiveHandler(Application& app, bool validate); + ShardArchiveHandler(Application& app); ~ShardArchiveHandler(); @@ -80,7 +79,6 @@ class ShardArchiveHandler Application& app_; std::shared_ptr downloader_; boost::filesystem::path const downloadDir_; - bool const validate_; boost::asio::basic_waitable_timer timer_; bool process_; std::map archives_; diff --git a/src/ripple/rpc/handlers/DownloadShard.cpp b/src/ripple/rpc/handlers/DownloadShard.cpp index 6b943f665b5..174867ce1ee 100644 --- a/src/ripple/rpc/handlers/DownloadShard.cpp +++ b/src/ripple/rpc/handlers/DownloadShard.cpp @@ -34,7 +34,6 @@ namespace ripple { /** RPC command that downloads and import shard archives. { shards: [{index: , url: }] - validate: // optional, default is true } example: @@ -124,20 +123,9 @@ doDownloadShard(RPC::JsonContext& context) } } - bool validate {true}; - if (context.params.isMember(jss::validate)) - { - if (!context.params[jss::validate].isBool()) - { - return RPC::expected_field_error( - std::string(jss::validate), "a bool"); - } - validate = context.params[jss::validate].asBool(); - } - // Begin downloading. The handler keeps itself alive while downloading. auto handler { - std::make_shared(context.app, validate)}; + std::make_shared(context.app)}; for (auto& [index, url] : archives) { if (!handler->add(index, std::move(url))) diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index dc20704c0dc..1405b52da1d 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -31,11 +31,10 @@ namespace RPC { using namespace boost::filesystem; using namespace std::chrono_literals; -ShardArchiveHandler::ShardArchiveHandler(Application& app, bool validate) +ShardArchiveHandler::ShardArchiveHandler(Application& app) : app_(app) , downloadDir_(get(app_.config().section( ConfigSection::shardDatabase()), "path", "") + "/download") - , validate_(validate) , timer_(app_.getIOService()) , process_(false) , j_(app.journal("ShardArchiveHandler")) @@ -209,7 +208,7 @@ ShardArchiveHandler::complete(path dstPath) { // If validating and not synced then defer and retry auto const mode {ptr->app_.getOPs().getOperatingMode()}; - if (ptr->validate_ && mode != OperatingMode::FULL) + if (mode != OperatingMode::FULL) { std::lock_guard lock(m_); timer_.expires_from_now(static_cast( @@ -265,7 +264,7 @@ ShardArchiveHandler::process(path const& dstPath) } // Import the shard into the shard store - if (!app_.getShardStore()->importShard(shardIndex, shardDir, validate_)) + if (!app_.getShardStore()->importShard(shardIndex, shardDir)) { JLOG(j_.error()) << "Importing shard " << shardIndex; diff --git a/src/ripple/unity/nodestore.cpp b/src/ripple/unity/nodestore.cpp new file mode 100644 index 00000000000..5ea62a3675f --- /dev/null +++ b/src/ripple/unity/nodestore.cpp @@ -0,0 +1,37 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include diff --git a/src/test/basics/RangeSet_test.cpp b/src/test/basics/RangeSet_test.cpp index 2318398bfd3..078daed9030 100644 --- a/src/test/basics/RangeSet_test.cpp +++ b/src/test/basics/RangeSet_test.cpp @@ -19,8 +19,6 @@ #include #include -#include -#include namespace ripple { @@ -78,39 +76,70 @@ class RangeSet_test : public beast::unit_test::suite } void - testSerialization() + testFromString() { + testcase("fromString"); - auto works = [](RangeSet const & orig) - { - std::stringstream ss; - boost::archive::binary_oarchive oa(ss); - oa << orig; - - boost::archive::binary_iarchive ia(ss); - RangeSet deser; - ia >> deser; - - return orig == deser; - }; - - RangeSet rs; - - BEAST_EXPECT(works(rs)); - - rs.insert(3); - BEAST_EXPECT(works(rs)); - - rs.insert(range(7u, 10u)); - BEAST_EXPECT(works(rs)); + RangeSet set; + BEAST_EXPECT(!from_string(set, "")); + BEAST_EXPECT(boost::icl::length(set) == 0); + + BEAST_EXPECT(!from_string(set, "#")); + BEAST_EXPECT(boost::icl::length(set) == 0); + + BEAST_EXPECT(!from_string(set, ",")); + BEAST_EXPECT(boost::icl::length(set) == 0); + + BEAST_EXPECT(!from_string(set, ",-")); + BEAST_EXPECT(boost::icl::length(set) == 0); + + BEAST_EXPECT(!from_string(set, "1,,2")); + BEAST_EXPECT(boost::icl::length(set) == 0); + + set.clear(); + BEAST_EXPECT(from_string(set, "1")); + BEAST_EXPECT(boost::icl::length(set) == 1); + BEAST_EXPECT(boost::icl::first(set) == 1); + + set.clear(); + BEAST_EXPECT(from_string(set, "1,1")); + BEAST_EXPECT(boost::icl::length(set) == 1); + BEAST_EXPECT(boost::icl::first(set) == 1); + + set.clear(); + BEAST_EXPECT(from_string(set, "1-1")); + BEAST_EXPECT(boost::icl::length(set) == 1); + BEAST_EXPECT(boost::icl::first(set) == 1); + + set.clear(); + BEAST_EXPECT(from_string(set, "1,4-6")); + BEAST_EXPECT(boost::icl::length(set) == 4); + BEAST_EXPECT(boost::icl::first(set) == 1); + BEAST_EXPECT(boost::icl::contains(set, 4)); + BEAST_EXPECT(boost::icl::last(set) == 6); + + set.clear(); + BEAST_EXPECT(from_string(set, "1-2,4-6")); + BEAST_EXPECT(boost::icl::length(set) == 5); + BEAST_EXPECT(boost::icl::first(set) == 1); + BEAST_EXPECT(boost::icl::contains(set, 2)); + BEAST_EXPECT(boost::icl::contains(set, 4)); + BEAST_EXPECT(boost::icl::last(set) == 6); + + set.clear(); + BEAST_EXPECT(from_string(set, "1-2,6")); + BEAST_EXPECT(boost::icl::length(set) == 3); + BEAST_EXPECT(boost::icl::first(set) == 1); + BEAST_EXPECT(boost::icl::contains(set, 2)); + BEAST_EXPECT(boost::icl::last(set) == 6); } void run() override { testPrevMissing(); testToString(); - testSerialization(); + testFromString(); } }; diff --git a/src/test/core/Workers_test.cpp b/src/test/core/Workers_test.cpp index cf3edcc84be..931f6d2206f 100644 --- a/src/test/core/Workers_test.cpp +++ b/src/test/core/Workers_test.cpp @@ -109,7 +109,7 @@ class Workers_test : public beast::unit_test::suite std::unique_ptr perfLog = std::make_unique(); - Workers w(cb, *perfLog, "Test", tc1); + Workers w(cb, perfLog.get(), "Test", tc1); BEAST_EXPECT(w.getNumberOfThreads() == tc1); auto testForThreadCount = [this, &cb, &w] (int const threadCount) diff --git a/src/test/rpc/RPCCall_test.cpp b/src/test/rpc/RPCCall_test.cpp index f2ca3758241..bc45dd5c6c1 100644 --- a/src/test/rpc/RPCCall_test.cpp +++ b/src/test/rpc/RPCCall_test.cpp @@ -2998,10 +2998,9 @@ static RPCCallTestData const rpcCallTestArray [] = })" }, { - "download_shard: novalidate.", __LINE__, + "download_shard:", __LINE__, { "download_shard", - "novalidate", "20", "url_NotValidated", }, @@ -3016,8 +3015,7 @@ static RPCCallTestData const rpcCallTestArray [] = "index" : 20, "url" : "url_NotValidated" } - ], - "validate" : false + ] } ] })" @@ -3064,10 +3062,9 @@ static RPCCallTestData const rpcCallTestArray [] = })" }, { - "download_shard: novalidate many shards.", __LINE__, + "download_shard: many shards.", __LINE__, { "download_shard", - "novalidate", "2000000", "url_NotValidated0", "2000001", @@ -3106,8 +3103,7 @@ static RPCCallTestData const rpcCallTestArray [] = "index" : 2000004, "url" : "url_NotValidated4" } - ], - "validate" : false + ] } ] })" @@ -3160,8 +3156,7 @@ static RPCCallTestData const rpcCallTestArray [] = "index" : 20, "url" : "url_NotValidated" } - ], - "validate" : false + ] } ] })"