Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve lifetime management of ledger objects (SLEs) to prevent runaway memory usage. AKA "Is it caching? It's always caching." #4822

Merged
merged 11 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/ripple/app/ledger/InboundLedgers.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class InboundLedgers

virtual void
stop() = 0;

virtual std::size_t
cacheSize() = 0;
};

std::unique_ptr<InboundLedgers>
Expand Down
21 changes: 21 additions & 0 deletions src/ripple/app/ledger/LedgerReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,27 @@ class LedgerReplayer final
void
stop();

std::size_t
tasksSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return tasks_.size();
}

std::size_t
deltasSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return deltas_.size();
}

std::size_t
skipListsSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return skipLists_.size();
}

private:
mutable std::mutex mtx_;
std::vector<std::shared_ptr<LedgerReplayTask>> tasks_;
Expand Down
7 changes: 7 additions & 0 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,13 @@ class InboundLedgersImp : public InboundLedgers
mRecentFailures.clear();
}

std::size_t
cacheSize() override
{
ScopedLockType lock(mLock);
return mLedgers.size();
}

private:
clock_type& m_clock;

Expand Down
5 changes: 5 additions & 0 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,7 @@ LedgerMaster::updatePaths()
if (app_.getOPs().isNeedNetworkLedger())
{
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug()) << "Need network ledger for updating paths";
return;
}
Expand All @@ -1568,6 +1569,7 @@ LedgerMaster::updatePaths()
else
{ // Nothing to do
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug()) << "Nothing to do for updating paths";
return;
}
Expand All @@ -1584,6 +1586,7 @@ LedgerMaster::updatePaths()
<< "Published ledger too old for updating paths";
std::lock_guard ml(m_mutex);
--mPathFindThread;
mPathLedger.reset();
return;
}
}
Expand All @@ -1596,6 +1599,7 @@ LedgerMaster::updatePaths()
if (!pathRequests.requestsPending())
{
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug())
<< "No path requests found. Nothing to do for updating "
"paths. "
Expand All @@ -1613,6 +1617,7 @@ LedgerMaster::updatePaths()
<< "No path requests left. No need for further updating "
"paths";
--mPathFindThread;
mPathLedger.reset();
return;
}
}
Expand Down
172 changes: 162 additions & 10 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,20 +1065,172 @@ class ApplicationImp : public Application, public BasicApp
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.

