Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent deadlock when notifying shard of ledger #3683

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ripple/app/main/DBInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ inline constexpr std::array<char const*, 1> AcquireShardDBInit{

////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
// Pragma for Ledger and Transaction databases with final shards
// These override the CommonDBPragma values defined above.
inline constexpr std::array<char const*, 2> CompleteShardDBPragma{
inline constexpr std::array<char const*, 2> FinalShardDBPragma{
{"PRAGMA synchronous=OFF;", "PRAGMA journal_mode=OFF;"}};

////////////////////////////////////////////////////////////////////////////////
Expand Down
208 changes: 107 additions & 101 deletions src/ripple/nodestore/impl/Shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,38 +429,77 @@ Shard::setLedgerStored(std::shared_ptr<Ledger const> const& ledger)
return false;
}

std::lock_guard lock(mutex_);
if (!acquireInfo_)
{
JLOG(j_.error()) << "shard " << index_
<< " missing acquire SQLite database";
auto const scopedCount{makeBackendCount()};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On line 153 of this file (not in this changeset), we check the backendCount_ and exit early if in use. We also lock mutex_ but not storedMutex_ there. The old code would have closed the backend_ but the new code may not.

Who closes backend_ in such a case? Is it DatabaseShardImp::sweep()? If nobody does, an easy fix is to lock the storedMutex there as well. If it's not an issue, the of couse fine as-is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, DatabaseShardImp::sweep() will close it or the shard Dtor. I don't see an issue with tryClose and storedMutex_ as is, but maybe I missed something you saw.

if (!scopedCount)
return false;
}
if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))

// This lock is used as an optimization to prevent unneeded
// calls to storeSQLite before acquireInfo_ is updated
std::lock_guard storedLock(storedMutex_);
miguelportilla marked this conversation as resolved.
Show resolved Hide resolved

{
// Ignore redundant calls
JLOG(j_.debug()) << "shard " << index_ << " ledger sequence "
<< ledgerSeq << " already stored";
return true;
std::lock_guard lock(mutex_);
if (!acquireInfo_)
{
JLOG(j_.error())
<< "shard " << index_ << " missing acquire SQLite database";
return false;
}
if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))
{
// Ignore redundant calls
JLOG(j_.debug()) << "shard " << index_ << " ledger sequence "
<< ledgerSeq << " already stored";
return true;
}
}
// storeSQLite looks at storedSeqs so insert before the call
acquireInfo_->storedSeqs.insert(ledgerSeq);

if (!storeSQLite(ledger, lock))
if (!storeSQLite(ledger))
return false;

if (boost::icl::length(acquireInfo_->storedSeqs) >= maxLedgers_)
{
if (!initSQLite(lock))
return false;
std::lock_guard lock(mutex_);

state_ = complete;
// Update the acquire database
acquireInfo_->storedSeqs.insert(ledgerSeq);

try
{
auto session{acquireInfo_->SQLiteDB->checkoutDb()};
soci::blob sociBlob(*session);
convert(to_string(acquireInfo_->storedSeqs), sociBlob);
if (ledgerSeq == lastSeq_)
{
// Store shard's last ledger hash
auto const sHash{to_string(ledger->info().hash)};
*session << "UPDATE Shard "
"SET LastLedgerHash = :lastLedgerHash,"
"StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sHash), soci::use(sociBlob), soci::use(index_);
}
else
{
*session << "UPDATE Shard "
"SET StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sociBlob), soci::use(index_);
}
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "shard " << index_
<< ". Exception caught in function " << __func__
<< ". Error: " << e.what();
acquireInfo_->storedSeqs.erase(ledgerSeq);
return false;
}

JLOG(j_.debug()) << "shard " << index_ << " stored ledger sequence "
<< ledgerSeq;
if (boost::icl::length(acquireInfo_->storedSeqs) >= maxLedgers_)
state_ = complete;

setFileStats(lock);
JLOG(j_.trace()) << "shard " << index_ << " stored ledger sequence "
<< ledgerSeq;
return true;
}

