Skip to content

Commit

Permalink
Prevent deadlock in storeSQLite
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Dec 2, 2020
1 parent 14984b8 commit 52c7e94
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 85 deletions.
169 changes: 88 additions & 81 deletions src/ripple/nodestore/impl/Shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,33 +474,75 @@ Shard::setLedgerStored(std::shared_ptr<Ledger const> const& ledger)
return false;
}

std::lock_guard lock(mutex_);
if (!acquireInfo_)
{
JLOG(j_.error()) << "shard " << index_
<< " missing acquire SQLite database";
auto const scopedCount{makeBackendCount()};
if (!scopedCount)
return false;
}
if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))

std::lock_guard storedLock(storedMutex_);

{
// Ignore redundant calls
JLOG(j_.debug()) << "shard " << index_ << " ledger sequence "
<< ledgerSeq << " already stored";
return true;
std::lock_guard lock(mutex_);
if (!acquireInfo_)
{
JLOG(j_.error())
<< "shard " << index_ << " missing acquire SQLite database";
return false;
}
if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))
{
// Ignore redundant calls
JLOG(j_.debug()) << "shard " << index_ << " ledger sequence "
<< ledgerSeq << " already stored";
return true;
}
}
// storeSQLite looks at storedSeqs so insert before the call

if (!storeSQLite(ledger))
return false;

std::lock_guard lock(mutex_);

// Update the acquire database
acquireInfo_->storedSeqs.insert(ledgerSeq);

if (!storeSQLite(ledger, lock))
try
{
auto session{acquireInfo_->SQLiteDB->checkoutDb()};
soci::blob sociBlob(*session);
convert(to_string(acquireInfo_->storedSeqs), sociBlob);
if (ledgerSeq == lastSeq_)
{
// Store shard's last ledger hash
auto const sHash{to_string(ledger->info().hash)};
*session << "UPDATE Shard "
"SET LastLedgerHash = :lastLedgerHash,"
"StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sHash), soci::use(sociBlob), soci::use(index_);
}
else
{
*session << "UPDATE Shard "
"SET StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sociBlob), soci::use(index_);
}
}
catch (std::exception const& e)
{
JLOG(j_.fatal()) << "shard " << index_
<< ". Exception caught in function " << __func__
<< ". Error: " << e.what();
acquireInfo_->storedSeqs.erase(ledgerSeq);
return false;
}

if (boost::icl::length(acquireInfo_->storedSeqs) >= maxLedgers_)
state_ = complete;

JLOG(j_.debug()) << "shard " << index_ << " stored ledger sequence "
<< ledgerSeq;

setFileStats(lock);
JLOG(j_.trace()) << "shard " << index_ << " stored ledger sequence "
<< ledgerSeq;
return true;
}

Expand Down Expand Up @@ -649,14 +691,14 @@ Shard::finalize(
if (!acquireInfo_)
return fail("missing acquire SQLite database");

auto& session{acquireInfo_->SQLiteDB->getSession()};
auto session{acquireInfo_->SQLiteDB->checkoutDb()};
boost::optional<std::uint32_t> index;
boost::optional<std::string> sHash;
soci::blob sociBlob(session);
soci::blob sociBlob(*session);
soci::indicator blobPresent;
session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs "
"FROM Shard "
"WHERE ShardIndex = :index;",
*session << "SELECT ShardIndex, LastLedgerHash, StoredLedgerSeqs "
"FROM Shard "
"WHERE ShardIndex = :index;",
soci::into(index), soci::into(sHash),
soci::into(sociBlob, blobPresent), soci::use(index_);

Expand Down Expand Up @@ -751,12 +793,8 @@ Shard::finalize(
if (!verifyLedger(ledger, next))
return fail("failed to validate ledger");

if (writeSQLite)
{
std::lock_guard lock(mutex_);
if (!storeSQLite(ledger, lock))
return fail("failed storing to SQLite databases");
}
if (writeSQLite && !storeSQLite(ledger))
return fail("failed storing to SQLite databases");

hash = ledger->info().parentHash;
next = std::move(ledger);
Expand Down Expand Up @@ -785,12 +823,12 @@ Shard::finalize(
auto vacuum = [&tmpDir](std::unique_ptr<DatabaseCon>& sqliteDB)
{
auto& session {sqliteDB->getSession()};
session << "PRAGMA synchronous=OFF;";
session << "PRAGMA journal_mode=OFF;";
session << "PRAGMA temp_store_directory='" <<
auto session {sqliteDB->checkoutDb()};
*session << "PRAGMA synchronous=OFF;";
*session << "PRAGMA journal_mode=OFF;";
*session << "PRAGMA temp_store_directory='" <<
tmpDir.string() << "';";
session << "VACUUM;";
*session << "VACUUM;";
};
vacuum(lgrSQLiteDB_);
vacuum(txSQLiteDB_);
Expand Down Expand Up @@ -838,7 +876,6 @@ Shard::finalize(
". Error: " + e.what());
}

state_ = final;
return true;
}

Expand Down Expand Up @@ -1061,9 +1098,7 @@ Shard::initSQLite(std::lock_guard<std::mutex> const&)
}