nodeFamily_.sweep();
{
std::shared_ptr<FullBelowCache const> const fullBelowCache =
nodeFamily_.getFullBelowCache(0);

std::shared_ptr<TreeNodeCache const> const treeNodeCache =
nodeFamily_.getTreeNodeCache(0);

std::size_t const oldFullBelowSize = fullBelowCache->size();
std::size_t const oldTreeNodeSize = treeNodeCache->size();

nodeFamily_.sweep();

JLOG(m_journal.debug())
<< "NodeFamily::FullBelowCache sweep. Size before: "
<< oldFullBelowSize
<< "; size after: " << fullBelowCache->size();

JLOG(m_journal.debug())
<< "NodeFamily::TreeNodeCache sweep. Size before: "
<< oldTreeNodeSize << "; size after: " << treeNodeCache->size();
}
if (shardFamily_)
{
std::size_t const oldFullBelowSize =
shardFamily_->getFullBelowCacheSize();
std::size_t const oldTreeNodeSize =
shardFamily_->getTreeNodeCacheSize().second;

shardFamily_->sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();

JLOG(m_journal.debug())
<< "ShardFamily::FullBelowCache sweep. Size before: "
<< oldFullBelowSize
<< "; size after: " << shardFamily_->getFullBelowCacheSize();

JLOG(m_journal.debug())
<< "ShardFamily::TreeNodeCache sweep. Size before: "
<< oldTreeNodeSize << "; size after: "
<< shardFamily_->getTreeNodeCacheSize().second;
}
{
TaggedCache<uint256, Transaction> const& masterTxCache =
getMasterTransaction().getCache();

std::size_t const oldMasterTxSize = masterTxCache.size();

getMasterTransaction().sweep();

JLOG(m_journal.debug())
<< "MasterTransaction sweep. Size before: " << oldMasterTxSize
<< "; size after: " << masterTxCache.size();
}
{
// Does not appear to have an associated cache.
getNodeStore().sweep();
}
if (shardStore_)
{
// Does not appear to have an associated cache.
shardStore_->sweep();
getLedgerMaster().sweep();
getTempNodeCache().sweep();
getValidations().expire(m_journal);
getInboundLedgers().sweep();
getLedgerReplayer().sweep();
m_acceptedLedgerCache.sweep();
cachedSLEs_.sweep();
}
{
std::size_t const oldLedgerMasterCacheSize =
getLedgerMaster().getFetchPackCacheSize();

getLedgerMaster().sweep();

JLOG(m_journal.debug())
<< "LedgerMaster sweep. Size before: "
<< oldLedgerMasterCacheSize << "; size after: "
<< getLedgerMaster().getFetchPackCacheSize();
}
{
// NodeCache == TaggedCache<SHAMapHash, Blob>
std::size_t const oldTempNodeCacheSize = getTempNodeCache().size();

getTempNodeCache().sweep();

JLOG(m_journal.debug())
<< "TempNodeCache sweep. Size before: " << oldTempNodeCacheSize
<< "; size after: " << getTempNodeCache().size();
}
{
std::size_t const oldCurrentCacheSize =
getValidations().sizeOfCurrentCache();
std::size_t const oldSizeSeqEnforcesSize =
getValidations().sizeOfSeqEnforcersCache();
std::size_t const oldByLedgerSize =
getValidations().sizeOfByLedgerCache();
std::size_t const oldBySequenceSize =
getValidations().sizeOfBySequenceCache();

getValidations().expire(m_journal);

JLOG(m_journal.debug())
<< "Validations Current expire. Size before: "
<< oldCurrentCacheSize
<< "; size after: " << getValidations().sizeOfCurrentCache();

JLOG(m_journal.debug())
<< "Validations SeqEnforcer expire. Size before: "
<< oldSizeSeqEnforcesSize << "; size after: "
<< getValidations().sizeOfSeqEnforcersCache();

JLOG(m_journal.debug())
<< "Validations ByLedger expire. Size before: "
<< oldByLedgerSize
<< "; size after: " << getValidations().sizeOfByLedgerCache();

JLOG(m_journal.debug())
<< "Validations BySequence expire. Size before: "
<< oldBySequenceSize
<< "; size after: " << getValidations().sizeOfBySequenceCache();
}
{
std::size_t const oldInboundLedgersSize =
getInboundLedgers().cacheSize();

getInboundLedgers().sweep();

JLOG(m_journal.debug())
<< "InboundLedgers sweep. Size before: "
<< oldInboundLedgersSize
<< "; size after: " << getInboundLedgers().cacheSize();
}
{
size_t const oldTasksSize = getLedgerReplayer().tasksSize();
size_t const oldDeltasSize = getLedgerReplayer().deltasSize();
size_t const oldSkipListsSize = getLedgerReplayer().skipListsSize();

getLedgerReplayer().sweep();

JLOG(m_journal.debug())
<< "LedgerReplayer tasks sweep. Size before: " << oldTasksSize
<< "; size after: " << getLedgerReplayer().tasksSize();

JLOG(m_journal.debug())
<< "LedgerReplayer deltas sweep. Size before: "
<< oldDeltasSize
<< "; size after: " << getLedgerReplayer().deltasSize();

JLOG(m_journal.debug())
<< "LedgerReplayer skipLists sweep. Size before: "
<< oldSkipListsSize
<< "; size after: " << getLedgerReplayer().skipListsSize();
}
{
std::size_t const oldAcceptedLedgerSize =
m_acceptedLedgerCache.size();

m_acceptedLedgerCache.sweep();

JLOG(m_journal.debug())
<< "AcceptedLedgerCache sweep. Size before: "
<< oldAcceptedLedgerSize
<< "; size after: " << m_acceptedLedgerCache.size();
}
{
std::size_t const oldCachedSLEsSize = cachedSLEs_.size();

cachedSLEs_.sweep();

JLOG(m_journal.debug())
<< "CachedSLEs sweep. Size before: " << oldCachedSLEsSize
<< "; size after: " << cachedSLEs_.size();
}

