Simplify the Job Queue:
This is a refactor aimed at cleaning up and simplifying the existing
job queue.

As of now, all jobs are cancelled at the same time and in the same
way, so this commit removes the per-job cancellation token. If a need
for per-job cancellation is demonstrated later, it can be re-added.

* Revise documentation for ClosureCounter and Workers.
* Simplify code by removing unnecessary function arguments and
  deduplicating expressions.
* Restructure job handlers so that a job's handle no longer needs to be
  passed to the job (see the sketch below).
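
For illustration only — the following is a stand-alone sketch, not code from this commit; JobQueue, addJob, and isStopping here are simplified stand-ins for the rippled types. It shows the handler shape changing from a unary closure taking a Job& handle to a nullary closure that asks the queue itself about shutdown:

#include <functional>
#include <iostream>
#include <string>

// Simplified stand-in for the real ripple::JobQueue; illustrative only.
struct JobQueue
{
    bool stopping = false;

    // New convention: the job is a plain nullary closure, no Job& handle.
    bool
    addJob(int /*jobType*/, std::string const& name, std::function<void()> job)
    {
        std::cout << "running job: " << name << '\n';
        job();  // a real queue would hand this to a worker thread
        return true;
    }

    bool
    isStopping() const
    {
        return stopping;
    }
};

int
main()
{
    JobQueue queue;

    // Before this commit a handler looked like
    //     [](Job& job) { while (!job.shouldCancel()) { /* work */ } }
    // Afterwards it takes no arguments and polls the queue for shutdown:
    queue.addJob(0, "example", [&queue]() {
        while (!queue.isStopping())
        {
            // do one unit of work, then re-check for shutdown
            break;
        }
    });
}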
thejohnfreeman authored and nbougalis committed Mar 1, 2022
1 parent df02eb1 commit c2a08a1
Showing 31 changed files with 166 additions and 244 deletions.
8 changes: 2 additions & 6 deletions src/ripple/app/consensus/RCLConsensus.cpp
@@ -131,9 +131,7 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
         acquiringLedger_ = hash;
 
         app_.getJobQueue().addJob(
-            jtADVANCE,
-            "getConsensusLedger",
-            [id = hash, &app = app_](Job&) {
+            jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() {
                 app.getInboundLedgers().acquire(
                     id, 0, InboundLedger::Reason::CONSENSUS);
             });
@@ -423,9 +421,7 @@ RCLConsensus::Adaptor::onAccept(
     Json::Value&& consensusJson)
 {
     app_.getJobQueue().addJob(
-        jtACCEPT,
-        "acceptLedger",
-        [=, cj = std::move(consensusJson)](auto&) mutable {
+        jtACCEPT, "acceptLedger", [=, cj = std::move(consensusJson)]() mutable {
             // Note that no lock is held or acquired during this job.
             // This is because generic Consensus guarantees that once a ledger
             // is accepted, the consensus results and capture by reference state
2 changes: 1 addition & 1 deletion src/ripple/app/consensus/RCLValidations.cpp
@@ -135,7 +135,7 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
         Application* pApp = &app_;
 
         app_.getJobQueue().addJob(
-            jtADVANCE, "getConsensusLedger", [pApp, hash](Job&) {
+            jtADVANCE, "getConsensusLedger", [pApp, hash]() {
                 pApp->getInboundLedgers().acquire(
                     hash, 0, InboundLedger::Reason::CONSENSUS);
             });
7 changes: 3 additions & 4 deletions src/ripple/app/ledger/ConsensusTransSetSF.cpp
@@ -62,10 +62,9 @@ ConsensusTransSetSF::gotNode(
             auto stx = std::make_shared<STTx const>(std::ref(sit));
             assert(stx->getTransactionID() == nodeHash.as_uint256());
             auto const pap = &app_;
-            app_.getJobQueue().addJob(
-                jtTRANSACTION, "TXS->TXN", [pap, stx](Job&) {
-                    pap->getOPs().submitTransaction(stx);
-                });
+            app_.getJobQueue().addJob(jtTRANSACTION, "TXS->TXN", [pap, stx]() {
+                pap->getOPs().submitTransaction(stx);
+            });
         }
         catch (std::exception const&)
         {
7 changes: 3 additions & 4 deletions src/ripple/app/ledger/Ledger.cpp
@@ -981,10 +981,9 @@ pendSaveValidated(
 
     // See if we can use the JobQueue.
     if (!isSynchronous &&
-        app.getJobQueue().addJob(
-            jobType, jobName, [&app, ledger, isCurrent](Job&) {
-                saveValidatedLedger(app, ledger, isCurrent);
-            }))
+        app.getJobQueue().addJob(jobType, jobName, [&app, ledger, isCurrent]() {
+            saveValidatedLedger(app, ledger, isCurrent);
+        }))
     {
         return true;
     }
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/LedgerMaster.h
@@ -301,7 +301,7 @@ class LedgerMaster : public AbstractFetchPackContainer
     setPubLedger(std::shared_ptr<Ledger const> const& l);
 
     void
-    tryFill(Job& job, std::shared_ptr<Ledger const> ledger);
+    tryFill(std::shared_ptr<Ledger const> ledger);
 
     void
     getFetchPack(LedgerIndex missing, InboundLedger::Reason reason);
@@ -326,7 +326,7 @@ class LedgerMaster : public AbstractFetchPackContainer
     findNewLedgersToPublish(std::unique_lock<std::recursive_mutex>&);
 
     void
-    updatePaths(Job& job);
+    updatePaths();
 
     // Returns true if work started. Always called with m_mutex locked.
     // The passed lock is a reminder to callers.
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/OrderBookDB.cpp
@@ -68,7 +68,7 @@ OrderBookDB::setup(std::shared_ptr<ReadView const> const& ledger)
         update(ledger);
     else
         app_.getJobQueue().addJob(
-            jtUPDATE_PF, "OrderBookDB::update", [this, ledger](Job&) {
+            jtUPDATE_PF, "OrderBookDB::update", [this, ledger]() {
                 update(ledger);
             });
 }
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/InboundLedger.cpp
@@ -527,7 +527,7 @@ InboundLedger::done()
 
     // We hold the PeerSet lock, so must dispatch
     app_.getJobQueue().addJob(
-        jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()](Job&) {
+        jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
            if (self->complete_ && !self->failed_)
            {
                self->app_.getLedgerMaster().checkAccept(self->getLedger());
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
@@ -183,7 +183,7 @@ class InboundLedgersImp : public InboundLedgers
             // dispatch
             if (ledger->gotData(std::weak_ptr<Peer>(peer), packet))
                 app_.getJobQueue().addJob(
-                    jtLEDGER_DATA, "processLedgerData", [ledger](Job&) {
+                    jtLEDGER_DATA, "processLedgerData", [ledger]() {
                         ledger->runData();
                     });
 
@@ -198,7 +198,7 @@ class InboundLedgersImp : public InboundLedgers
         if (packet->type() == protocol::liAS_NODE)
         {
             app_.getJobQueue().addJob(
-                jtLEDGER_DATA, "gotStaleData", [this, packet](Job&) {
+                jtLEDGER_DATA, "gotStaleData", [this, packet]() {
                     gotStaleData(packet);
                 });
         }
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/LedgerDeltaAcquire.cpp
@@ -240,7 +240,7 @@ LedgerDeltaAcquire::onLedgerBuilt(
     app_.getJobQueue().addJob(
         jtREPLAY_TASK,
         "onLedgerBuilt",
-        [=, ledger = this->fullLedger_, &app = this->app_](Job&) {
+        [=, ledger = this->fullLedger_, &app = this->app_]() {
            for (auto reason : reasons)
            {
                switch (reason)
21 changes: 10 additions & 11 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
@@ -699,7 +699,7 @@ LedgerMaster::getEarliestFetch()
 }
 
 void
-LedgerMaster::tryFill(Job& job, std::shared_ptr<Ledger const> ledger)
+LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
 {
     std::uint32_t seq = ledger->info().seq;
     uint256 prevHash = ledger->info().parentHash;
@@ -710,7 +710,7 @@ LedgerMaster::tryFill(Job& job, std::shared_ptr<Ledger const> ledger)
     std::uint32_t maxHas = seq;
 
     NodeStore::Database& nodeStore{app_.getNodeStore()};
-    while (!job.shouldCancel() && seq > 0)
+    while (!app_.getJobQueue().isStopping() && seq > 0)
     {
         {
             std::lock_guard ml(m_mutex);
@@ -1453,7 +1453,7 @@ LedgerMaster::tryAdvance()
     if (!mAdvanceThread && !mValidLedger.empty())
     {
         mAdvanceThread = true;
-        app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this](Job&) {
+        app_.getJobQueue().addJob(jtADVANCE, "advanceLedger", [this]() {
            std::unique_lock sl(m_mutex);

            assert(!mValidLedger.empty() && mAdvanceThread);
@@ -1476,7 +1476,7 @@ LedgerMaster::tryAdvance()
 }
 
 void
-LedgerMaster::updatePaths(Job& job)
+LedgerMaster::updatePaths()
 {
     {
         std::lock_guard ml(m_mutex);
@@ -1487,7 +1487,7 @@ LedgerMaster::updatePaths(Job& job)
         }
     }
 
-    while (!job.shouldCancel())
+    while (!app_.getJobQueue().isStopping())
     {
         std::shared_ptr<ReadView const> lastLedger;
         {
@@ -1527,8 +1527,7 @@ LedgerMaster::updatePaths(Job& job)
 
         try
         {
-            app_.getPathRequests().updateAll(
-                lastLedger, job.getCancelCallback());
+            app_.getPathRequests().updateAll(lastLedger);
         }
         catch (SHAMapMissingNode const& mn)
         {
@@ -1591,7 +1590,7 @@ LedgerMaster::newPFWork(
     if (mPathFindThread < 2)
     {
         if (app_.getJobQueue().addJob(
-                jtUPDATE_PF, name, [this](Job& j) { updatePaths(j); }))
+                jtUPDATE_PF, name, [this]() { updatePaths(); }))
         {
             ++mPathFindThread;
         }
@@ -1942,8 +1941,8 @@ LedgerMaster::fetchForHistory(
             mFillInProgress = seq;
         }
         app_.getJobQueue().addJob(
-            jtADVANCE, "tryFill", [this, ledger](Job& j) {
-                tryFill(j, ledger);
+            jtADVANCE, "tryFill", [this, ledger]() {
+                tryFill(ledger);
             });
     }
 }
@@ -2124,7 +2123,7 @@ LedgerMaster::gotFetchPack(bool progress, std::uint32_t seq)
 {
     if (!mGotFetchPackThread.test_and_set(std::memory_order_acquire))
     {
-        app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&](Job&) {
+        app_.getJobQueue().addJob(jtLEDGER_DATA, "gotFetchPack", [&]() {
             app_.getInboundLedgers().gotFetchPack();
             mGotFetchPackThread.clear(std::memory_order_release);
         });
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/TimeoutCounter.cpp
@@ -83,7 +83,7 @@ TimeoutCounter::queueJob(ScopedLockType& sl)
     app_.getJobQueue().addJob(
         queueJobParameter_.jobType,
         queueJobParameter_.jobName,
-        [wptr = pmDowncast()](Job&) {
+        [wptr = pmDowncast()]() {
             if (auto sptr = wptr.lock(); sptr)
                 sptr->invokeOnTimer();
         });
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/TransactionAcquire.cpp
@@ -81,7 +81,7 @@ TransactionAcquire::done()
         // just updates the consensus and related structures when we acquire
         // a transaction set. No need to update them if we're shutting down.
         app_.getJobQueue().addJob(
-            jtTXN_DATA, "completeAcquire", [pap, hash, map](Job&) {
+            jtTXN_DATA, "completeAcquire", [pap, hash, map]() {
                 pap->getInboundTransactions().giveSet(hash, map, true);
             });
     }
2 changes: 1 addition & 1 deletion src/ripple/app/main/Application.cpp
@@ -1085,7 +1085,7 @@ class ApplicationImp : public Application, public BasicApp
             if (e.value() == boost::system::errc::success)
             {
                 m_jobQueue->addJob(
-                    jtSWEEP, "sweep", [this](Job&) { doSweep(); });
+                    jtSWEEP, "sweep", [this]() { doSweep(); });
             }
             // Recover as best we can if an unexpected error occurs.
             if (e.value() != boost::system::errc::success &&
2 changes: 1 addition & 1 deletion src/ripple/app/main/NodeStoreScheduler.cpp
@@ -32,7 +32,7 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
     if (jobQueue_.isStopped())
         return;
 
-    if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task](Job&) {
+    if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task]() {
            task.performScheduledTask();
        }))
    {
24 changes: 11 additions & 13 deletions src/ripple/app/misc/NetworkOPs.cpp
@@ -949,7 +949,7 @@ NetworkOPsImp::setHeartbeatTimer()
         heartbeatTimer_,
         mConsensus.parms().ledgerGRANULARITY,
         [this]() {
-            m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this](Job&) {
+            m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
                 processHeartbeatTimer();
             });
         },
@@ -964,7 +964,7 @@ NetworkOPsImp::setClusterTimer()
         clusterTimer_,
         10s,
         [this]() {
-            m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this](Job&) {
+            m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
                 processClusterTimer();
             });
         },
@@ -1153,7 +1153,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
 
     auto tx = std::make_shared<Transaction>(trans, reason, app_);
 
-    m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx](Job&) {
+    m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
         auto t = tx;
         processTransaction(t, false, false, FailHard::no);
     });
@@ -1224,9 +1224,8 @@ NetworkOPsImp::doTransactionAsync(
 
     if (mDispatchState == DispatchState::none)
     {
-        if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this](Job&) {
-                transactionBatch();
-            }))
+        if (m_job_queue.addJob(
+                jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
         {
             mDispatchState = DispatchState::scheduled;
         }
@@ -1262,10 +1261,9 @@ NetworkOPsImp::doTransactionSync(
             if (mTransactions.size())
             {
                 // More transactions need to be applied, but by another job.
-                if (m_job_queue.addJob(
-                        jtBATCH, "transactionBatch", [this](Job&) {
-                            transactionBatch();
-                        }))
+                if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
+                        transactionBatch();
+                    }))
                 {
                     mDispatchState = DispatchState::scheduled;
                 }
@@ -2941,7 +2939,7 @@ NetworkOPsImp::reportFeeChange()
     if (f != mLastFeeSummary)
     {
         m_job_queue.addJob(
-            jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this](Job&) {
+            jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
                 pubServer();
             });
     }
@@ -2953,7 +2951,7 @@ NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)
     m_job_queue.addJob(
         jtCLIENT_CONSENSUS,
         "reportConsensusStateChange->pubConsensus",
-        [this, phase](Job&) { pubConsensus(phase); });
+        [this, phase]() { pubConsensus(phase); });
 }
 
 inline void
@@ -3346,7 +3344,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
     app_.getJobQueue().addJob(
         jtCLIENT_ACCT_HIST,
         "AccountHistoryTxStream",
-        [this, dbType = databaseType, subInfo](Job&) {
+        [this, dbType = databaseType, subInfo]() {
            auto const& accountId = subInfo.index_->accountId_;
            auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
            auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
8 changes: 3 additions & 5 deletions src/ripple/app/paths/PathRequests.cpp
@@ -55,9 +55,7 @@ PathRequests::getLineCache(
 }
 
 void
-PathRequests::updateAll(
-    std::shared_ptr<ReadView const> const& inLedger,
-    Job::CancelCallback shouldCancel)
+PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
 {
     auto event =
         app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll");
@@ -84,7 +82,7 @@ PathRequests::updateAll(
     {
         for (auto const& wr : requests)
         {
-            if (shouldCancel())
+            if (app_.getJobQueue().isStopping())
                 break;
 
             auto request = wr.lock();
@@ -174,7 +172,7 @@ PathRequests::updateAll(
            requests = requests_;
            cache = getLineCache(cache->getLedger(), false);
        }
-    } while (!shouldCancel());
+    } while (!app_.getJobQueue().isStopping());
 
     JLOG(mJournal.debug()) << "updateAll complete: " << processed
                            << " processed and " << removed << " removed";
5 changes: 1 addition & 4 deletions src/ripple/app/paths/PathRequests.h
@@ -47,12 +47,9 @@ class PathRequests
     /** Update all of the contained PathRequest instances.
         @param ledger Ledger we are pathfinding in.
-        @param shouldCancel Invocable that returns whether to cancel.
     */
     void
-    updateAll(
-        std::shared_ptr<ReadView const> const& ledger,
-        Job::CancelCallback shouldCancel);
+    updateAll(std::shared_ptr<ReadView const> const& ledger);
 
     std::shared_ptr<RippleLineCache>
     getLineCache(
16 changes: 8 additions & 8 deletions src/ripple/core/ClosureCounter.h
@@ -31,21 +31,21 @@ namespace ripple {
 
 /**
  * The role of a `ClosureCounter` is to assist in shutdown by letting callers
- * wait for the completion of callbacks (of a single type signature) that they
- * previously scheduled. The lifetime of a `ClosureCounter` consists of two
+ * wait for the completion of closures (of a specific type signature) that they
+ * previously registered. These closures are typically callbacks for
+ * asynchronous operations. The lifetime of a `ClosureCounter` consists of two
  * phases: the initial expanding "fork" phase, and the subsequent shrinking
 * "join" phase.
 *
- * In the fork phase, callers register a callback by passing the callback and
+ * In the fork phase, callers register a closure by passing the closure and
 * receiving a substitute in return. The substitute has the same callable
- * interface as the callback, and it informs the `ClosureCounter` whenever it
+ * interface as the closure, and it informs the `ClosureCounter` whenever it
 * is copied or destroyed, so that it can keep an accurate count of copies.
 *
 * The transition to the join phase is made by a call to `join`. In this
- * phase, every substitute returned going forward will be empty, signaling to
- * the caller that they should just drop the callback and cancel their
- * asynchronous operation. `join` blocks until all existing callback
- * substitutes are destroyed.
+ * phase, every substitute returned going forward will be null, signaling to
+ * the caller that they should drop the closure and cancel their operation.
+ * `join` blocks until all existing closure substitutes are destroyed.
 *
 * \tparam Ret_t The return type of the closure.
 * \tparam Args_t The argument types of the closure.
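
To make the fork/join description above concrete, here is a tiny illustrative counter. It is not ripple::ClosureCounter: the names wrap and join are invented for this sketch, it is not templated over the closure signature, and it counts invocations rather than copies and destruction of the substitute.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>

// Illustrative mini-counter; the real class tracks copies/destruction of the
// substitute instead of calls, and is templated over the closure signature.
class MiniClosureCounter
{
    std::mutex mutex_;
    std::condition_variable allDone_;
    int outstanding_ = 0;
    bool joined_ = false;

public:
    // Fork phase: wrap a closure, returning a substitute with the same
    // callable interface. After join() has been called, returns nullopt.
    std::optional<std::function<void()>>
    wrap(std::function<void()> closure)
    {
        std::lock_guard lock(mutex_);
        if (joined_)
            return std::nullopt;  // caller should drop the closure
        ++outstanding_;
        return [this, closure = std::move(closure)]() {
            closure();
            std::lock_guard lock(mutex_);
            if (--outstanding_ == 0)
                allDone_.notify_all();
        };
    }

    // Join phase: stop handing out substitutes and wait for the rest.
    void
    join()
    {
        std::unique_lock lock(mutex_);
        joined_ = true;
        allDone_.wait(lock, [this] { return outstanding_ == 0; });
    }
};

int
main()
{
    MiniClosureCounter counter;
    auto substitute = counter.wrap([] { std::cout << "callback ran\n"; });
    if (substitute)
        (*substitute)();  // runs the closure and decrements the count
    counter.join();       // returns immediately: nothing is outstanding
}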
(Diffs for the remaining changed files are not shown.)