Skip to content

Commit

Permalink
Several changes to improve Consensus stability: (XRPLF#4505)
Browse files Browse the repository at this point in the history
 * Verify accepted ledger becomes validated, and retry
   with a new consensus transaction set if not.
 * Always store proposals.
 * Track proposals by ledger sequence. This helps slow peers catch
   up with the rest of the network.
 * Acquire transaction sets for proposals with future ledger sequences.
   This also helps slow peers catch up.
 * Optimize timer delay for establish phase to wait based on how
   long validators have been sending proposals. This also helps slow
   peers to catch up.
 * Fix impasse achieving close time consensus.
 * Don't wait between open and establish phases.
  • Loading branch information
mtrippled authored and manojsdoshi committed Aug 18, 2023
1 parent a082a2e commit df65548
Show file tree
Hide file tree
Showing 21 changed files with 945 additions and 187 deletions.
32 changes: 32 additions & 0 deletions 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
r aded4a7a92 docs: add API Changelog (#4612)

# Rebase 01df19c08b..aded4a7a92 onto 01df19c08b (1 command)
#
# Commands:
# p, pick <commit> = use commit
# r, reword <commit> = use commit, but edit the commit message
# e, edit <commit> = use commit, but stop for amending
# s, squash <commit> = use commit, but meld into previous commit
# f, fixup [-C | -c] <commit> = like "squash" but keep only the previous
# commit's log message, unless -C is used, in which case
# keep only this commit's message; -c is same as -C but
# opens the editor
# x, exec <command> = run command (the rest of the line) using shell
# b, break = stop here (continue rebase later with 'git rebase --continue')
# d, drop <commit> = remove commit
# l, label <label> = label current HEAD with a name
# t, reset <label> = reset HEAD to a label
# m, merge [-C <commit> | -c <commit>] <label> [# <oneline>]
# create a merge commit using the original merge commit's
# message (or the oneline, if no original merge commit was
# specified); use -c <commit> to reword the commit message
# u, update-ref <ref> = track a placeholder for the <ref> to be updated
# to this position in the new commits. The <ref> is
# updated at the end of the rebase
#
# These lines can be re-ordered; they are executed from top to bottom.
#
# If you remove a line here THAT COMMIT WILL BE LOST.
#
# However, if you remove everything, the rebase will be aborted.
#
3 changes: 3 additions & 0 deletions Builds/levelization/results/ordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ripple.consensus > ripple.basics
ripple.consensus > ripple.beast
ripple.consensus > ripple.json
ripple.consensus > ripple.protocol
ripple.consensus > ripple.shamap
ripple.core > ripple.beast
ripple.core > ripple.json
ripple.core > ripple.protocol
Expand Down Expand Up @@ -125,11 +126,13 @@ test.core > ripple.server
test.core > test.jtx
test.core > test.toplevel
test.core > test.unit_test
test.csf > ripple.app
test.csf > ripple.basics
test.csf > ripple.beast
test.csf > ripple.consensus
test.csf > ripple.json
test.csf > ripple.protocol
test.csf > test.jtx
test.json > ripple.beast
test.json > ripple.json
test.json > test.jtx
Expand Down
118 changes: 78 additions & 40 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus(
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type const& clock,
Consensus<Adaptor>::clock_type& clock,
ValidatorKeys const& validatorKeys,
beast::Journal journal)
: adaptor_(
Expand Down Expand Up @@ -171,6 +171,9 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size());

if (proposal.ledgerSeq().has_value())
prop.set_ledgerseq(*proposal.ledgerSeq());

app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
}

Expand All @@ -180,7 +183,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
{
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
JLOG(j_.trace()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx_->slice();
protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size());
Expand All @@ -192,13 +195,13 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
}
else
{
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
JLOG(j_.trace()) << "Not relaying disputed tx " << tx.id();
}
}
void
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{
JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
JLOG(j_.debug()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
<< ripple::to_string(proposal.prevLedger()) << " -> "
<< ripple::to_string(proposal.position());

Expand All @@ -212,6 +215,7 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
prop.set_nodepubkey(
validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size());
prop.set_ledgerseq(*proposal.ledgerSeq());

auto sig = signDigest(
validatorKeys_.publicKey,
Expand Down Expand Up @@ -297,7 +301,8 @@ auto
RCLConsensus::Adaptor::onClose(
RCLCxLedger const& ledger,
NetClock::time_point const& closeTime,
ConsensusMode mode) -> Result
ConsensusMode mode,
clock_type& clock) -> Result
{
const bool wrongLCL = mode == ConsensusMode::wrongLedger;
const bool proposing = mode == ConsensusMode::proposing;
Expand Down Expand Up @@ -379,7 +384,6 @@ RCLConsensus::Adaptor::onClose(

// Needed because of the move below.
auto const setHash = initialSet->getHash().as_uint256();

return Result{
std::move(initialSet),
RCLCxPeerPos::Proposal{
Expand All @@ -388,7 +392,9 @@ RCLConsensus::Adaptor::onClose(
setHash,
closeTime,
app_.timeKeeper().closeTime(),
validatorKeys_.nodeID}};
validatorKeys_.nodeID,
initialLedger->info().seq,
clock}};
}

void
Expand All @@ -400,50 +406,43 @@ RCLConsensus::Adaptor::onForceAccept(
ConsensusMode const& mode,
Json::Value&& consensusJson)
{
doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson));
auto txsBuilt = buildAndValidate(
result, prevLedger, closeResolution, mode, std::move(consensusJson));
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode);
}

