Skip to content

Commit

Permalink
Update TxQ lock safety
Browse files Browse the repository at this point in the history
  • Loading branch information
ximinez committed Aug 9, 2024
1 parent c19a88f commit 807f30b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
20 changes: 13 additions & 7 deletions src/xrpld/app/misc/TxQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,8 @@ class TxQ
std::optional<TxQAccount::TxMap::iterator>
removeFromByFee(
std::optional<TxQAccount::TxMap::iterator> const& replacedTxIter,
std::shared_ptr<STTx const> const& tx);
std::shared_ptr<STTx const> const& tx,
std::lock_guard<std::mutex> const&);

using FeeHook = boost::intrusive::member_hook<
MaybeTx,
Expand Down Expand Up @@ -802,7 +803,7 @@ class TxQ
/// Is the queue at least `fillPercentage` full?
template <size_t fillPercentage = 100>
bool
isFull() const;
isFull(std::lock_guard<std::mutex> const&) const;

/** Checks if the indicated transaction fits the conditions
for being stored in the queue.
Expand All @@ -815,22 +816,26 @@ class TxQ
std::shared_ptr<SLE const> const& sleAccount,
AccountMap::iterator const&,
std::optional<TxQAccount::TxMap::iterator> const&,
std::lock_guard<std::mutex> const& lock);
std::lock_guard<std::mutex> const&);

/// Erase and return the next entry in byFee_ (lower fee level)
FeeMultiSet::iterator_type erase(FeeMultiSet::const_iterator_type);
FeeMultiSet::iterator_type
erase(FeeMultiSet::const_iterator_type, std::lock_guard<std::mutex> const&);
/** Erase and return the next entry for the account (if fee level
is higher), or next entry in byFee_ (lower fee level).
Used to get the next "applyable" MaybeTx for accept().
*/
FeeMultiSet::iterator_type eraseAndAdvance(
FeeMultiSet::const_iterator_type);
FeeMultiSet::iterator_type
eraseAndAdvance(
FeeMultiSet::const_iterator_type,
std::lock_guard<std::mutex> const&);
/// Erase a range of items, based on TxQAccount::TxMap iterators
TxQAccount::TxMap::iterator
erase(
TxQAccount& txQAccount,
TxQAccount::TxMap::const_iterator begin,
TxQAccount::TxMap::const_iterator end);
TxQAccount::TxMap::const_iterator end,
std::lock_guard<std::mutex> const&);

