Skip to content

Commit

Permalink
Migrate thread safety to RCLConsensus (RIPD-1389):
Browse files Browse the repository at this point in the history
Moves thread saftey from generic Consensus to RCLConsensus.  RCLConsensus uses
a lock to control all calls into the consensus code.  The adaptor class now
maintains atomic versions of status information that can be accessed by
NetworkOPs concurrently with the main consensus calls.
  • Loading branch information
bachase committed Jul 5, 2017
1 parent fd7bdca commit d20650e
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 203 deletions.
3 changes: 3 additions & 0 deletions docs/consensus.qbk
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,9 @@ struct Adaptor
Ledger const & prevLedger,
ConsensusMode mode);

// Called when consensus operating mode changes
void onModeChange(ConsensuMode before, ConsensusMode after);

// Called when ledger closes. Implementation should generate an initial Result
// with position based on the current open ledger's transactions.
ConsensusResult onClose(Ledger const &, Ledger const & prev, ConsensusMode mode);
Expand Down
91 changes: 66 additions & 25 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ RCLConsensus::RCLConsensus(
validatorKeys,
journal)
, consensus_(clock, adaptor_, journal)
, j_(journal)

{
}
Expand Down Expand Up @@ -370,18 +371,22 @@ RCLConsensus::Adaptor::onAccept(
Json::Value && consensusJson)
{
app_.getJobQueue().addJob(
jtACCEPT, "acceptLedger", [&](auto&) {
// note that no lock is held inside this thread, which
// is fine since once a ledger is accepted, consensus
// will not touch any internal state until startRound is called
doAccept(
jtACCEPT,
"acceptLedger",
[&, cj = std::move(consensusJson) ](auto&) mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and capture by reference state
// will not change until startRound is called (which happens via
// endConsensus).
this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson));
app_.getOPs().endConsensus();
std::move(cj));
this->app_.getOPs().endConsensus();
});
}

Expand All @@ -394,6 +399,9 @@ RCLConsensus::Adaptor::doAccept(
ConsensusMode const& mode,
Json::Value && consensusJson)
{
prevProposers_ = result.proposers;
prevRoundTime_ = result.roundTime.read();

bool closeTimeCorrect;

const bool proposing = mode == ConsensusMode::proposing;
Expand Down Expand Up @@ -863,8 +871,12 @@ RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, bool proposing)
Json::Value
RCLConsensus::getJson(bool full) const
{
auto ret = consensus_.getJson(full);
ret["validating"] = adaptor_.validating_;
Json::Value ret;
{
ScopedLockType _{mutex_};
ret = consensus_.getJson(full);
}
ret["validating"] = adaptor_.validating();
return ret;
}

Expand All @@ -873,12 +885,13 @@ RCLConsensus::timerEntry(NetClock::time_point const& now)
{
try
{
ScopedLockType _{mutex_};
consensus_.timerEntry(now);
}
catch (SHAMapMissingNode const& mn)
{
// This should never happen
JLOG(adaptor_.j_.error()) << "Missing node during consensus process " << mn;
JLOG(j_.error()) << "Missing node during consensus process " << mn;
Rethrow();
}
}
Expand All @@ -888,43 +901,71 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
{
try
{
ScopedLockType _{mutex_};
consensus_.gotTxSet(now, txSet);
}
catch (SHAMapMissingNode const& mn)
{
// This should never happen
JLOG(adaptor_.j_.error()) << "Missing node during consensus process " << mn;
JLOG(j_.error()) << "Missing node during consensus process " << mn;
Rethrow();
}
}


//! @see Consensus::simulate

void
RCLConsensus::startRound(
RCLConsensus::simulate(
NetClock::time_point const& now,
RCLCxLedger::ID const& prevLgrId,
RCLCxLedger const& prevLgr)
boost::optional<std::chrono::milliseconds> consensusDelay)
{
// We have a key, and we have some idea what the ledger is
adaptor_.validating_ =
!adaptor_.app_.getOPs().isNeedNetworkLedger() && (adaptor_.valPublic_.size() != 0);
ScopedLockType _{mutex_};
consensus_.simulate(now, consensusDelay);
}

// propose only if we're in sync with the network (and validating)
bool proposing =
adaptor_.validating_ && (adaptor_.app_.getOPs().getOperatingMode() == NetworkOPs::omFULL);
bool
RCLConsensus::peerProposal(
NetClock::time_point const& now,
RCLCxPeerPos const& newProposal)
{
ScopedLockType _{mutex_};
return consensus_.peerProposal(now, newProposal);
}

if (adaptor_.validating_)
bool
RCLConsensus::Adaptor::preStartRound(RCLCxLedger const & prevLgr)
{
// We have a key, and we have some idea what the ledger is
validating_ =
!app_.getOPs().isNeedNetworkLedger() && (valPublic_.size() != 0);

if (validating_)
{
JLOG(adaptor_.j_.info()) << "Entering consensus process, validating";
JLOG(j_.info()) << "Entering consensus process, validating";
}
else
{
// Otherwise we just want to monitor the validation process.
JLOG(adaptor_.j_.info()) << "Entering consensus process, watching";
JLOG(j_.info()) << "Entering consensus process, watching";
}

// Notify inbOund ledgers that we are starting a new round
adaptor_.inboundTransactions_.newRound(prevLgr.seq());
inboundTransactions_.newRound(prevLgr.seq());

// propose only if we're in sync with the network (and validating)
return validating_ &&
(app_.getOPs().getOperatingMode() == NetworkOPs::omFULL);
}

consensus_.startRound(now, prevLgrId, prevLgr, proposing);
void
RCLConsensus::startRound(
NetClock::time_point const& now,
RCLCxLedger::ID const& prevLgrId,
RCLCxLedger const& prevLgr)
{
ScopedLockType _{mutex_};
consensus_.startRound(
now, prevLgrId, prevLgr, adaptor_.preStartRound(prevLgr));
}
}
Loading

0 comments on commit d20650e

Please sign in to comment.