Skip to content

Commit

Permalink
Recover open ledger transactions to the queue (RIPD-1530):
Browse files Browse the repository at this point in the history
* If the transaction can't be queued, recover to the open ledger once,
  and drop it on the next attempt.
* New result codes for transactions that can not queue.
* Add minimum queue size.
* Remove the obsolete and incorrect SF_RETRY flag.
* fix XRPLF#2215
  • Loading branch information
ximinez committed Sep 21, 2017
1 parent 3bfd9de commit 62127d7
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 88 deletions.
6 changes: 6 additions & 0 deletions doc/rippled-example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,12 @@
# time a transaction with a higher fee level is added.
# Default: 20.
#
# minimum_queue_size = <number>
#
# The queue will always be able to hold at least this <number> of
# transactions, regardless of recent ledger sizes or the value of
# ledgers_in_queue. Default: 2000.
#
# retry_sequence_percent = <number>
#
# If a client replaces a transaction in the queue (same sequence
Expand Down
9 changes: 8 additions & 1 deletion src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ RCLConsensus::relay(RCLCxTx const& tx)
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
{
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx_.slice();
protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size());
Expand All @@ -163,6 +164,10 @@ RCLConsensus::relay(RCLCxTx const& tx)
app_.overlay().foreach (send_always(
std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
}
else
{
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
}
}
void
RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal)
Expand Down Expand Up @@ -303,6 +308,8 @@ RCLConsensus::onClose(
// Build SHAMap containing all transactions in our open ledger
for (auto const& tx : initialLedger->txs)
{
JLOG(j_.trace()) << "Adding open ledger TX " <<
tx.first->getTransactionID();
Serializer s(2048);
tx.first->add(s);
initialSet->addItem(
Expand Down Expand Up @@ -474,7 +481,7 @@ RCLConsensus::doAccept(
{
JLOG(j_.debug())
<< "Test applying disputed transaction that did"
<< " not get in";
<< " not get in " << it.second.tx().id();

SerialIter sit(it.second.tx().tx_.slice());
auto txn = std::make_shared<STTx const>(sit);
Expand Down
17 changes: 10 additions & 7 deletions src/ripple/app/ledger/OpenLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class OpenLedger
std::string const& suffix = "",
modify_type const& f = {});

private:
/** Algorithm for applying transactions.
This has the retry logic and ordering semantics
Expand All @@ -178,9 +179,9 @@ class OpenLedger
apply (Application& app, OpenView& view,
ReadView const& check, FwdRange const& txs,
OrderedTxs& retries, ApplyFlags flags,
beast::Journal j);
std::map<uint256, bool>& shouldRecover,
beast::Journal j);

private:
enum Result
{
success,
Expand All @@ -197,7 +198,7 @@ class OpenLedger
apply_one (Application& app, OpenView& view,
std::shared_ptr< STTx const> const& tx,
bool retry, ApplyFlags flags,
beast::Journal j);
bool shouldRecover, beast::Journal j);
};

//------------------------------------------------------------------------------
Expand All @@ -207,7 +208,8 @@ void
OpenLedger::apply (Application& app, OpenView& view,
ReadView const& check, FwdRange const& txs,
OrderedTxs& retries, ApplyFlags flags,
beast::Journal j)
std::map<uint256, bool>& shouldRecover,
beast::Journal j)
{
for (auto iter = txs.begin();
iter != txs.end(); ++iter)
Expand All @@ -217,10 +219,11 @@ OpenLedger::apply (Application& app, OpenView& view,
// Dereferencing the iterator can
// throw since it may be transformed.
auto const tx = *iter;
if (check.txExists(tx->getTransactionID()))
auto const txId = tx->getTransactionID();
if (check.txExists(txId))
continue;
auto const result = apply_one(app, view,
tx, true, flags, j);
tx, true, flags, shouldRecover[txId], j);
if (result == Result::retry)
retries.insert(tx);
}
Expand All @@ -241,7 +244,7 @@ OpenLedger::apply (Application& app, OpenView& view,
{
switch (apply_one(app, view,
iter->second, retry, flags,
j))
shouldRecover[iter->second->getTransactionID()], j))
{
case Result::success:
++changes;
Expand Down
69 changes: 63 additions & 6 deletions src/ripple/app/ledger/impl/OpenLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
#include <BeastConfig.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/tx/apply.h>
#include <ripple/ledger/CachedView.h>
#include <ripple/overlay/Message.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/Feature.h>
#include <boost/range/adaptor/transformed.hpp>

Expand Down Expand Up @@ -84,14 +88,20 @@ OpenLedger::accept(Application& app, Rules const& rules,
JLOG(j_.trace()) <<
"accept ledger " << ledger->seq() << " " << suffix;
auto next = create(rules, ledger);
std::map<uint256, bool> shouldRecover;
if (retriesFirst)
{
for (auto const& tx : retries)
{
auto const txID = tx.second->getTransactionID();
shouldRecover[txID] = app.getHashRouter().shouldRecover(txID);
}
// Handle disputed tx, outside lock
using empty =
std::vector<std::shared_ptr<
STTx const>>;
apply (app, *next, *ledger, empty{},
retries, flags, j_);
retries, flags, shouldRecover, j_);
}
// Block calls to modify, otherwise
// new tx going into the open ledger
Expand All @@ -100,6 +110,19 @@ OpenLedger::accept(Application& app, Rules const& rules,
std::mutex> lock1(modify_mutex_);
// Apply tx from the current open view
if (! current_->txs.empty())
{
for (auto const& tx : current_->txs)
{
auto const txID = tx.first->getTransactionID();
auto iter = shouldRecover.lower_bound(txID);
if (iter != shouldRecover.end()
&& iter->first == txID)
// already had a chance via disputes
iter->second = false;
else
shouldRecover.emplace_hint(iter, txID,
app.getHashRouter().shouldRecover(txID));
}
apply (app, *next, *ledger,
boost::adaptors::transform(
current_->txs,
Expand All @@ -109,14 +132,38 @@ OpenLedger::accept(Application& app, Rules const& rules,
{
return p.first;
}),
retries, flags, j_);
retries, flags, shouldRecover, j_);
}
// Call the modifier
if (f)
f(*next, j_);
// Apply local tx
for (auto const& item : locals)
app.getTxQ().apply(app, *next,
item.second, flags, j_);

// If we didn't relay this transaction recently, relay it to all peers
for (auto const& txpair : next->txs)
{
auto const& tx = txpair.first;
auto const txId = tx->getTransactionID();
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
{
JLOG(j_.debug()) << "Relaying recovered tx " << txId;
protocol::TMTransaction msg;
Serializer s;

tx->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count());
app.overlay().foreach(send_if_not(
std::make_shared<Message>(msg, protocol::mtTRANSACTION),
peer_in_set(*toSkip)));
}
}

// Switch to the new open view
std::lock_guard<
std::mutex> lock2(current_mutex_);
Expand All @@ -138,14 +185,24 @@ OpenLedger::create (Rules const& rules,
auto
OpenLedger::apply_one (Application& app, OpenView& view,
std::shared_ptr<STTx const> const& tx,
bool retry, ApplyFlags flags,
bool retry, ApplyFlags flags, bool shouldRecover,
beast::Journal j) -> Result
{
if (retry)
flags = flags | tapRETRY;
auto const result = ripple::apply(
app, view, *tx, flags, j);
if (result.second)
auto const result = [&]
{
auto const queueResult = app.getTxQ().apply(
app, view, tx, flags | tapPREFER_QUEUE, j);
// If the transaction can't get into the queue for intrinsic
// reasons, and it can still be recovered, try to put it
// directly into the open ledger, else drop it.
if (queueResult.first == telCAN_NOT_QUEUE && shouldRecover)
return ripple::apply(app, view, *tx, flags, j);
return queueResult;
}();
if (result.second ||
result.first == terQUEUED)
return Result::success;
if (isTefFailure (result.first) ||
isTemMalformed (result.first) ||
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ class ApplicationImp
, mFeeTrack (std::make_unique<LoadFeeTrack>(logs_->journal("LoadManager")))

, mHashRouter (std::make_unique<HashRouter>(
stopwatch(), HashRouter::getDefaultHoldTime ()))
stopwatch(), HashRouter::getDefaultHoldTime (),
HashRouter::getDefaultRecoverLimit ()))

, mValidations (make_Validations (*this))

Expand Down
10 changes: 10 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ HashRouter::shouldRelay (uint256 const& key)
return s.releasePeerSet();
}

bool
HashRouter::shouldRecover(uint256 const& key)
{
std::lock_guard <std::mutex> lock(mutex_);

auto& s = emplace(key).first;

return s.shouldRecover(recoverLimit_);
}

} // ripple
35 changes: 31 additions & 4 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ namespace ripple {
// VFALCO NOTE How can both bad and good be set on a hash?
#define SF_BAD 0x02 // Temporarily bad
#define SF_SAVED 0x04
#define SF_RETRY 0x08 // Transaction can be retried
#define SF_TRUSTED 0x10 // comes from trusted source
// Private flags, used internally in apply.cpp.
// Do not attempt to read, set, or reuse.
Expand Down Expand Up @@ -66,7 +65,6 @@ class HashRouter
static char const* getCountedObjectName () { return "HashRouterEntry"; }

Entry ()
: flags_ (0)
{
}

Expand Down Expand Up @@ -107,12 +105,26 @@ class HashRouter
return true;
}

/** Determines if this item should be recovered from the open ledger.
Counts the number of times the item has been recovered.
Every `limit` times the function is called, return false.
Else return true.
@note The limit must be > 0
*/
bool shouldRecover(std::uint32_t limit)
{
return ++recoveries_ % limit != 0;
}

private:
int flags_;
int flags_ = 0;
std::set <PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
std::uint32_t recoveries_ = 0;
};

public:
Expand All @@ -123,9 +135,16 @@ class HashRouter
return 300s;
}

HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
static inline std::uint32_t getDefaultRecoverLimit()
{
return 1;
}

HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds,
std::uint32_t recoverLimit)
: suppressionMap_(clock)
, holdTime_ (entryHoldTimeInSeconds)
, recoverLimit_ (recoverLimit + 1u)
{
}

Expand Down Expand Up @@ -164,6 +183,12 @@ class HashRouter
*/
boost::optional<std::set<PeerShortID>> shouldRelay(uint256 const& key);

/** Determines whether the hashed item should be recovered
@return `bool` indicates whether the item should be relayed
*/
bool shouldRecover(uint256 const& key);

private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool> emplace (uint256 const&);
Expand All @@ -175,6 +200,8 @@ class HashRouter
hardened_hash<strong_hash>> suppressionMap_;

std::chrono::seconds const holdTime_;

std::uint32_t const recoverLimit_;
};

} // ripple
Expand Down
8 changes: 1 addition & 7 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,6 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr<STTx const> const& iTrans
auto const txid = trans->getTransactionID ();
auto const flags = app_.getHashRouter().getFlags(txid);

if ((flags & SF_RETRY) != 0)
{
JLOG(m_journal.warn()) << "Redundant transactions submitted";
return;
}

if ((flags & SF_BAD) != 0)
{
JLOG(m_journal.warn()) << "Submitted transaction cached bad";
Expand Down Expand Up @@ -1102,7 +1096,7 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
Serializer s;

e.transaction->getSTransaction()->add (s);
tx.set_rawtransaction (&s.getData().front(), s.getLength());
tx.set_rawtransaction (s.data(), s.size());
tx.set_status (protocol::tsCURRENT);
tx.set_receivetimestamp (app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(e.result == terQUEUED);
Expand Down
1 change: 1 addition & 0 deletions src/ripple/app/misc/TxQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TxQ
struct Setup
{
std::size_t ledgersInQueue = 20;
std::size_t queueSizeMin = 2000;
std::uint32_t retrySequencePercent = 25;
// TODO: eahennis. Can we remove the multi tx factor?
std::int32_t multiTxnPercent = -90;
Expand Down
Loading

0 comments on commit 62127d7

Please sign in to comment.