Skip to content

Commit

Permalink
Reduce lock contention in manifest cache:
Browse files Browse the repository at this point in the history
This commit combines the `apply_mutex` and `read_mutex` into a single `mutex_`
var. This new `mutex_` var is a `shared_mutex`, and most operations only need to
lock it with a `shared_lock`. The only exception is `applyManifest`, which may need
a `unique_lock`.

One consequence of removing the `apply_mutex` is that more than one
`applyManifest` function can run at the same time. To help reduce the lock
contention that a `unique_lock` would cause, checks that only require reading
data are run under a `shared_lock` (call these the "prewriteChecks"), then the
lock is released, then a `unique_lock` is acquired. Since a concurrently running
`applyManifest` may write data between the time the `shared_lock` is released
and the `unique_lock` is acquired, the "prewriteChecks" need to be rerun.
Duplicating this work isn't ideal, but the "prewriteChecks" are relatively
inexpensive.

A couple of other designs were considered. We could restrict more than one
`applyManifest` function from running concurrently - either with an
`apply_mutex` or by setting the max number of manifest jobs on the job queue to
one. The biggest issue with this is that if any other function ever adds a write
lock for any reason, `applyManifest` would now be broken - data could be written
between the release of the `shared_lock` and the acquisition of the
`unique_lock`. Note: it is tempting to solve this problem by not releasing the
`shared_lock` and simply upgrading the lock. In the presence of concurrently
running `applyManifest` functions, this will deadlock (both functions need to
wait for the other to release its read lock before they can acquire a write
lock).
  • Loading branch information
seelabs authored and nbougalis committed Mar 24, 2022
1 parent a07a729 commit 59f5844
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 88 deletions.
23 changes: 14 additions & 9 deletions src/ripple/app/misc/Manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include <ripple/beast/utility/Journal.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/SecretKey.h>

#include <optional>
#include <shared_mutex>
#include <string>

