Skip to content

Commit

Permalink
Incremental improvements to path finding memory usage:
Browse files Browse the repository at this point in the history
* Abort background path finding when closed or disconnected
* Exit pathfinding job thread if there are no requests left
* Don't bother creating the path find job if there are no requests
* Refactor to remove circular dependency between InfoSub and PathRequest
  • Loading branch information
ximinez authored and nbougalis committed Mar 24, 2022
1 parent 4d5459d commit e7e672c
Show file tree
Hide file tree
Showing 26 changed files with 550 additions and 108 deletions.
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ if (tests)
src/test/basics/contract_test.cpp
src/test/basics/FeeUnits_test.cpp
src/test/basics/hardened_hash_test.cpp
src/test/basics/join_test.cpp
src/test/basics/mulDiv_test.cpp
src/test/basics/tagged_integer_test.cpp
#[===============================[
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/ledger/LedgerHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_LEDGER_LEDGERHOLDER_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERHOLDER_H_INCLUDED

#include <ripple/basics/CountedObject.h>
#include <ripple/basics/contract.h>
#include <mutex>

Expand All @@ -35,7 +36,7 @@ namespace ripple {
way the object always holds a value. We can use the
genesis ledger in all cases.
*/
class LedgerHolder
class LedgerHolder : public CountedObject<LedgerHolder>
{
public:
// Update the held ledger
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/ledger/LedgerReplay.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_LEDGER_LEDGERREPLAY_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERREPLAY_H_INCLUDED

#include <ripple/basics/CountedObject.h>
#include <cstdint>
#include <map>
#include <memory>
Expand All @@ -29,7 +30,7 @@ namespace ripple {
class Ledger;
class STTx;

class LedgerReplay
class LedgerReplay : public CountedObject<LedgerReplay>
{
std::shared_ptr<Ledger const> parent_;
std::shared_ptr<Ledger const> replay_;
Expand Down
48 changes: 44 additions & 4 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,13 @@ LedgerMaster::getPublishedLedgerAge()
std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch();
ret -= pubClose;
ret = (ret > 0s) ? ret : 0s;
static std::chrono::seconds lastRet = -1s;

JLOG(m_journal.trace()) << "Published ledger age is " << ret.count();
if (ret != lastRet)
{
JLOG(m_journal.trace()) << "Published ledger age is " << ret.count();
lastRet = ret;
}
return ret;
}

Expand All @@ -287,8 +292,13 @@ LedgerMaster::getValidatedLedgerAge()
std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch();
ret -= valClose;
ret = (ret > 0s) ? ret : 0s;
static std::chrono::seconds lastRet = -1s;

JLOG(m_journal.trace()) << "Validated ledger age is " << ret.count();
if (ret != lastRet)
{
JLOG(m_journal.trace()) << "Validated ledger age is " << ret.count();
lastRet = ret;
}
return ret;
}

Expand Down Expand Up @@ -1483,12 +1493,14 @@ LedgerMaster::updatePaths()
if (app_.getOPs().isNeedNetworkLedger())
{
--mPathFindThread;
JLOG(m_journal.debug()) << "Need network ledger for updating paths";
return;
}
}

while (!app_.getJobQueue().isStopping())
{
JLOG(m_journal.debug()) << "updatePaths running";
std::shared_ptr<ReadView const> lastLedger;
{
std::lock_guard ml(m_mutex);
Expand All @@ -1506,6 +1518,7 @@ LedgerMaster::updatePaths()
else
{ // Nothing to do
--mPathFindThread;
JLOG(m_journal.debug()) << "Nothing to do for updating paths";
return;
}
}
Expand All @@ -1527,7 +1540,31 @@ LedgerMaster::updatePaths()

try
{
app_.getPathRequests().updateAll(lastLedger);
auto& pathRequests = app_.getPathRequests();
{
std::lock_guard ml(m_mutex);
if (!pathRequests.requestsPending())
{
--mPathFindThread;
JLOG(m_journal.debug())
<< "No path requests found. Nothing to do for updating "
"paths. "
<< mPathFindThread << " jobs remaining";
return;
}
}
JLOG(m_journal.debug()) << "Updating paths";
pathRequests.updateAll(lastLedger);

std::lock_guard ml(m_mutex);
if (!pathRequests.requestsPending())
{
JLOG(m_journal.debug())
<< "No path requests left. No need for further updating "
"paths";
--mPathFindThread;
return;
}
}
catch (SHAMapMissingNode const& mn)
{
Expand Down Expand Up @@ -1587,8 +1624,11 @@ LedgerMaster::newPFWork(
const char* name,
std::unique_lock<std::recursive_mutex>&)
{
if (mPathFindThread < 2)
if (mPathFindThread < 2 && app_.getPathRequests().requestsPending())
{
JLOG(m_journal.debug())
<< "newPFWork: Creating job. path find threads: "
<< mPathFindThread;
if (app_.getJobQueue().addJob(
jtUPDATE_PF, name, [this]() { updatePaths(); }))
{
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/misc/CanonicalTXSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_MISC_CANONICALTXSET_H_INCLUDED
#define RIPPLE_APP_MISC_CANONICALTXSET_H_INCLUDED

#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/protocol/STTx.h>
#include <ripple/protocol/SeqProxy.h>
Expand All @@ -34,7 +35,7 @@ namespace ripple {
*/
// VFALCO TODO rename to SortedTxSet
class CanonicalTXSet
class CanonicalTXSet : public CountedObject<CanonicalTXSet>
{
private:
class Key
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/app/misc/OrderBook.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
#ifndef RIPPLE_APP_MISC_ORDERBOOK_H_INCLUDED
#define RIPPLE_APP_MISC_ORDERBOOK_H_INCLUDED

#include <ripple/basics/CountedObject.h>

namespace ripple {

/** Describes a serialized ledger entry for an order book. */
class OrderBook
class OrderBook : public CountedObject<OrderBook>
{
public:
using pointer = std::shared_ptr<OrderBook>;
Expand Down
44 changes: 33 additions & 11 deletions src/ripple/app/paths/PathRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ PathRequest::parseJson(Json::Value const& jvParams)
}

Json::Value
PathRequest::doClose(Json::Value const&)
PathRequest::doClose()
{
JLOG(m_journal.debug()) << iIdentifier << " closed";
std::lock_guard sl(mLock);
Expand All @@ -457,13 +457,20 @@ PathRequest::doStatus(Json::Value const&)
return jvStatus;
}

void
PathRequest::doAborting() const
{
JLOG(m_journal.info()) << iIdentifier << " aborting early";
}

std::unique_ptr<Pathfinder> const&
PathRequest::getPathFinder(
std::shared_ptr<RippleLineCache> const& cache,
hash_map<Currency, std::unique_ptr<Pathfinder>>& currency_map,
Currency const& currency,
STAmount const& dst_amount,
int const level)
int const level,
std::function<bool(void)> const& continueCallback)
{
auto i = currency_map.find(currency);
if (i != currency_map.end())
Expand All @@ -477,8 +484,8 @@ PathRequest::getPathFinder(
dst_amount,
saSendMax,
app_);
if (pathfinder->findPaths(level))
pathfinder->computePathRanks(max_paths_);
if (pathfinder->findPaths(level, continueCallback))
pathfinder->computePathRanks(max_paths_, continueCallback);
else
pathfinder.reset(); // It's a bad request - clear it.
return currency_map[currency] = std::move(pathfinder);
Expand All @@ -488,7 +495,8 @@ bool
PathRequest::findPaths(
std::shared_ptr<RippleLineCache> const& cache,
int const level,
Json::Value& jvArray)
Json::Value& jvArray,
std::function<bool(void)> const& continueCallback)
{
auto sourceCurrencies = sciSourceCurrencies;
if (sourceCurrencies.empty() && saSendMax)
Expand All @@ -515,22 +523,33 @@ PathRequest::findPaths(
hash_map<Currency, std::unique_ptr<Pathfinder>> currency_map;
for (auto const& issue : sourceCurrencies)
{
if (continueCallback && !continueCallback())
break;
JLOG(m_journal.debug())
<< iIdentifier
<< " Trying to find paths: " << STAmount(issue, 1).getFullText();

auto& pathfinder = getPathFinder(
cache, currency_map, issue.currency, dst_amount, level);
cache,
currency_map,
issue.currency,
dst_amount,
level,
continueCallback);
if (!pathfinder)
{
assert(false);
assert(continueCallback && !continueCallback());
JLOG(m_journal.debug()) << iIdentifier << " No paths found";
continue;
}

STPath fullLiquidityPath;
auto ps = pathfinder->getBestPaths(
max_paths_, fullLiquidityPath, mContext[issue], issue.account);
max_paths_,
fullLiquidityPath,
mContext[issue],
issue.account,
continueCallback);
mContext[issue] = ps;

auto& sourceAccount = !isXRP(issue.account)
Expand Down Expand Up @@ -628,7 +647,10 @@ PathRequest::findPaths(
}

Json::Value
PathRequest::doUpdate(std::shared_ptr<RippleLineCache> const& cache, bool fast)
PathRequest::doUpdate(
std::shared_ptr<RippleLineCache> const& cache,
bool fast,
std::function<bool(void)> const& continueCallback)
{
using namespace std::chrono;
JLOG(m_journal.debug())
Expand Down Expand Up @@ -699,7 +721,7 @@ PathRequest::doUpdate(std::shared_ptr<RippleLineCache> const& cache, bool fast)
JLOG(m_journal.debug()) << iIdentifier << " processing at level " << iLevel;

Json::Value jvArray = Json::arrayValue;
if (findPaths(cache, iLevel, jvArray))
if (findPaths(cache, iLevel, jvArray, continueCallback))
{
bLastSuccess = jvArray.size() != 0;
newStatus[jss::alternatives] = std::move(jvArray);
Expand Down Expand Up @@ -730,7 +752,7 @@ PathRequest::doUpdate(std::shared_ptr<RippleLineCache> const& cache, bool fast)
}

InfoSub::pointer
PathRequest::getSubscriber()
PathRequest::getSubscriber() const
{
return wpSubscriber.lock();
}
Expand Down
32 changes: 20 additions & 12 deletions src/ripple/app/paths/PathRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ class PathRequests;
// Return values from parseJson <0 = invalid, >0 = valid
#define PFR_PJ_INVALID -1
#define PFR_PJ_NOCHANGE 0
#define PFR_PJ_CHANGE 1

class PathRequest : public std::enable_shared_from_this<PathRequest>,
public CountedObject<PathRequest>
class PathRequest final : public InfoSubRequest,
public std::enable_shared_from_this<PathRequest>,
public CountedObject<PathRequest>
{
public:
using wptr = std::weak_ptr<PathRequest>;
Expand All @@ -55,8 +55,6 @@ class PathRequest : public std::enable_shared_from_this<PathRequest>,
using wref = const wptr&;

public:
// VFALCO TODO Break the cyclic dependency on InfoSub

// path_find semantics
// Subscriber is updated
PathRequest(
Expand Down Expand Up @@ -91,15 +89,20 @@ class PathRequest : public std::enable_shared_from_this<PathRequest>,
doCreate(std::shared_ptr<RippleLineCache> const&, Json::Value const&);

Json::Value
doClose(Json::Value const&);
doClose() override;
Json::Value
doStatus(Json::Value const&);
doStatus(Json::Value const&) override;
void
doAborting() const;

// update jvStatus
Json::Value
doUpdate(std::shared_ptr<RippleLineCache> const&, bool fast);
doUpdate(
std::shared_ptr<RippleLineCache> const&,
bool fast,
std::function<bool(void)> const& continueCallback = {});
InfoSub::pointer
getSubscriber();
getSubscriber() const;
bool
hasCompletion();

Expand All @@ -113,13 +116,18 @@ class PathRequest : public std::enable_shared_from_this<PathRequest>,
hash_map<Currency, std::unique_ptr<Pathfinder>>&,
Currency const&,
STAmount const&,
int const);
int const,
std::function<bool(void)> const&);

/** Finds and sets a PathSet in the JSON argument.
Returns false if the source currencies are inavlid.
*/
bool
findPaths(std::shared_ptr<RippleLineCache> const&, int const, Json::Value&);
findPaths(
std::shared_ptr<RippleLineCache> const&,
int const,
Json::Value&,
std::function<bool(void)> const&);

int
parseJson(Json::Value const&);
Expand Down Expand Up @@ -156,7 +164,7 @@ class PathRequest : public std::enable_shared_from_this<PathRequest>,
int iLevel;
bool bLastSuccess;

int iIdentifier;
int const iIdentifier;

std::chrono::steady_clock::time_point const created_;
std::chrono::steady_clock::time_point quick_reply_;
Expand Down
Loading

0 comments on commit e7e672c

Please sign in to comment.