void
RCLConsensus::Adaptor::onAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson)
Json::Value&& consensusJson,
std::pair<CanonicalTxSet_t, Ledger_t>&& tb)
{
app_.getJobQueue().addJob(
jtACCEPT,
"acceptLedger",
[=, this, cj = std::move(consensusJson)]() mutable {
[=,
this,
cj = std::move(consensusJson),
txsBuilt = std::move(tb)]() mutable {
// Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and capture by reference state
// will not change until startRound is called (which happens via
// endConsensus).
this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(cj));
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode);
this->app_.getOPs().endConsensus();
});
}

void
RCLConsensus::Adaptor::doAccept(
std::pair<
RCLConsensus::Adaptor::CanonicalTxSet_t,
RCLConsensus::Adaptor::Ledger_t>
RCLConsensus::Adaptor::buildAndValidate(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
Ledger_t const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusMode const& mode,
Json::Value&& consensusJson)
{
Expand Down Expand Up @@ -497,12 +496,12 @@ RCLConsensus::Adaptor::doAccept(
{
retriableTxs.insert(
std::make_shared<STTx const>(SerialIter{item.slice()}));
JLOG(j_.debug()) << " Tx: " << item.key();
JLOG(j_.trace()) << " Tx: " << item.key();
}
catch (std::exception const& ex)
{
failed.insert(item.key());
JLOG(j_.warn())
JLOG(j_.trace())
<< " Tx: " << item.key() << " throws: " << ex.what();
}
}
Expand Down Expand Up @@ -579,6 +578,19 @@ RCLConsensus::Adaptor::doAccept(
ledgerMaster_.consensusBuilt(
built.ledger_, result.txns.id(), std::move(consensusJson));

return {retriableTxs, built};
}

void
RCLConsensus::Adaptor::prepareOpenLedger(
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt,
Result const& result,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode)
{
auto& retriableTxs = txsBuilt.first;
auto const& built = txsBuilt.second;

//-------------------------------------------------------------------------
{
// Apply disputed transactions that didn't get in
Expand All @@ -601,7 +613,7 @@ RCLConsensus::Adaptor::doAccept(
// we voted NO
try
{
JLOG(j_.debug())
JLOG(j_.trace())
<< "Test applying disputed transaction that did"
<< " not get in " << dispute.tx().id();

Expand All @@ -619,7 +631,7 @@ RCLConsensus::Adaptor::doAccept(
}
catch (std::exception const& ex)
{
JLOG(j_.debug()) << "Failed to apply transaction we voted "
JLOG(j_.trace()) << "Failed to apply transaction we voted "
"NO on. Exception: "
<< ex.what();
}
Expand Down Expand Up @@ -669,6 +681,7 @@ RCLConsensus::Adaptor::doAccept(
// we entered the round with the network,
// see how close our close time is to other node's
// close time reports, and update our clock.
bool const consensusFail = result.state == ConsensusState::MovedOn;
if ((mode == ConsensusMode::proposing ||
mode == ConsensusMode::observing) &&
!consensusFail)
Expand Down Expand Up @@ -889,12 +902,30 @@ RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
mode_ = after;
}

bool
RCLConsensus::Adaptor::retryAccept(
Ledger_t const& newLedger,
std::optional<std::chrono::time_point<std::chrono::steady_clock>>& start)
const
{
static bool const standalone = ledgerMaster_.standalone();
auto const& validLedger = ledgerMaster_.getValidatedLedger();

return (app_.getOPs().isFull() && !standalone &&
(validLedger && (newLedger.id() != validLedger->info().hash) &&
(newLedger.seq() >= validLedger->info().seq))) &&
(!start ||
std::chrono::steady_clock::now() - *start < std::chrono::seconds{5});
}

//-----------------------------------------------------------------------------

Json::Value
RCLConsensus::getJson(bool full) const
{
Json::Value ret;
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
ret = consensus_.getJson(full);
}
ret["validating"] = adaptor_.validating();
Expand All @@ -906,7 +937,7 @@ RCLConsensus::timerEntry(NetClock::time_point const& now)
{
try
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.timerEntry(now);
}
catch (SHAMapMissingNode const& mn)
Expand All @@ -922,7 +953,7 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
{
try
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.gotTxSet(now, txSet);
}
catch (SHAMapMissingNode const& mn)
Expand All @@ -940,7 +971,7 @@ RCLConsensus::simulate(
NetClock::time_point const& now,
std::optional<std::chrono::milliseconds> consensusDelay)
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.simulate(now, consensusDelay);
}

Expand All @@ -949,7 +980,7 @@ RCLConsensus::peerProposal(
NetClock::time_point const& now,
RCLCxPeerPos const& newProposal)
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
return consensus_.peerProposal(now, newProposal);
}

Expand Down Expand Up @@ -1022,6 +1053,12 @@ RCLConsensus::Adaptor::getQuorumKeys() const
return app_.validators().getQuorumKeys();
}

std::size_t
RCLConsensus::Adaptor::quorum() const
{
return app_.validators().quorum();
}

std::size_t
RCLConsensus::Adaptor::laggards(
Ledger_t::Seq const seq,
Expand Down Expand Up @@ -1051,12 +1088,13 @@ RCLConsensus::startRound(
hash_set<NodeID> const& nowUntrusted,
hash_set<NodeID> const& nowTrusted)
{
std::lock_guard _{mutex_};
std::lock_guard _{adaptor_.peekMutex()};
consensus_.startRound(
now,
prevLgrId,
prevLgr,
nowUntrusted,
adaptor_.preStartRound(prevLgr, nowTrusted));
}

} // namespace ripple
Loading

0 comments on commit df65548

Please sign in to comment.