namespace ripple {
Expand Down Expand Up @@ -223,9 +225,8 @@ class DatabaseCon;
class ManifestCache
{
private:
beast::Journal mutable j_;
std::mutex apply_mutex_;
std::mutex mutable read_mutex_;
beast::Journal j_;
std::shared_mutex mutable mutex_;

/** Active manifests stored by master public key. */
hash_map<PublicKey, Manifest> map_;
Expand Down Expand Up @@ -378,8 +379,10 @@ class ManifestCache

/** Invokes the callback once for every populated manifest.
@note Undefined behavior results when calling ManifestCache members from
within the callback
@note Do not call ManifestCache member functions from within the
callback. This can re-lock the mutex from the same thread, which is UB.
@note Do not write ManifestCache member variables from within the
callback. This can lead to data races.
@param f Function called for each manifest
Expand All @@ -391,7 +394,7 @@ class ManifestCache
void
for_each_manifest(Function&& f) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
for (auto const& [_, manifest] : map_)
{
(void)_;
Expand All @@ -401,8 +404,10 @@ class ManifestCache

/** Invokes the callback once for every populated manifest.
@note Undefined behavior results when calling ManifestCache members from
within the callback
@note Do not call ManifestCache member functions from within the
callback. This can re-lock the mutex from the same thread, which is UB.
@note Do not write ManifestCache member variables from
within the callback. This can lead to data races.
@param pf Pre-function called with the maximum number of times f will be
called (useful for memory allocations)
Expand All @@ -417,7 +422,7 @@ class ManifestCache
void
for_each_manifest(PreFun&& pf, EachFun&& f) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
pf(map_.size());
for (auto const& [_, manifest] : map_)
{
Expand Down
182 changes: 107 additions & 75 deletions src/ripple/app/misc/impl/Manifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
#include <ripple/json/json_reader.h>
#include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/Sign.h>

#include <boost/algorithm/string/trim.hpp>

#include <numeric>
#include <shared_mutex>
#include <stdexcept>

namespace ripple {
Expand Down Expand Up @@ -283,7 +286,7 @@ loadValidatorToken(std::vector<std::string> const& blob)
PublicKey
ManifestCache::getSigningKey(PublicKey const& pk) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
auto const iter = map_.find(pk);

if (iter != map_.end() && !iter->second.revoked())
Expand All @@ -295,7 +298,7 @@ ManifestCache::getSigningKey(PublicKey const& pk) const
PublicKey
ManifestCache::getMasterKey(PublicKey const& pk) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};

if (auto const iter = signingToMasterKeys_.find(pk);
iter != signingToMasterKeys_.end())
Expand All @@ -307,7 +310,7 @@ ManifestCache::getMasterKey(PublicKey const& pk) const
std::optional<std::uint32_t>
ManifestCache::getSequence(PublicKey const& pk) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
auto const iter = map_.find(pk);

if (iter != map_.end() && !iter->second.revoked())
Expand All @@ -319,7 +322,7 @@ ManifestCache::getSequence(PublicKey const& pk) const
std::optional<std::string>
ManifestCache::getDomain(PublicKey const& pk) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
auto const iter = map_.find(pk);

if (iter != map_.end() && !iter->second.revoked())
Expand All @@ -331,7 +334,7 @@ ManifestCache::getDomain(PublicKey const& pk) const
std::optional<std::string>
ManifestCache::getManifest(PublicKey const& pk) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
auto const iter = map_.find(pk);

if (iter != map_.end() && !iter->second.revoked())
Expand All @@ -343,7 +346,7 @@ ManifestCache::getManifest(PublicKey const& pk) const
bool
ManifestCache::revoked(PublicKey const& pk) const
{
std::lock_guard lock{read_mutex_};
std::shared_lock lock{mutex_};
auto const iter = map_.find(pk);

if (iter != map_.end())
Expand All @@ -355,86 +358,115 @@ ManifestCache::revoked(PublicKey const& pk) const
ManifestDisposition
ManifestCache::applyManifest(Manifest m)
{
std::lock_guard applyLock{apply_mutex_};

// Before we spend time checking the signature, make sure the
// sequence number is newer than any we have.
auto const iter = map_.find(m.masterKey);

if (iter != map_.end() && m.sequence <= iter->second.sequence)
{
// We received a manifest whose sequence number is not strictly greater
// than the one we already know about. This can happen in several cases
// including when we receive manifests from a peer who doesn't have the
// latest data.
if (auto stream = j_.debug())
logMftAct(
stream,
"Stale",
m.masterKey,
m.sequence,
iter->second.sequence);
return ManifestDisposition::stale;
}

// Now check the signature
if (!m.verify())
{
if (auto stream = j_.warn())
logMftAct(stream, "Invalid", m.masterKey, m.sequence);
return ManifestDisposition::invalid;
}

// If the master key associated with a manifest is or might be compromised
// and is, therefore, no longer trustworthy.
//
// A manifest revocation essentially marks a manifest as compromised. By
// setting the sequence number to the highest value possible, the manifest
// is effectively neutered and cannot be superseded by a forged one.
bool const revoked = m.revoked();

if (auto stream = j_.warn(); stream && revoked)
logMftAct(stream, "Revoked", m.masterKey, m.sequence);

std::lock_guard readLock{read_mutex_};

// Sanity check: the master key of this manifest should not be used as
// the ephemeral key of another manifest:
if (auto const x = signingToMasterKeys_.find(m.masterKey);
x != signingToMasterKeys_.end())
{
JLOG(j_.warn()) << to_string(m)
<< ": Master key already used as ephemeral key for "
<< toBase58(TokenType::NodePublic, x->second);
// Check the manifest against the conditions that do not require a
// `unique_lock` (write lock) on the `mutex_`. Since the signature can be
// relatively expensive, the `checkSignature` parameter determines if the
// signature should be checked. Since `prewriteCheck` is run twice (see
// comment below), `checkSignature` only needs to be set to true on the
// first run.
auto prewriteCheck =
[this, &m](auto const& iter, bool checkSignature, auto const& lock)
-> std::optional<ManifestDisposition> {
assert(lock.owns_lock());
(void)lock; // not used. parameter is present to ensure the mutex is
// locked when the lambda is called.
if (iter != map_.end() && m.sequence <= iter->second.sequence)
{
// We received a manifest whose sequence number is not strictly
// greater than the one we already know about. This can happen in
// several cases including when we receive manifests from a peer who
// doesn't have the latest data.
if (auto stream = j_.debug())
logMftAct(
stream,
"Stale",
m.masterKey,
m.sequence,
iter->second.sequence);
return ManifestDisposition::stale;
}

return ManifestDisposition::badMasterKey;
}
if (checkSignature && !m.verify())
{
if (auto stream = j_.warn())
logMftAct(stream, "Invalid", m.masterKey, m.sequence);
return ManifestDisposition::invalid;
}

if (!revoked)
{
// Sanity check: the ephemeral key of this manifest should not be used
// as the master or ephemeral key of another manifest:
if (auto const x = signingToMasterKeys_.find(m.signingKey);
// If the master key associated with a manifest is or might be
// compromised and is, therefore, no longer trustworthy.
//
// A manifest revocation essentially marks a manifest as compromised. By
// setting the sequence number to the highest value possible, the
// manifest is effectively neutered and cannot be superseded by a forged
// one.
bool const revoked = m.revoked();

if (auto stream = j_.warn(); stream && revoked)
logMftAct(stream, "Revoked", m.masterKey, m.sequence);

// Sanity check: the master key of this manifest should not be used as
// the ephemeral key of another manifest:
if (auto const x = signingToMasterKeys_.find(m.masterKey);
x != signingToMasterKeys_.end())
{
JLOG(j_.warn())
<< to_string(m)
<< ": Ephemeral key already used as ephemeral key for "
<< toBase58(TokenType::NodePublic, x->second);
JLOG(j_.warn()) << to_string(m)
<< ": Master key already used as ephemeral key for "
<< toBase58(TokenType::NodePublic, x->second);

return ManifestDisposition::badEphemeralKey;
return ManifestDisposition::badMasterKey;
}

if (auto const x = map_.find(m.signingKey); x != map_.end())
if (!revoked)
{
JLOG(j_.warn())
<< to_string(m) << ": Ephemeral key used as master key for "
<< to_string(x->second);
// Sanity check: the ephemeral key of this manifest should not be
// used as the master or ephemeral key of another manifest:
if (auto const x = signingToMasterKeys_.find(m.signingKey);
x != signingToMasterKeys_.end())
{
JLOG(j_.warn())
<< to_string(m)
<< ": Ephemeral key already used as ephemeral key for "
<< toBase58(TokenType::NodePublic, x->second);

return ManifestDisposition::badEphemeralKey;
}

if (auto const x = map_.find(m.signingKey); x != map_.end())
{
JLOG(j_.warn())
<< to_string(m) << ": Ephemeral key used as master key for "
<< to_string(x->second);

return ManifestDisposition::badEphemeralKey;
return ManifestDisposition::badEphemeralKey;
}
}

return std::nullopt;
};

{
std::shared_lock sl{mutex_};
if (auto d =
prewriteCheck(map_.find(m.masterKey), /*checkSig*/ true, sl))
return *d;
}