Expand Down Expand Up @@ -525,16 +564,16 @@ Shard::finalize(
{
uint256 hash{0};
std::uint32_t ledgerSeq{0};
auto fail =
[j = j_, index = index_, &hash, &ledgerSeq](std::string const& msg) {
JLOG(j.fatal())
<< "shard " << index << ". " << msg
<< (hash.isZero() ? "" : ". Ledger hash " + to_string(hash))
<< (ledgerSeq == 0
? ""
: ". Ledger sequence " + std::to_string(ledgerSeq));
return false;
};
auto fail = [&](std::string const& msg) {
JLOG(j_.fatal()) << "shard " << index_ << ". " << msg
<< (hash.isZero() ? ""
: ". Ledger hash " + to_string(hash))
<< (ledgerSeq == 0 ? ""
: ". Ledger sequence " +
std::to_string(ledgerSeq));
state_ = finalizing;
return false;
};

auto const scopedCount{makeBackendCount()};
if (!scopedCount)
Expand Down Expand Up @@ -578,14 +617,14 @@ Shard::finalize(
if (!acquireInfo_)
return fail("missing acquire SQLite database");

auto& session{acquireInfo_->SQLiteDB->getSession()};
auto session{acquireInfo_->SQLiteDB->checkoutDb()};
boost::optional<std::uint32_t> index;
boost::optional<std::string> sHash;
soci::blob sociBlob(session);
soci::blob sociBlob(*session);
soci::indicator blobPresent;
session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs "
"FROM Shard "
"WHERE ShardIndex = :index;",
*session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs "
"FROM Shard "
"WHERE ShardIndex = :index;",
soci::into(index), soci::into(sHash),
soci::into(sociBlob, blobPresent), soci::use(index_);

Expand Down Expand Up @@ -678,12 +717,8 @@ Shard::finalize(
if (!verifyLedger(ledger, next))
return fail("failed to validate ledger");

if (writeSQLite)
{
std::lock_guard lock(mutex_);
if (!storeSQLite(ledger, lock))
return fail("failed storing to SQLite databases");
}
if (writeSQLite && !storeSQLite(ledger))
return fail("failed storing to SQLite databases");

hash = ledger->info().parentHash;
next = std::move(ledger);
Expand All @@ -710,12 +745,12 @@ Shard::finalize(

auto vacuum = [&tmpDir](std::unique_ptr<DatabaseCon>& sqliteDB)
{
auto& session {sqliteDB->getSession()};
session << "PRAGMA synchronous=OFF;";
session << "PRAGMA journal_mode=OFF;";
session << "PRAGMA temp_store_directory='" <<
auto session {sqliteDB->checkoutDb()};
*session << "PRAGMA synchronous=OFF;";
*session << "PRAGMA journal_mode=OFF;";
*session << "PRAGMA temp_store_directory='" <<
tmpDir.string() << "';";
session << "VACUUM;";
*session << "VACUUM;";
};
vacuum(lgrSQLiteDB_);
vacuum(txSQLiteDB_);
Expand Down Expand Up @@ -750,11 +785,13 @@ Shard::finalize(
remove_all(dir_ / AcquireShardDBName);
}

lastAccess_ = std::chrono::steady_clock::now();
state_ = final;

if (!initSQLite(lock))
return fail("failed to initialize SQLite databases");

setFileStats(lock);
lastAccess_ = std::chrono::steady_clock::now();
}
catch (std::exception const& e)
{
Expand All @@ -763,7 +800,6 @@ Shard::finalize(
". Error: " + e.what());
}

state_ = final;
return true;
}

Expand Down Expand Up @@ -927,25 +963,25 @@ Shard::initSQLite(std::lock_guard<std::mutex> const&)
if (txSQLiteDB_)
txSQLiteDB_.reset();

if (state_ != acquire)
if (state_ == final)
{
lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
setup, LgrDBName, CompleteShardDBPragma, LgrDBInit);
setup, LgrDBName, FinalShardDBPragma, LgrDBInit);
lgrSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(
config.getValueFor(SizedItem::lgrDBCache, boost::none)));

txSQLiteDB_ = std::make_unique<DatabaseCon>(
setup, TxDBName, CompleteShardDBPragma, TxDBInit);
setup, TxDBName, FinalShardDBPragma, TxDBInit);
txSQLiteDB_->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(
config.getValueFor(SizedItem::txnDBCache, boost::none)));
}
else
{
// The incomplete shard uses a Write Ahead Log for performance
// Non final shards use a Write Ahead Log for performance
lgrSQLiteDB_ = std::make_unique<DatabaseCon>(
setup,
LgrDBName,
Expand Down Expand Up @@ -981,9 +1017,7 @@ Shard::initSQLite(std::lock_guard<std::mutex> const&)
}

bool
Shard::storeSQLite(
std::shared_ptr<Ledger const> const& ledger,
std::lock_guard<std::mutex> const&)
Shard::storeSQLite(std::shared_ptr<Ledger const> const& ledger)
{
if (stop_)
return false;
Expand All @@ -994,14 +1028,14 @@ Shard::storeSQLite(
{
// Update the transactions database
{
auto& session{txSQLiteDB_->getSession()};
soci::transaction tr(session);
auto session{txSQLiteDB_->checkoutDb()};
soci::transaction tr(*session);

session << "DELETE FROM Transactions "
"WHERE LedgerSeq = :seq;",
*session << "DELETE FROM Transactions "
"WHERE LedgerSeq = :seq;",
soci::use(ledgerSeq);
session << "DELETE FROM AccountTransactions "
"WHERE LedgerSeq = :seq;",
*session << "DELETE FROM AccountTransactions "
"WHERE LedgerSeq = :seq;",
soci::use(ledgerSeq);

if (ledger->info().txHash.isNonZero())
Expand All @@ -1025,8 +1059,8 @@ Shard::storeSQLite(
auto const txMeta{std::make_shared<TxMeta>(
txID, ledger->seq(), *item.second)};

session << "DELETE FROM AccountTransactions "
"WHERE TransID = :txID;",
*session << "DELETE FROM AccountTransactions "
"WHERE TransID = :txID;",
soci::use(sTxID);

auto const& accounts = txMeta->getAffectedAccounts(j_);
Expand All @@ -1051,7 +1085,7 @@ Shard::storeSQLite(
}),
",");
sql += ';';
session << sql;
*session << sql;

JLOG(j_.trace()) << "shard " << index_
<< " account transaction: " << sql;
Expand All @@ -1065,7 +1099,7 @@ Shard::storeSQLite(

Serializer s;
item.second->add(s);
session
*session
<< (STTx::getMetaSQLInsertReplaceHeader() +
item.first->getMetaSQL(
ledgerSeq, sqlBlobLiteral(s.modData())) +
Expand All @@ -1076,22 +1110,21 @@ Shard::storeSQLite(
tr.commit();
}

auto const sHash{to_string(ledger->info().hash)};

// Update the ledger database
{
auto& session{lgrSQLiteDB_->getSession()};
soci::transaction tr(session);

auto const sParentHash{to_string(ledger->info().parentHash)};
auto const sDrops{to_string(ledger->info().drops)};
auto const sAccountHash{to_string(ledger->info().accountHash)};
auto const sTxHash{to_string(ledger->info().txHash)};
auto const sHash{to_string(ledger->info().hash)};

session << "DELETE FROM Ledgers "
"WHERE LedgerSeq = :seq;",
auto session{lgrSQLiteDB_->checkoutDb()};
soci::transaction tr(*session);

*session << "DELETE FROM Ledgers "
"WHERE LedgerSeq = :seq;",
soci::use(ledgerSeq);
session
*session
<< "INSERT OR REPLACE INTO Ledgers ("
"LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime,"
"PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash,"
Expand All @@ -1111,33 +1144,6 @@ Shard::storeSQLite(

tr.commit();
}

// Update the acquire database if present
if (acquireInfo_)
{
auto& session{acquireInfo_->SQLiteDB->getSession()};
soci::blob sociBlob(session);

if (!acquireInfo_->storedSeqs.empty())
convert(to_string(acquireInfo_->storedSeqs), sociBlob);

if (ledger->info().seq == lastSeq_)
{
// Store shard's last ledger hash
session << "UPDATE Shard "
"SET LastLedgerHash = :lastLedgerHash,"
"StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sHash), soci::use(sociBlob), soci::use(index_);
}
else
{
session << "UPDATE Shard "
"SET StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sociBlob), soci::use(index_);
}
}
}
catch (std::exception const& e)
{
Expand Down
7 changes: 2 additions & 5 deletions src/ripple/nodestore/impl/Shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <nudb/nudb.hpp>

#include <atomic>
#include <tuple>

namespace ripple {
namespace NodeStore {
Expand Down Expand Up @@ -253,6 +252,7 @@ class Shard final
Application& app_;
beast::Journal const j_;
mutable std::mutex mutex_;
mutable std::mutex storedMutex_;

// Shard Index
std::uint32_t const index_;
Expand Down Expand Up @@ -316,11 +316,8 @@ class Shard final
initSQLite(std::lock_guard<std::mutex> const&);

// Write SQLite entries for this ledger
// Lock over mutex_ required
[[nodiscard]] bool
storeSQLite(
std::shared_ptr<Ledger const> const& ledger,
std::lock_guard<std::mutex> const&);
storeSQLite(std::shared_ptr<Ledger const> const& ledger);

// Set storage and file descriptor usage stats
// Lock over mutex_ required
Expand Down