/**
All-or-nothing attempt to try to apply the queued txs for
Expand All @@ -849,6 +854,7 @@ class TxQ
std::size_t const txExtraCount,
ApplyFlags flags,
FeeMetrics::Snapshot const& metricsSnapshot,
std::lock_guard<std::mutex> const&,
beast::Journal j);
};

Expand Down
53 changes: 29 additions & 24 deletions src/xrpld/app/misc/detail/TxQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ TxQ::~TxQ()

template <size_t fillPercentage>
bool
TxQ::isFull() const
TxQ::isFull(std::lock_guard<std::mutex> const&) const
{
static_assert(
fillPercentage > 0 && fillPercentage <= 100, "Invalid fill percentage");
Expand Down Expand Up @@ -437,8 +437,9 @@ TxQ::canBeHeld(
}

auto
TxQ::erase(TxQ::FeeMultiSet::const_iterator_type candidateIter)
-> FeeMultiSet::iterator_type
TxQ::erase(
TxQ::FeeMultiSet::const_iterator_type candidateIter,
std::lock_guard<std::mutex> const&) -> FeeMultiSet::iterator_type
{
auto& txQAccount = byAccount_.at(candidateIter->account);
auto const seqProx = candidateIter->seqProxy;
Expand All @@ -454,8 +455,9 @@ TxQ::erase(TxQ::FeeMultiSet::const_iterator_type candidateIter)
}

auto
TxQ::eraseAndAdvance(TxQ::FeeMultiSet::const_iterator_type candidateIter)
-> FeeMultiSet::iterator_type
TxQ::eraseAndAdvance(
TxQ::FeeMultiSet::const_iterator_type candidateIter,
std::lock_guard<std::mutex> const&) -> FeeMultiSet::iterator_type
{
auto& txQAccount = byAccount_.at(candidateIter->account);
auto const accountIter =
Expand Down Expand Up @@ -491,7 +493,8 @@ auto
TxQ::erase(
TxQ::TxQAccount& txQAccount,
TxQ::TxQAccount::TxMap::const_iterator begin,
TxQ::TxQAccount::TxMap::const_iterator end) -> TxQAccount::TxMap::iterator
TxQ::TxQAccount::TxMap::const_iterator end,
std::lock_guard<std::mutex> const&) -> TxQAccount::TxMap::iterator
{
for (auto it = begin; it != end; ++it)
{
Expand All @@ -512,6 +515,7 @@ TxQ::tryClearAccountQueueUpThruTx(
std::size_t const txExtraCount,
ApplyFlags flags,
FeeMetrics::Snapshot const& metricsSnapshot,
std::lock_guard<std::mutex> const& lock,
beast::Journal j)
{
SeqProxy const tSeqProx{tx.getSeqProxy()};
Expand Down Expand Up @@ -587,11 +591,11 @@ TxQ::tryClearAccountQueueUpThruTx(
{
// All of the queued transactions applied, so remove them from the
// queue.
endTxIter = erase(accountIter->second, beginTxIter, endTxIter);
endTxIter = erase(accountIter->second, beginTxIter, endTxIter, lock);
// If `tx` is replacing a queued tx, delete that one, too.
if (endTxIter != accountIter->second.transactions.end() &&
endTxIter->first == tSeqProx)
erase(accountIter->second, endTxIter, std::next(endTxIter));
erase(accountIter->second, endTxIter, std::next(endTxIter), lock);
}

return txResult;
Expand Down Expand Up @@ -1194,6 +1198,7 @@ TxQ::apply(
view.txCount(),
flags,
metricsSnapshot,
lock,
j);
if (result.second)
{
Expand Down Expand Up @@ -1222,7 +1227,7 @@ TxQ::apply(
// If the queue is full, decide whether to drop the current
// transaction or the last transaction for the account with
// the lowest fee.
if (!replacedTxIter && isFull())
if (!replacedTxIter && isFull(lock))
{
auto lastRIter = byFee_.rbegin();
while (lastRIter != byFee_.rend() && lastRIter->account == account)
Expand Down Expand Up @@ -1283,7 +1288,7 @@ TxQ::apply(
<< " from queue with average fee of " << endEffectiveFeeLevel
<< " in favor of " << transactionID << " with fee of "
<< feeLevelPaid;
erase(byFee_.iterator_to(dropRIter->second));
erase(byFee_.iterator_to(dropRIter->second), lock);
}
else
{
Expand All @@ -1297,7 +1302,7 @@ TxQ::apply(
// Hold the transaction in the queue.
if (replacedTxIter)
{
replacedTxIter = removeFromByFee(replacedTxIter, tx);
replacedTxIter = removeFromByFee(replacedTxIter, tx, lock);
}

if (!accountIsInQueue)
Expand Down Expand Up @@ -1363,7 +1368,7 @@ TxQ::processClosedLedger(Application& app, ReadView const& view, bool timeLeap)
if (candidateIter->lastValid && *candidateIter->lastValid <= ledgerSeq)
{
byAccount_.at(candidateIter->account).dropPenalty = true;
candidateIter = erase(candidateIter);
candidateIter = erase(candidateIter, lock);
}
else
{
Expand Down Expand Up @@ -1467,7 +1472,7 @@ TxQ::accept(Application& app, OpenView& view)
<< " applied successfully with " << transToken(txnResult)
<< ". Remove from queue.";

candidateIter = eraseAndAdvance(candidateIter);
candidateIter = eraseAndAdvance(candidateIter, lock);
ledgerChanged = true;
}
else if (
Expand All @@ -1481,7 +1486,7 @@ TxQ::accept(Application& app, OpenView& view)
JLOG(j_.debug()) << "Queued transaction " << candidateIter->txID
<< " failed with " << transToken(txnResult)
<< ". Remove from queue.";
candidateIter = eraseAndAdvance(candidateIter);
candidateIter = eraseAndAdvance(candidateIter, lock);
}
else
{
Expand All @@ -1496,7 +1501,7 @@ TxQ::accept(Application& app, OpenView& view)
--candidateIter->retriesRemaining;
candidateIter->lastResult = txnResult;
if (account.dropPenalty && account.transactions.size() > 1 &&
isFull<95>())
isFull<95>(lock))
{
// The queue is close to full, this account has multiple
// txs queued, and this account has had a transaction
Expand All @@ -1511,7 +1516,7 @@ TxQ::accept(Application& app, OpenView& view)
<< transToken(txnResult)
<< ". Removing ticketed tx from account "
<< account.account;
candidateIter = eraseAndAdvance(candidateIter);
candidateIter = eraseAndAdvance(candidateIter, lock);
}
else
{
Expand All @@ -1532,7 +1537,7 @@ TxQ::accept(Application& app, OpenView& view)
<< account.account;
auto endIter = byFee_.iterator_to(dropRIter->second);
if (endIter != candidateIter)
erase(endIter);
erase(endIter, lock);
++candidateIter;
}
}
Expand Down Expand Up @@ -1676,8 +1681,8 @@ TxQ::tryDirectApply(
if (txSeqProx.isSeq() && txSeqProx != acctSeqProx)
return {};

FeeLevel64 const requiredFeeLevel = [this, &view, flags]() {
std::lock_guard lock(mutex_);
std::lock_guard lock(mutex_);
FeeLevel64 const requiredFeeLevel = [this, &view, flags, &lock]() {
return getRequiredFeeLevel(
view, flags, feeMetrics_.getSnapshot(), lock);
}();
Expand Down Expand Up @@ -1705,7 +1710,6 @@ TxQ::tryDirectApply(
{
// If the applied transaction replaced a transaction in the
// queue then remove the replaced transaction.
std::lock_guard lock(mutex_);

AccountMap::iterator accountIter = byAccount_.find(account);
if (accountIter != byAccount_.end())
Expand All @@ -1715,7 +1719,7 @@ TxQ::tryDirectApply(
txQAcct.transactions.find(txSeqProx);
existingIter != txQAcct.transactions.end())
{
removeFromByFee(existingIter, tx);
removeFromByFee(existingIter, tx, lock);
}
}
}
Expand All @@ -1727,7 +1731,8 @@ TxQ::tryDirectApply(
std::optional<TxQ::TxQAccount::TxMap::iterator>
TxQ::removeFromByFee(
std::optional<TxQAccount::TxMap::iterator> const& replacedTxIter,
std::shared_ptr<STTx const> const& tx)
std::shared_ptr<STTx const> const& tx,
std::lock_guard<std::mutex> const& lock)
{
if (replacedTxIter && tx)
{
Expand All @@ -1739,7 +1744,7 @@ TxQ::removeFromByFee(
assert(deleteIter->seqProxy == tx->getSeqProxy());
assert(deleteIter->account == (*tx)[sfAccount]);

erase(deleteIter);
erase(deleteIter, lock);
}
return std::nullopt;
}
Expand All @@ -1759,7 +1764,7 @@ TxQ::getMetrics(OpenView const& view) const
result.txPerLedger = snapshot.txnsExpected;
result.referenceFeeLevel = baseLevel;
result.minProcessingFeeLevel =
isFull() ? byFee_.rbegin()->feeLevel + FeeLevel64{1} : baseLevel;
isFull(lock) ? byFee_.rbegin()->feeLevel + FeeLevel64{1} : baseLevel;
result.medFeeLevel = snapshot.escalationMultiplier;
result.openLedgerFeeLevel = FeeMetrics::scaleFeeLevel(snapshot, view);

Expand Down

0 comments on commit 807f30b

Please sign in to comment.