#ifdef RIPPLED_REPORTING
if (auto pg = dynamic_cast<PostgresDatabase*>(&*mRelationalDatabase))
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/app/paths/PathRequests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,17 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
break;
}

// Hold on to the line cache until after the lock is released, so it can
// be destroyed outside of the lock
std::shared_ptr<RippleLineCache> lastCache;
{
// Get the latest requests, cache, and ledger for next pass
std::lock_guard sl(mLock);

if (requests_.empty())
break;
requests = requests_;
lastCache = cache;
cache = getLineCache(cache->getLedger(), false);
}
} while (!app_.getJobQueue().isStopping());
Expand Down
28 changes: 28 additions & 0 deletions src/ripple/consensus/Validations.h
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,34 @@ class Validations

return laggards;
}

std::size_t
sizeOfCurrentCache() const
{
std::lock_guard lock{mutex_};
return current_.size();
}

std::size_t
sizeOfSeqEnforcersCache() const
{
std::lock_guard lock{mutex_};
return seqEnforcers_.size();
}

std::size_t
sizeOfByLedgerCache() const
{
std::lock_guard lock{mutex_};
return byLedger_.size();
}

std::size_t
sizeOfBySequenceCache() const
{
std::lock_guard lock{mutex_};
return bySequence_.size();
}
};

} // namespace ripple
Expand Down
5 changes: 1 addition & 4 deletions src/ripple/ledger/CachedView.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ class CachedViewImpl : public DigestAwareReadView
DigestAwareReadView const& base_;
CachedSLEs& cache_;
std::mutex mutable mutex_;
std::unordered_map<
key_type,
std::shared_ptr<SLE const>,
hardened_hash<>> mutable map_;
std::unordered_map<key_type, uint256, hardened_hash<>> mutable map_;

public:
CachedViewImpl() = delete;
Expand Down
33 changes: 23 additions & 10 deletions src/ripple/ledger/impl/CachedView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,38 @@ CachedViewImpl::exists(Keylet const& k) const
std::shared_ptr<SLE const>
CachedViewImpl::read(Keylet const& k) const
{
{
static CountedObjects::Counter hits{"CachedView::cache_hit"};
static CountedObjects::Counter hitsexpired{"CachedView::cache_hit_expired"};
intelliot marked this conversation as resolved.
Show resolved Hide resolved
static CountedObjects::Counter misses{"CachedView::cache_miss"};
bool cacheHit = false;
bool baseRead = false;

auto const digest = [&]() -> std::optional<uint256> {
std::lock_guard lock(mutex_);
auto const iter = map_.find(k.key);
if (iter != map_.end())
{
if (!iter->second || !k.check(*iter->second))
return nullptr;
cacheHit = true;
return iter->second;
}
}
auto const digest = base_.digest(k.key);
return base_.digest(k.key);
intelliot marked this conversation as resolved.
Show resolved Hide resolved
}();
if (!digest)
return nullptr;
auto sle = cache_.fetch(*digest, [&]() { return base_.read(k); });
auto sle = cache_.fetch(*digest, [&]() {
baseRead = true;
return base_.read(k);
});
if (cacheHit && baseRead)
hitsexpired.increment();
else if (cacheHit)
hits.increment();
else
misses.increment();
std::lock_guard lock(mutex_);
auto const er = map_.emplace(k.key, sle);
auto const& iter = er.first;
auto const er = map_.emplace(k.key, *digest);
bool const inserted = er.second;
if (iter->second && !k.check(*iter->second))
if (sle && !k.check(*sle))
{
if (!inserted)
{
Expand All @@ -62,7 +75,7 @@ CachedViewImpl::read(Keylet const& k) const
}
return nullptr;
}
return iter->second;
return sle;
}

} // namespace detail
Expand Down
Loading