From 2691234783c5507e1a3c8892d0091ade7dc60ae0 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Fri, 13 Sep 2019 18:44:24 -0400 Subject: [PATCH] Improve shard concurrency * Reduce lock scope on all public functions * Use TaskQueue to process shard finalization in separate thread * Store shard last ledger hash and other info in backend * Use temp SQLite DB versus control file when acquiring * Remove boost serialization from cmake files --- .../nodestore/impl/DatabaseShardImp.cpp | 480 +++++++++--------- src/ripple/nodestore/impl/DatabaseShardImp.h | 7 +- src/ripple/nodestore/impl/Shard.cpp | 45 +- src/ripple/nodestore/impl/Shard.h | 27 +- 4 files changed, 276 insertions(+), 283 deletions(-) diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index a6c4df694e9..57b7224dc15 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -26,7 +26,6 @@ #include #include #include -#include #include #include #include @@ -82,187 +81,177 @@ DatabaseShardImp::init() }; std::lock_guard lock(mutex_); - if (init_) - return fail("already initialized"); + { + std::lock_guard lock(mutex_); - Config const& config {app_.config()}; - Section const& section {config.section(ConfigSection::shardDatabase())}; - if (section.empty()) - return fail("missing configuration"); + if (init_) + return fail("already initialized"); + + Config const& config {app_.config()}; + Section const& section {config.section(ConfigSection::shardDatabase())}; + if (section.empty()) + return fail("missing configuration"); - { - // Node and shard stores must use same earliest ledger sequence - std::uint32_t seq; - if (get_if_exists( - config.section(ConfigSection::nodeDatabase()), - "earliest_seq", - seq)) { - std::uint32_t seq2; - if (get_if_exists(section, "earliest_seq", seq2) && - seq != seq2) + // Node and shard stores must use same earliest ledger sequence + std::uint32_t seq; + if (get_if_exists( + config.section(ConfigSection::nodeDatabase()), + "earliest_seq", + seq)) { - return fail("and [" + ConfigSection::shardDatabase() + - "] both define 'earliest_seq'"); + std::uint32_t seq2; + if (get_if_exists( + section, + "earliest_seq", seq2) && + seq != seq2) + { + return fail("and [" + ConfigSection::shardDatabase() + + "] both define 'earliest_seq'"); + } } } - } - if (!get_if_exists(section, "path", dir_)) - return fail("'path' missing"); + if (!get_if_exists(section, "path", dir_)) + return fail("'path' missing"); - if (exists(dir_)) - { - if (!is_directory(dir_)) - return fail("'path' must be a directory"); - } - else - create_directories(dir_); - - { - std::uint64_t sz; - if (!get_if_exists(section, "max_size_gb", sz)) - return fail("'max_size_gb' missing"); - - if ((sz << 30) < sz) - return fail("'max_size_gb' overflow"); - - // Minimum storage space required (in gigabytes) - if (sz < 10) - return fail("'max_size_gb' must be at least 10"); + if (exists(dir_)) + { + if (!is_directory(dir_)) + return fail("'path' must be a directory"); + } + else + create_directories(dir_); - // Convert to bytes - maxFileSz_ = sz << 30; - } + { + std::uint64_t sz; + if (!get_if_exists(section, "max_size_gb", sz)) + return fail("'max_size_gb' missing"); - if (section.exists("ledgers_per_shard")) - { - // To be set only in standalone for testing - if (!config.standalone()) - return fail("'ledgers_per_shard' only honored in stand alone"); + if ((sz << 30) < sz) + return fail("'max_size_gb' overflow"); - ledgersPerShard_ = get(section, "ledgers_per_shard"); - if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) - return fail("'ledgers_per_shard' must be a multiple of 256"); - } + // Minimum storage space required (in gigabytes) + if (sz < 10) + return fail("'max_size_gb' must be at least 10"); - // NuDB is the default and only supported permanent storage backend - // "Memory" and "none" types are supported for tests - backendName_ = get(section, "type", "nudb"); - if (!boost::iequals(backendName_, "NuDB") && - !boost::iequals(backendName_, "Memory") && - !boost::iequals(backendName_, "none")) - { - return fail("'type' value unsupported"); - } + // Convert to bytes + maxFileSz_ = sz << 30; + } - // Check if backend uses permanent storage - if (auto factory = Manager::instance().find(backendName_)) - { - auto backend {factory->createInstance( - NodeObject::keyBytes, section, scheduler_, j_)}; - backed_ = backend->backed(); - if (!backed_) + if (section.exists("ledgers_per_shard")) { - setFileStats(lock); - init_ = true; - return true; + // To be set only in standalone for testing + if (!config.standalone()) + return fail("'ledgers_per_shard' only honored in stand alone"); + + ledgersPerShard_ = get(section, "ledgers_per_shard"); + if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) + return fail("'ledgers_per_shard' must be a multiple of 256"); } - } - else - return fail(backendName_ + " backend unsupported"); - try - { - ctx_ = std::make_unique(); - ctx_->start(); + // NuDB is the default and only supported permanent storage backend + backendName_ = get(section, "type", "nudb"); + if (!boost::iequals(backendName_, "NuDB")) + return fail("'type' value unsupported"); - // Find shards - for (auto const& d : directory_iterator(dir_)) + try { - if (!is_directory(d)) - continue; - - // Check shard directory name is numeric - auto dirName = d.path().stem().string(); - if (!std::all_of( - dirName.begin(), - dirName.end(), - [](auto c) { - return ::isdigit(static_cast(c)); - })) - { - continue; - } + ctx_ = std::make_unique(); + ctx_->start(); - auto const shardIndex {std::stoul(dirName)}; - if (shardIndex < earliestShardIndex()) + // Find shards + for (auto const& d : directory_iterator(dir_)) { - return fail("shard " + std::to_string(shardIndex) + - " comes before earliest shard index " + - std::to_string(earliestShardIndex())); - } + if (!is_directory(d)) + continue; - auto const shardDir {dir_ / std::to_string(shardIndex)}; + // Check shard directory name is numeric + auto dirName = d.path().stem().string(); + if (!std::all_of( + dirName.begin(), + dirName.end(), + [](auto c) { + return ::isdigit(static_cast(c)); + })) + { + continue; + } - // Check if a previous import failed - if (is_regular_file(shardDir / importMarker_)) - { - JLOG(j_.warn()) << - "shard " << shardIndex << - " previously failed import, removing"; - remove_all(shardDir); - continue; - } + auto const shardIndex {std::stoul(dirName)}; + if (shardIndex < earliestShardIndex()) + { + return fail("shard " + std::to_string(shardIndex) + + " comes before earliest shard index " + + std::to_string(earliestShardIndex())); + } - auto shard {std::make_unique(app_, *this, shardIndex, j_)}; - if (!shard->open(scheduler_, *ctx_)) - { - if (!shard->isLegacy()) - return false; + auto const shardDir {dir_ / std::to_string(shardIndex)}; - // Remove legacy shard - JLOG(j_.warn()) << - "shard " << shardIndex << - " incompatible legacy shard, removing"; - remove_all(shardDir); - continue; - } + // Check if a previous import failed + if (is_regular_file(shardDir / importMarker_)) + { + JLOG(j_.warn()) << + "shard " << shardIndex << + " previously failed import, removing"; + remove_all(shardDir); + continue; + } - if (shard->isFinal()) - { - shards_.emplace( + auto shard {std::make_unique( + app_, + *this, shardIndex, - ShardInfo(std::move(shard), ShardInfo::State::final)); - } - else if (shard->isBackendComplete()) - { - auto const result {shards_.emplace( - shardIndex, - ShardInfo(std::move(shard), ShardInfo::State::none))}; - finalizeShard(result.first->second, true, lock); - } - else - { - if (acquireIndex_ != 0) - return fail("more than one shard being acquired"); - acquireIndex_ = shardIndex; - shards_.emplace( - shardIndex, - ShardInfo(std::move(shard), ShardInfo::State::acquire)); + j_)}; + if (!shard->open(scheduler_, *ctx_)) + { + if (!shard->isLegacy()) + return false; + + // Remove legacy shard + JLOG(j_.warn()) << + "shard " << shardIndex << + " incompatible legacy shard, removing"; + remove_all(shardDir); + continue; + } + + if (shard->isFinal()) + { + shards_.emplace( + shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::final)); + } + else if (shard->isBackendComplete()) + { + auto const result {shards_.emplace( + shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::none))}; + finalizeShard(result.first->second, true, lock); + } + else + { + if (acquireIndex_ != 0) + return fail("more than one shard being acquired"); + acquireIndex_ = shardIndex; + shards_.emplace( + shardIndex, + ShardInfo(std::move(shard), ShardInfo::State::acquire)); + } } } - } - catch (std::exception const& e) - { - return fail(std::string("exception ") + - e.what() + " in function " + __func__); + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + + updateStatus(lock); + setParent(parent_); + init_ = true; } - setFileStats(lock); - updateStatus(lock); - setParent(parent_); - init_ = true; + setFileStats(); return true; } @@ -283,21 +272,18 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) if (!canAdd_) return boost::none; - if (backed_) + // Check available storage space + if (fileSz_ + avgShardFileSz_ > maxFileSz_) { - // Check available storage space - if (fileSz_ + avgShardFileSz_ > maxFileSz_) - { - JLOG(j_.debug()) << "maximum storage size reached"; - canAdd_ = false; - return boost::none; - } - if (avgShardFileSz_ > available()) - { - JLOG(j_.error()) << "insufficient storage space available"; - canAdd_ = false; - return boost::none; - } + JLOG(j_.debug()) << "maximum storage size reached"; + canAdd_ = false; + return boost::none; + } + if (avgShardFileSz_ > available()) + { + JLOG(j_.error()) << "insufficient storage space available"; + canAdd_ = false; + return boost::none; } auto const shardIndex {findAcquireIndex(validLedgerSeq, lock)}; @@ -364,13 +350,10 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) } // Check available storage space - if (backed_) - { - if (fileSz_ + avgShardFileSz_ > maxFileSz_) - return fail("maximum storage size reached"); - if (avgShardFileSz_ > available()) - return fail("insufficient storage space available"); - } + if (fileSz_ + avgShardFileSz_ > maxFileSz_) + return fail("maximum storage size reached"); + if (avgShardFileSz_ > available()) + return fail("insufficient storage space available"); shards_.emplace(shardIndex, ShardInfo(nullptr, ShardInfo::State::import)); return true; @@ -611,22 +594,24 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) return; auto const complete {shard->isBackendComplete()}; - std::lock_guard lock(mutex_); - if (complete) { - if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + std::lock_guard lock(mutex_); + if (complete) { - if (shardIndex == acquireIndex_) - acquireIndex_ = 0; + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + { + if (shardIndex == acquireIndex_) + acquireIndex_ = 0; - if (it->second.state != ShardInfo::State::finalize) - finalizeShard(it->second, false, lock); + if (it->second.state != ShardInfo::State::finalize) + finalizeShard(it->second, false, lock); + } + else + return fail("is not being acquired"); } - else - return fail("is not being acquired"); } - setFileStats(lock); + setFileStats(); } std::string @@ -641,7 +626,7 @@ DatabaseShardImp::getCompleteShards() void DatabaseShardImp::validate() { - std::vector> shardsToValidate; + std::vector> wptrShards; { std::lock_guard lock(mutex_); assert(init_); @@ -649,15 +634,15 @@ DatabaseShardImp::validate() // Only shards with a state of final should be validated for (auto& e : shards_) if (e.second.state == ShardInfo::State::final) - shardsToValidate.push_back(e.second.shard); + wptrShards.push_back(e.second.shard); - if (shardsToValidate.empty()) + if (wptrShards.empty()) return; JLOG(j_.debug()) << "Validating shards " << status_; } - for (auto const& wptr : shardsToValidate) + for (auto const& wptr : wptrShards) { if (auto shard {wptr.lock()}; shard) shard->finalize(true); @@ -814,7 +799,7 @@ DatabaseShardImp::import(Database& source) if (!ledger || ledger->info().seq != seq) break; - auto[backend, pCache, nCache, lastStored] = + [[maybe_unused]] auto [backend, pCache, nCache, lastStored] = shard->getBackendAll(); if (!Database::copyLedger( *ledger, @@ -854,8 +839,6 @@ DatabaseShardImp::import(Database& source) "shard " << shardIndex << " failed to import"; remove_all(shardDir); } - else - setFileStats(lock); } // Re initialize the shard store @@ -916,7 +899,8 @@ DatabaseShardImp::store( } auto nObj {NodeObject::createObject(type, std::move(data), hash)}; - auto[backend, pCache, nCache, lastStored] = shard->getBackendAll(); + [[maybe_unused]] auto [backend, pCache, nCache, lastStored] = + shard->getBackendAll(); pCache->canonicalize(hash, nObj, true); backend->store(nObj); nCache->erase(hash); @@ -975,12 +959,12 @@ DatabaseShardImp::copyLedger(std::shared_ptr const& srcLedger) shard = it->second.shard; else return fail("is not being acquired"); - - if (shard->contains(seq)) - return fail("ledger already stored"); } - auto[backend, pCache, nCache, lastStored] = shard->getBackendAll(); + if (shard->contains(seq)) + return fail("ledger already stored"); + + auto [backend, pCache, nCache, lastStored] = shard->getBackendAll(); if (!Database::copyLedger(*srcLedger, backend, pCache, nCache, lastStored)) return false; @@ -988,22 +972,24 @@ DatabaseShardImp::copyLedger(std::shared_ptr const& srcLedger) return false; auto const complete {shard->isBackendComplete()}; - std::lock_guard lock(mutex_); - if (complete) { - if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + std::lock_guard lock(mutex_); + if (complete) { - if (shardIndex == acquireIndex_) - acquireIndex_ = 0; + if (auto const it {shards_.find(shardIndex)}; it != shards_.end()) + { + if (shardIndex == acquireIndex_) + acquireIndex_ = 0; - if (it->second.state != ShardInfo::State::finalize) - finalizeShard(it->second, false, lock); + if (it->second.state != ShardInfo::State::finalize) + finalizeShard(it->second, false, lock); + } + else + return fail("is not being acquired"); } - else - return fail("is not being acquired"); } - setFileStats(lock); + setFileStats(); return true; } @@ -1050,7 +1036,7 @@ DatabaseShardImp::getCacheHitRate() void DatabaseShardImp::sweep() { - std::vector> shardsToSweep; + std::vector> wptrShards; { std::lock_guard lock(mutex_); assert(init_); @@ -1059,11 +1045,11 @@ DatabaseShardImp::sweep() if (e.second.state == ShardInfo::State::final || e.second.state == ShardInfo::State::acquire) { - shardsToSweep.push_back(e.second.shard); + wptrShards.push_back(e.second.shard); } } - for (auto const& wptr : shardsToSweep) + for (auto const& wptr : wptrShards) { if (auto shard {wptr.lock()}; shard) shard->sweep(); @@ -1181,23 +1167,26 @@ DatabaseShardImp::finalizeShard( if (!shard->finalize(writeSQLite)) { // Finalize failed, remove shard - std::lock_guard lock(mutex_); - shards_.erase(shardIndex); - - using namespace boost::filesystem; - path const dir {shard->getDir()}; - shard.reset(); - try { - remove_all(dir); - } - catch (std::exception const& e) - { - JLOG(j_.error()) << "exception " << e.what(); + std::lock_guard lock(mutex_); + shards_.erase(shardIndex); + + using namespace boost::filesystem; + path const dir {shard->getDir()}; + shard.reset(); + try + { + remove_all(dir); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << "exception " << e.what(); + } + + updateStatus(lock); } - setFileStats(lock); - updateStatus(lock); + setFileStats(); return; } @@ -1207,10 +1196,11 @@ DatabaseShardImp::finalizeShard( if (it == shards_.end()) return; it->second.state = ShardInfo::State::final; - setFileStats(lock); updateStatus(lock); } + setFileStats(); + // Update peers with new shard index if (!app_.config().standalone() && app_.getOPs().getOperatingMode() != OperatingMode::DISCONNECTED) @@ -1228,27 +1218,40 @@ DatabaseShardImp::finalizeShard( } void -DatabaseShardImp::setFileStats(std::lock_guard&) +DatabaseShardImp::setFileStats() { - fileSz_ = 0; - fdRequired_ = 0; - if (!shards_.empty()) + std::vector> wptrShards; { + std::lock_guard lock(mutex_); + assert(init_); + + fileSz_ = 0; + fdRequired_ = 0; + avgShardFileSz_ = 0; + + if (shards_.empty()) + return; + for (auto const& e : shards_) + wptrShards.push_back(e.second.shard); + } + + std::uint64_t sumSz {0}; + std::uint32_t sumFd {0}; + for (auto const& wptr : wptrShards) + { + if (auto shard {wptr.lock()}; shard) { - fileSz_ += e.second.shard->fileSize(); - fdRequired_ += e.second.shard->fdRequired(); + auto[sz, fd] = shard->fileInfo(); + sumSz += sz; + sumFd += fd; } - avgShardFileSz_ = fileSz_ / shards_.size(); } - else - avgShardFileSz_ = 0; - - if (!backed_) - return; - // Require at least 15 file descriptors - fdRequired_ = std::max(fdRequired_, 15); + std::lock_guard lock(mutex_); + fileSz_ = sumSz; + fdRequired_ = sumFd; + avgShardFileSz_ = fileSz_ / wptrShards.size(); if (fileSz_ >= maxFileSz_) { @@ -1292,7 +1295,8 @@ DatabaseShardImp::getCache(std::uint32_t seq) return {}; } - auto[backend, pCache, nCache, lastStored] = shard->getBackendAll(); + [[maybe_unused]] auto [backend, pCache, nCache, lastStored] = + shard->getBackendAll(); return std::make_pair(pCache, nCache); } diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 39ac5175ea4..8f2e749e0ec 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -216,9 +216,6 @@ class DatabaseShardImp : public DatabaseShard // Complete shard indexes std::string status_; - // If backend type uses permanent storage - bool backed_; - // The name associated with the backend used with the shard store std::string backendName_; @@ -267,9 +264,9 @@ class DatabaseShardImp : public DatabaseShard std::lock_guard&); // Set storage and file descriptor usage stats - // Lock must be held + // Lock must NOT be held void - setFileStats(std::lock_guard&); + setFileStats(); // Update status string // Lock must be held diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 414a709c0bb..b64144d3338 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -314,24 +314,6 @@ Shard::isBackendComplete() const return backendComplete_; } -bool -Shard::isFinal() const -{ - std::lock_guard lock(mutex_); - assert(backend_); - - return final_; -} - -bool -Shard::isLegacy() const -{ - std::lock_guard lock(mutex_); - assert(backend_); - - return legacy_; -} - std::shared_ptr Shard::pCache() const { @@ -350,31 +332,40 @@ Shard::nCache() const return nCache_; } -std::uint64_t -Shard::fileSize() const +std::pair +Shard::fileInfo() const { std::lock_guard lock(mutex_); assert(backend_); - return fileSz_; + return {fileSz_, fdRequired_}; } -std::uint32_t -Shard::fdRequired() const +std::shared_ptr +Shard::lastStored() const { std::lock_guard lock(mutex_); assert(backend_); - return fdRequired_; + return lastStored_; } -std::shared_ptr -Shard::lastStored() const +bool +Shard::isFinal() const { std::lock_guard lock(mutex_); assert(backend_); - return lastStored_; + return final_; +} + +bool +Shard::isLegacy() const +{ + std::lock_guard lock(mutex_); + assert(backend_); + + return legacy_; } bool diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index c4d7c9ee9fe..350f7bd6b4d 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -30,6 +30,8 @@ #include #include +#include + namespace ripple { namespace NodeStore { @@ -90,29 +92,28 @@ class Shard final bool isBackendComplete() const; - /** Returns `true` if the shard is complete, validated, and immutable. - */ - bool - isFinal() const; - - bool - isLegacy() const; - std::shared_ptr pCache() const; std::shared_ptr nCache() const; - std::uint64_t - fileSize() const; - - std::uint32_t - fdRequired() const; + std::pair + fileInfo() const; std::shared_ptr lastStored() const; + /** Returns `true` if the shard is complete, validated, and immutable. + */ + bool + isFinal() const; + + /** Returns `true` if the shard is older, without final key data + */ + bool + isLegacy() const; + /** Finalize shard by walking its ledgers and verifying each Merkle tree. @param writeSQLite If true, SQLite entries will be rewritten using