diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 70e0931c374..6721e096337 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -474,33 +474,75 @@ Shard::setLedgerStored(std::shared_ptr const& ledger) return false; } - std::lock_guard lock(mutex_); - if (!acquireInfo_) - { - JLOG(j_.error()) << "shard " << index_ - << " missing acquire SQLite database"; + auto const scopedCount{makeBackendCount()}; + if (!scopedCount) return false; - } - if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq)) + + std::lock_guard storedLock(storedMutex_); + { - // Ignore redundant calls - JLOG(j_.debug()) << "shard " << index_ << " ledger sequence " - << ledgerSeq << " already stored"; - return true; + std::lock_guard lock(mutex_); + if (!acquireInfo_) + { + JLOG(j_.error()) + << "shard " << index_ << " missing acquire SQLite database"; + return false; + } + if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq)) + { + // Ignore redundant calls + JLOG(j_.debug()) << "shard " << index_ << " ledger sequence " + << ledgerSeq << " already stored"; + return true; + } } - // storeSQLite looks at storedSeqs so insert before the call + + if (!storeSQLite(ledger)) + return false; + + std::lock_guard lock(mutex_); + + // Update the acquire database acquireInfo_->storedSeqs.insert(ledgerSeq); - if (!storeSQLite(ledger, lock)) + try + { + auto session{acquireInfo_->SQLiteDB->checkoutDb()}; + soci::blob sociBlob(*session); + convert(to_string(acquireInfo_->storedSeqs), sociBlob); + if (ledgerSeq == lastSeq_) + { + // Store shard's last ledger hash + auto const sHash{to_string(ledger->info().hash)}; + *session << "UPDATE Shard " + "SET LastLedgerHash = :lastLedgerHash," + "StoredLedgerSeqs = :storedLedgerSeqs " + "WHERE ShardIndex = :shardIndex;", + soci::use(sHash), soci::use(sociBlob), soci::use(index_); + } + else + { + *session << "UPDATE Shard " + "SET StoredLedgerSeqs = :storedLedgerSeqs " + "WHERE ShardIndex = :shardIndex;", + soci::use(sociBlob), soci::use(index_); + } + } + catch (std::exception const& e) + { + JLOG(j_.fatal()) << "shard " << index_ + << ". Exception caught in function " << __func__ + << ". Error: " << e.what(); + acquireInfo_->storedSeqs.erase(ledgerSeq); return false; + } if (boost::icl::length(acquireInfo_->storedSeqs) >= maxLedgers_) state_ = complete; - JLOG(j_.debug()) << "shard " << index_ << " stored ledger sequence " - << ledgerSeq; - setFileStats(lock); + JLOG(j_.trace()) << "shard " << index_ << " stored ledger sequence " + << ledgerSeq; return true; } @@ -649,14 +691,14 @@ Shard::finalize( if (!acquireInfo_) return fail("missing acquire SQLite database"); - auto& session{acquireInfo_->SQLiteDB->getSession()}; + auto session{acquireInfo_->SQLiteDB->checkoutDb()}; boost::optional index; boost::optional sHash; - soci::blob sociBlob(session); + soci::blob sociBlob(*session); soci::indicator blobPresent; - session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs " - "FROM Shard " - "WHERE ShardIndex = :index;", + *session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs " + "FROM Shard " + "WHERE ShardIndex = :index;", soci::into(index), soci::into(sHash), soci::into(sociBlob, blobPresent), soci::use(index_); @@ -751,12 +793,8 @@ Shard::finalize( if (!verifyLedger(ledger, next)) return fail("failed to validate ledger"); - if (writeSQLite) - { - std::lock_guard lock(mutex_); - if (!storeSQLite(ledger, lock)) - return fail("failed storing to SQLite databases"); - } + if (writeSQLite && !storeSQLite(ledger)) + return fail("failed storing to SQLite databases"); hash = ledger->info().parentHash; next = std::move(ledger); @@ -785,12 +823,12 @@ Shard::finalize( auto vacuum = [&tmpDir](std::unique_ptr& sqliteDB) { - auto& session {sqliteDB->getSession()}; - session << "PRAGMA synchronous=OFF;"; - session << "PRAGMA journal_mode=OFF;"; - session << "PRAGMA temp_store_directory='" << + auto session {sqliteDB->checkoutDb()}; + *session << "PRAGMA synchronous=OFF;"; + *session << "PRAGMA journal_mode=OFF;"; + *session << "PRAGMA temp_store_directory='" << tmpDir.string() << "';"; - session << "VACUUM;"; + *session << "VACUUM;"; }; vacuum(lgrSQLiteDB_); vacuum(txSQLiteDB_); @@ -838,7 +876,6 @@ Shard::finalize( ". Error: " + e.what()); } - state_ = final; return true; } @@ -1061,9 +1098,7 @@ Shard::initSQLite(std::lock_guard const&) } bool -Shard::storeSQLite( - std::shared_ptr const& ledger, - std::lock_guard const&) +Shard::storeSQLite(std::shared_ptr const& ledger) { if (stop_) return false; @@ -1074,14 +1109,14 @@ Shard::storeSQLite( { // Update the transactions database { - auto& session{txSQLiteDB_->getSession()}; - soci::transaction tr(session); + auto session{txSQLiteDB_->checkoutDb()}; + soci::transaction tr(*session); - session << "DELETE FROM Transactions " - "WHERE LedgerSeq = :seq;", + *session << "DELETE FROM Transactions " + "WHERE LedgerSeq = :seq;", soci::use(ledgerSeq); - session << "DELETE FROM AccountTransactions " - "WHERE LedgerSeq = :seq;", + *session << "DELETE FROM AccountTransactions " + "WHERE LedgerSeq = :seq;", soci::use(ledgerSeq); if (ledger->info().txHash.isNonZero()) @@ -1105,8 +1140,8 @@ Shard::storeSQLite( auto const txMeta{std::make_shared( txID, ledger->seq(), *item.second)}; - session << "DELETE FROM AccountTransactions " - "WHERE TransID = :txID;", + *session << "DELETE FROM AccountTransactions " + "WHERE TransID = :txID;", soci::use(sTxID); auto const& accounts = txMeta->getAffectedAccounts(j_); @@ -1131,7 +1166,7 @@ Shard::storeSQLite( }), ","); sql += ';'; - session << sql; + *session << sql; JLOG(j_.trace()) << "shard " << index_ << " account transaction: " << sql; @@ -1145,7 +1180,7 @@ Shard::storeSQLite( Serializer s; item.second->add(s); - session + *session << (STTx::getMetaSQLInsertReplaceHeader() + item.first->getMetaSQL( ledgerSeq, sqlEscape(s.modData())) + @@ -1156,22 +1191,21 @@ Shard::storeSQLite( tr.commit(); } - auto const sHash{to_string(ledger->info().hash)}; - // Update the ledger database { - auto& session{lgrSQLiteDB_->getSession()}; - soci::transaction tr(session); - auto const sParentHash{to_string(ledger->info().parentHash)}; auto const sDrops{to_string(ledger->info().drops)}; auto const sAccountHash{to_string(ledger->info().accountHash)}; auto const sTxHash{to_string(ledger->info().txHash)}; + auto const sHash{to_string(ledger->info().hash)}; + + auto session{lgrSQLiteDB_->checkoutDb()}; + soci::transaction tr(*session); - session << "DELETE FROM Ledgers " - "WHERE LedgerSeq = :seq;", + *session << "DELETE FROM Ledgers " + "WHERE LedgerSeq = :seq;", soci::use(ledgerSeq); - session + *session << "INSERT OR REPLACE INTO Ledgers (" "LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime," "PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash," @@ -1191,33 +1225,6 @@ Shard::storeSQLite( tr.commit(); } - - // Update the acquire database if present - if (acquireInfo_) - { - auto& session{acquireInfo_->SQLiteDB->getSession()}; - soci::blob sociBlob(session); - - if (!acquireInfo_->storedSeqs.empty()) - convert(to_string(acquireInfo_->storedSeqs), sociBlob); - - if (ledger->info().seq == lastSeq_) - { - // Store shard's last ledger hash - session << "UPDATE Shard " - "SET LastLedgerHash = :lastLedgerHash," - "StoredLedgerSeqs = :storedLedgerSeqs " - "WHERE ShardIndex = :shardIndex;", - soci::use(sHash), soci::use(sociBlob), soci::use(index_); - } - else - { - session << "UPDATE Shard " - "SET StoredLedgerSeqs = :storedLedgerSeqs " - "WHERE ShardIndex = :shardIndex;", - soci::use(sociBlob), soci::use(index_); - } - } } catch (std::exception const& e) { diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index ec01134e07f..aed4b99dd1c 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -264,6 +264,7 @@ class Shard final Application& app_; beast::Journal const j_; mutable std::mutex mutex_; + std::mutex storedMutex_; // Shard Index std::uint32_t const index_; @@ -333,11 +334,8 @@ class Shard final initSQLite(std::lock_guard const&); // Write SQLite entries for this ledger - // Lock over mutex_ required [[nodiscard]] bool - storeSQLite( - std::shared_ptr const& ledger, - std::lock_guard const&); + storeSQLite(std::shared_ptr const& ledger); // Set storage and file descriptor usage stats // Lock over mutex_ required