diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 54b106b6dbc..5c760f6e1b7 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -282,17 +282,23 @@ Database::copyLedger( batch.reserve(batchWritePreallocationSize); }; bool error = false; - auto f = [&](SHAMapAbstractNode& node) { + auto visit = [&](SHAMapAbstractNode& node) + { if (auto nObj = srcDB.fetch( node.getNodeHash().as_uint256(), srcLedger.info().seq)) { batch.emplace_back(std::move(nObj)); - if (batch.size() >= batchWritePreallocationSize) - storeBatch(); + if (batch.size() < batchWritePreallocationSize) + return true; + + storeBatch(); + + if (!isStopping()) + return true; } - else - error = true; - return !error; + + error = true; + return false; }; // Store ledger header @@ -319,10 +325,10 @@ Database::copyLedger( { auto have = next->stateMap().snapShot(false); srcLedger.stateMap().snapShot( - false)->visitDifferences(&(*have), f); + false)->visitDifferences(&(*have), visit); } else - srcLedger.stateMap().snapShot(false)->visitNodes(f); + srcLedger.stateMap().snapShot(false)->visitNodes(visit); if (error) return false; } @@ -337,7 +343,7 @@ Database::copyLedger( " transaction map invalid"; return false; } - srcLedger.txMap().snapShot(false)->visitNodes(f); + srcLedger.txMap().snapShot(false)->visitNodes(visit); if (error) return false; } diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 38aa231a9db..7a5128e33fd 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -59,8 +59,7 @@ DatabaseShardImp::DatabaseShardImp( DatabaseShardImp::~DatabaseShardImp() { - // Stop threads before data members are destroyed - stopThreads(); + onStop(); // Close backend databases before destroying the context std::lock_guard lock(mutex_); @@ -673,6 +672,17 @@ DatabaseShardImp::validate() app_.shardFamily()->reset(); } +void +DatabaseShardImp::onStop() +{ + Database::onStop(); + + std::lock_guard lock(mutex_); + for (auto const& e : shards_) + if (e.second.shard) + e.second.shard->stop(); +} + void DatabaseShardImp::import(Database& source) { @@ -972,6 +982,9 @@ DatabaseShardImp::asyncFetch( bool DatabaseShardImp::copyLedger(std::shared_ptr const& srcLedger) { + if (isStopping()) + return false; + auto const seq {srcLedger->info().seq}; auto const shardIndex {seqToShardIndex(seq)}; std::shared_ptr shard; @@ -1195,6 +1208,9 @@ DatabaseShardImp::finalizeShard( shardInfo.state = ShardInfo::State::finalize; taskQueue_->addTask([this, shardIndex, writeSQLite]() mutable { + if (isStopping()) + return; + std::shared_ptr shard; { std::lock_guard lock(mutex_); @@ -1210,6 +1226,9 @@ DatabaseShardImp::finalizeShard( if (!shard->finalize(writeSQLite)) { + if (isStopping()) + return; + // Finalize failed, remove shard { std::lock_guard lock(mutex_); @@ -1234,6 +1253,9 @@ DatabaseShardImp::finalizeShard( return; } + if (isStopping()) + return; + { std::lock_guard lock(mutex_); auto const it {shards_.find(shardIndex)}; @@ -1277,7 +1299,8 @@ DatabaseShardImp::setFileStats() return; for (auto const& e : shards_) - wptrShards.push_back(e.second.shard); + if (e.second.shard) + wptrShards.push_back(e.second.shard); } std::uint64_t sumSz {0}; @@ -1318,7 +1341,8 @@ DatabaseShardImp::updateStatus(std::lock_guard&) { RangeSet rs; for (auto const& e : shards_) - rs.insert(e.second.shard->index()); + if (e.second.state == ShardInfo::State::final) + rs.insert(e.second.shard->index()); status_ = to_string(rs); } else @@ -1336,7 +1360,9 @@ DatabaseShardImp::getCache(std::uint32_t seq) if (auto const it {shards_.find(shardIndex)}; it != shards_.end() && it->second.shard) + { shard = it->second.shard; + } else return {}; } diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 8f2e749e0ec..d279d92bf3a 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -127,6 +127,9 @@ class DatabaseShardImp : public DatabaseShard return backendName_; } + void + onStop() override; + /** Import the application local node store @param source The application node store. diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index b64144d3338..20c3b821ef3 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -934,6 +934,8 @@ Shard::valLedger( bool error {false}; auto visit = [this, &error](SHAMapAbstractNode& node) { + if (stop_) + return false; if (!valFetch(node.getNodeHash().as_uint256())) error = true; return !error; @@ -957,6 +959,8 @@ Shard::valLedger( return fail(std::string("exception ") + e.what() + " in function " + __func__); } + if (stop_) + return false; if (error) return fail("Invalid state map"); } @@ -976,6 +980,8 @@ Shard::valLedger( return fail(std::string("exception ") + e.what() + " in function " + __func__); } + if (stop_) + return false; if (error) return fail("Invalid transaction map"); } @@ -1001,7 +1007,7 @@ Shard::valFetch(uint256 const& hash) switch (backend_->fetch(hash.begin(), &nObj)) { case ok: - break; + return nObj; case notFound: return fail("Missing node object"); case dataCorrupt: @@ -1015,7 +1021,6 @@ Shard::valFetch(uint256 const& hash) return fail(std::string("exception ") + e.what() + " in function " + __func__); } - return nObj; } } // NodeStore diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 350f7bd6b4d..223d11b3ee6 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -30,6 +30,7 @@ #include #include +#include #include namespace ripple { @@ -122,6 +123,9 @@ class Shard final bool finalize(const bool writeSQLite); + void + stop() {stop_ = true;} + private: // Current shard version static constexpr std::uint32_t version_ {1}; @@ -147,10 +151,6 @@ class Shard final // The earliest shard may store less ledgers than subsequent shards std::uint32_t const maxLedgers_; - // Older shard without an acquire database or final key - // Eventually this should be removed - bool legacy_ {false}; - // Database positive cache std::shared_ptr pCache_; @@ -183,15 +183,22 @@ class Shard final // True if backend has stored all ledgers pertaining to the shard bool backendComplete_ {false}; - // True if the shard is complete, validated, and immutable - bool final_ {false}; - // Tracks ledger sequences stored in the backend when building a shard RangeSet storedSeqs_; // Used as an optimization for visitDifferences std::shared_ptr lastStored_; + // Older shard without an acquire database or final key + // Eventually there will be no need for this and should be removed + bool legacy_ {false}; + + // True if the backend has a final key stored + bool final_ {false}; + + // Determines if the shard needs to stop processing for shutdown + std::atomic stop_ {false}; + // Set the backend cache // Lock over mutex_ required void