std::unique_lock sl{mutex_};
auto const iter = map_.find(m.masterKey);
// Since we released the previously held read lock, it's possible that the
// collections have been written to. This means we need to run
// `prewriteCheck` again. This re-does work, but `prewriteCheck` is
// relatively inexpensive to run, and doing it this way allows us to run
// `prewriteCheck` under a `shared_lock` above.
// Note, the signature has already been checked above, so it
// doesn't need to happen again (signature checks are somewhat expensive).
// Note: It's a mistake to use an upgradable lock. This is a recipe for
// deadlock.
if (auto d = prewriteCheck(iter, /*checkSig*/ false, sl))
return *d;

bool const revoked = m.revoked();
// This is the first manifest we are seeing for a master key. This should
// only ever happen once per validator run.
if (iter == map_.end())
Expand Down Expand Up @@ -543,7 +575,7 @@ ManifestCache::save(
std::string const& dbTable,
std::function<bool(PublicKey const&)> const& isTrusted)
{
std::lock_guard lock{apply_mutex_};
std::shared_lock lock{mutex_};
auto db = dbCon.checkoutDb();

saveManifests(*db, dbTable, isTrusted, map_, j_);
Expand Down
1 change: 1 addition & 0 deletions src/ripple/core/Job.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum JobType {
jtRPC, // A websocket command from the client
jtSWEEP, // Sweep for stale structures
jtVALIDATION_ut, // A validation from an untrusted source
jtMANIFEST, // A validator's manifest
jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION_l, // A local transaction
jtREPLAY_REQ, // Peer request a ledger delta or a skip list
Expand Down
1 change: 1 addition & 0 deletions src/ripple/core/JobTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class JobTypes
add(jtPACK, "makeFetchPack", 1, 0ms, 0ms);
add(jtPUBOLDLEDGER, "publishAcqLedger", 2, 10000ms, 15000ms);
add(jtVALIDATION_ut, "untrustedValidation", maxLimit, 2000ms, 5000ms);
add(jtMANIFEST, "manifest", maxLimit, 2000ms, 5000ms);
add(jtTRANSACTION_l, "localTransaction", maxLimit, 100ms, 500ms);
add(jtREPLAY_REQ, "ledgerReplayRequest", 10, 250ms, 1000ms);
add(jtLEDGER_REQ, "ledgerRequest", 3, 0ms, 0ms);
Expand Down
6 changes: 2 additions & 4 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ PeerImp::fail(std::string const& reason)
return post(
strand_,
std::bind(
(void (Peer::*)(std::string const&)) & PeerImp::fail,
(void(Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
Expand Down Expand Up @@ -1067,10 +1067,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
if (s > 100)
fee_ = Resource::feeMediumBurdenPeer;

// VFALCO What's the right job type?
auto that = shared_from_this();
app_.getJobQueue().addJob(
jtVALIDATION_ut, "receiveManifests", [this, that, m]() {
jtMANIFEST, "receiveManifests", [this, that = shared_from_this(), m]() {
overlay_.onManifests(m, that);
});
}
Expand Down

0 comments on commit 59f5844

Please sign in to comment.