bool
Shard::storeSQLite(
std::shared_ptr<Ledger const> const& ledger,
std::lock_guard<std::mutex> const&)
Shard::storeSQLite(std::shared_ptr<Ledger const> const& ledger)
{
if (stop_)
return false;
Expand All @@ -1074,14 +1109,14 @@ Shard::storeSQLite(
{
// Update the transactions database
{
auto& session{txSQLiteDB_->getSession()};
soci::transaction tr(session);
auto session{txSQLiteDB_->checkoutDb()};
soci::transaction tr(*session);

session << "DELETE FROM Transactions "
"WHERE LedgerSeq = :seq;",
*session << "DELETE FROM Transactions "
"WHERE LedgerSeq = :seq;",
soci::use(ledgerSeq);
session << "DELETE FROM AccountTransactions "
"WHERE LedgerSeq = :seq;",
*session << "DELETE FROM AccountTransactions "
"WHERE LedgerSeq = :seq;",
soci::use(ledgerSeq);

if (ledger->info().txHash.isNonZero())
Expand All @@ -1105,8 +1140,8 @@ Shard::storeSQLite(
auto const txMeta{std::make_shared<TxMeta>(
txID, ledger->seq(), *item.second)};

session << "DELETE FROM AccountTransactions "
"WHERE TransID = :txID;",
*session << "DELETE FROM AccountTransactions "
"WHERE TransID = :txID;",
soci::use(sTxID);

auto const& accounts = txMeta->getAffectedAccounts(j_);
Expand All @@ -1131,7 +1166,7 @@ Shard::storeSQLite(
}),
",");
sql += ';';
session << sql;
*session << sql;

JLOG(j_.trace()) << "shard " << index_
<< " account transaction: " << sql;
Expand All @@ -1145,7 +1180,7 @@ Shard::storeSQLite(

Serializer s;
item.second->add(s);
session
*session
<< (STTx::getMetaSQLInsertReplaceHeader() +
item.first->getMetaSQL(
ledgerSeq, sqlEscape(s.modData())) +
Expand All @@ -1156,22 +1191,21 @@ Shard::storeSQLite(
tr.commit();
}

auto const sHash{to_string(ledger->info().hash)};

// Update the ledger database
{
auto& session{lgrSQLiteDB_->getSession()};
soci::transaction tr(session);

auto const sParentHash{to_string(ledger->info().parentHash)};
auto const sDrops{to_string(ledger->info().drops)};
auto const sAccountHash{to_string(ledger->info().accountHash)};
auto const sTxHash{to_string(ledger->info().txHash)};
auto const sHash{to_string(ledger->info().hash)};

auto session{lgrSQLiteDB_->checkoutDb()};
soci::transaction tr(*session);

session << "DELETE FROM Ledgers "
"WHERE LedgerSeq = :seq;",
*session << "DELETE FROM Ledgers "
"WHERE LedgerSeq = :seq;",
soci::use(ledgerSeq);
session
*session
<< "INSERT OR REPLACE INTO Ledgers ("
"LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime,"
"PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash,"
Expand All @@ -1191,33 +1225,6 @@ Shard::storeSQLite(

tr.commit();
}

// Update the acquire database if present
if (acquireInfo_)
{
auto& session{acquireInfo_->SQLiteDB->getSession()};
soci::blob sociBlob(session);

if (!acquireInfo_->storedSeqs.empty())
convert(to_string(acquireInfo_->storedSeqs), sociBlob);

if (ledger->info().seq == lastSeq_)
{
// Store shard's last ledger hash
session << "UPDATE Shard "
"SET LastLedgerHash = :lastLedgerHash,"
"StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sHash), soci::use(sociBlob), soci::use(index_);
}
else
{
session << "UPDATE Shard "
"SET StoredLedgerSeqs = :storedLedgerSeqs "
"WHERE ShardIndex = :shardIndex;",
soci::use(sociBlob), soci::use(index_);
}
}
}
catch (std::exception const& e)
{
Expand Down
6 changes: 2 additions & 4 deletions src/ripple/nodestore/impl/Shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class Shard final
Application& app_;
beast::Journal const j_;
mutable std::mutex mutex_;
std::mutex storedMutex_;

// Shard Index
std::uint32_t const index_;
Expand Down Expand Up @@ -333,11 +334,8 @@ class Shard final
initSQLite(std::lock_guard<std::mutex> const&);

// Write SQLite entries for this ledger
// Lock over mutex_ required
[[nodiscard]] bool
storeSQLite(
std::shared_ptr<Ledger const> const& ledger,
std::lock_guard<std::mutex> const&);
storeSQLite(std::shared_ptr<Ledger const> const& ledger);

// Set storage and file descriptor usage stats
// Lock over mutex_ required
Expand Down

0 comments on commit 52c7e94

Please sign in to comment.