Skip to content

Commit

Permalink
Recover old open ledger transactions to the queue:
Browse files Browse the repository at this point in the history
* Recover to the open ledger once, then to the queue.
* If transaction fails to queue for any reason, drop it.
* New result codes for transactions that can not queue.
* Add minimum queue size
* RIPD-1530
fix XRPLF#2215
  • Loading branch information
ximinez committed Sep 19, 2017
1 parent 3bfd9de commit 8f30526
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 50 deletions.
8 changes: 7 additions & 1 deletion src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ RCLConsensus::relay(RCLCxTx const& tx)
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
{
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx_.slice();
protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size());
Expand All @@ -163,6 +164,10 @@ RCLConsensus::relay(RCLCxTx const& tx)
app_.overlay().foreach (send_always(
std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
}
else
{
JLOG(j_.debug()) << "Not relaying disputedtx " << tx.id();
}
}
void
RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal)
Expand Down Expand Up @@ -303,6 +308,7 @@ RCLConsensus::onClose(
// Build SHAMap containing all transactions in our open ledger
for (auto const& tx : initialLedger->txs)
{
JLOG(j_.debug()) << "Adding open ledger TX " << tx.first->getTransactionID();
Serializer s(2048);
tx.first->add(s);
initialSet->addItem(
Expand Down Expand Up @@ -474,7 +480,7 @@ RCLConsensus::doAccept(
{
JLOG(j_.debug())
<< "Test applying disputed transaction that did"
<< " not get in";
<< " not get in " << it.second.tx().id();

SerialIter sit(it.second.tx().tx_.slice());
auto txn = std::make_shared<STTx const>(sit);
Expand Down
42 changes: 40 additions & 2 deletions src/ripple/app/ledger/impl/OpenLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
#include <BeastConfig.h>
#include <ripple/app/ledger/OpenLedger.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/tx/apply.h>
#include <ripple/ledger/CachedView.h>
#include <ripple/overlay/Message.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/overlay/predicates.h>
#include <ripple/protocol/Feature.h>
#include <boost/range/adaptor/transformed.hpp>

Expand Down Expand Up @@ -117,6 +121,29 @@ OpenLedger::accept(Application& app, Rules const& rules,
for (auto const& item : locals)
app.getTxQ().apply(app, *next,
item.second, flags, j_);

// If we didn't relay this transaction recently, relay it to all peers
for (auto const& txpair : next->txs)
{
auto const& tx = txpair.first;
auto const txId = tx->getTransactionID();
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
{
JLOG(j_.debug()) << "Relaying recovered tx " << txId;
protocol::TMTransaction msg;
Serializer s;

tx->add(s);
msg.set_rawtransaction(&s.getData().front(), s.getLength());
msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count());
app.overlay().foreach(send_if_not(
std::make_shared<Message>(msg, protocol::mtTRANSACTION),
peer_in_set(*toSkip)));
}
}

// Switch to the new open view
std::lock_guard<
std::mutex> lock2(current_mutex_);
Expand All @@ -143,8 +170,19 @@ OpenLedger::apply_one (Application& app, OpenView& view,
{
if (retry)
flags = flags | tapRETRY;
auto const result = ripple::apply(
app, view, *tx, flags, j);
auto const result = [&]
{
if (app.getHashRouter().shouldRecover(tx->getTransactionID()))
return ripple::apply(
app, view, *tx, flags, j);
else
// If the transaction can't get into the queue for any reason,
// drop it. If other nodes / validators have it, it'll be in
// their proposed set. If it's stuck on this node, then clean
// up the open ledger.
return app.getTxQ().apply(
app, view, tx, flags, j);
}();
if (result.second)
return Result::success;
if (isTefFailure (result.first) ||
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ class ApplicationImp
, mFeeTrack (std::make_unique<LoadFeeTrack>(logs_->journal("LoadManager")))

, mHashRouter (std::make_unique<HashRouter>(
stopwatch(), HashRouter::getDefaultHoldTime ()))
stopwatch(), HashRouter::getDefaultHoldTime (),
HashRouter::getDefaultRecoverLimit ()))

, mValidations (make_Validations (*this))

Expand Down
10 changes: 10 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ HashRouter::shouldRelay (uint256 const& key)
return s.releasePeerSet();
}

bool
HashRouter::shouldRecover(uint256 const& key)
{
std::lock_guard <std::mutex> lock(mutex_);

auto& s = emplace(key).first;

return s.shouldRecover(recoverLimit_);
}

} // ripple
41 changes: 38 additions & 3 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class HashRouter
static char const* getCountedObjectName () { return "HashRouterEntry"; }

Entry ()
: flags_ (0)
{
}

Expand Down Expand Up @@ -107,12 +106,33 @@ class HashRouter
return true;
}

/** Determines if this item should be recovered from the open ledger.
Counts the number of times the item has been recovered.
If it hits the limit, reset the counter and return false.
Else, increment the counter and return true.
@note The limit is signed while the counter is unsigned.
A negative limit will retry forever.
*/
bool shouldRecover(std::int32_t limit)
{
if (recoveries_ == limit)
{
recoveries_ = 0;
return false;
}
++recoveries_;
return true;
}

private:
int flags_;
int flags_ = 0;
std::set <PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
std::uint16_t recoveries_ = 0;
};

public:
Expand All @@ -123,9 +143,16 @@ class HashRouter
return 300s;
}

HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
static inline std::int32_t getDefaultRecoverLimit()
{
return 1;
}

HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds,
std::int32_t recoverLimit)
: suppressionMap_(clock)
, holdTime_ (entryHoldTimeInSeconds)
, recoverLimit_ (recoverLimit)
{
}

Expand Down Expand Up @@ -164,6 +191,12 @@ class HashRouter
*/
boost::optional<std::set<PeerShortID>> shouldRelay(uint256 const& key);

/** Determines whether the hashed item should be recovered
@return `bool` indicates whether the item should be relayed
*/
bool shouldRecover(uint256 const& key);

private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool> emplace (uint256 const&);
Expand All @@ -175,6 +208,8 @@ class HashRouter
hardened_hash<strong_hash>> suppressionMap_;

std::chrono::seconds const holdTime_;

std::int32_t const recoverLimit_;
};

} // ripple
Expand Down
1 change: 1 addition & 0 deletions src/ripple/app/misc/TxQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TxQ
struct Setup
{
std::size_t ledgersInQueue = 20;
std::size_t queueSizeMin = 2000;
std::uint32_t retrySequencePercent = 25;
// TODO: eahennis. Can we remove the multi tx factor?
std::int32_t multiTxnPercent = -90;
Expand Down
40 changes: 21 additions & 19 deletions src/ripple/app/misc/impl/TxQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view,
promise to stick around for long enough that it has
a realistic chance of getting into a ledger.
*/
auto lastValid = getLastLedgerSequence(tx);
auto const lastValid = getLastLedgerSequence(tx);
canBeHeld = !lastValid || *lastValid >=
view.info().seq + setup_.minimumLastLedgerBuffer;
}
Expand All @@ -349,10 +349,15 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view,
can queue. Mitigates the lost cost of relaying should
an early one fail or get dropped.
*/
auto const tSeq = tx.getSequence();
canBeHeld = accountIter == byAccount_.end() ||
replacementIter ||
accountIter->second.getTxnCount() <
setup_.maximumTxnPerAccount;
setup_.maximumTxnPerAccount ||
// Allow the transaction to get in front of the first
// queued transaction. Allows recovery of open ledger
// transactions, and stuck transactions.
tSeq < accountIter->second.transactions.begin()->first;
}
return canBeHeld;
}
Expand Down Expand Up @@ -512,8 +517,7 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
non-blockers?
Yes: Remove the queued transaction. Continue to next
step.
No: Reject `txn` with `telINSUF_FEE_P` or
`telCAN_NOT_QUEUE`. Stop.
No: Reject `txn` with `telCAN_NOT_QUEUE_FEE`. Stop.
No: Continue to next step.
3. Does this tx have the expected sequence number for the
account?
Expand All @@ -527,11 +531,11 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
than the previous tx?
No: Reject with `telINSUF_FEE_P`. Stop.
Yes: Are any of the prior sequence txs blockers?
Yes: Reject with `telCAN_NOT_QUEUE`. Stop.
Yes: Reject with `telCAN_NOT_QUEUE_BLOCKED`. Stop.
No: Are the fees in-flight of the other
queued txs >= than the account
balance or minimum account reserve?
Yes: Reject with `telCAN_NOT_QUEUE`. Stop.
Yes: Reject with `telCAN_NOT_QUEUE_BALANCE`. Stop.
No: Create a throwaway sandbox `View`. Modify
the account's sequence number to match
the tx (avoid `terPRE_SEQ`), and decrease
Expand All @@ -550,8 +554,7 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
it to `doApply()` and return that result.
No: Continue to the next step.
6. Can the tx be held in the queue? (See TxQ::canBeHeld).
No: Reject `txn` with `telINSUF_FEE_P` if this tx
has the current sequence, or `telCAN_NOT_QUEUE`
No: Reject `txn` with `telCAN_NOT_QUEUE_FULL`
if not. Stop.
Yes: Continue to the next step.
7. Is the queue full?
Expand Down Expand Up @@ -679,8 +682,7 @@ TxQ::apply(Application& app, OpenView& view,
transactionID <<
" in favor of normal queued " <<
existingIter->second.txID;
return{existingIter == txQAcct.transactions.begin() ?
telINSUF_FEE_P : telCAN_NOT_QUEUE, false };
return {telCAN_NOT_QUEUE_BLOCKS, false };
}
}
}
Expand Down Expand Up @@ -710,7 +712,7 @@ TxQ::apply(Application& app, OpenView& view,
transactionID <<
" in favor of queued " <<
existingIter->second.txID;
return{ telINSUF_FEE_P, false };
return{ telCAN_NOT_QUEUE_FEE, false };
}
}
}
Expand Down Expand Up @@ -806,7 +808,7 @@ TxQ::apply(Application& app, OpenView& view,
transactionID <<
". A blocker-type transaction " <<
"is in the queue.";
return{ telCAN_NOT_QUEUE, false };
return{ telCAN_NOT_QUEUE_BLOCKED, false };
}
multiTxn->fee +=
workingIter->second.consequences->fee;
Expand All @@ -824,7 +826,7 @@ TxQ::apply(Application& app, OpenView& view,
than the account's current balance, or the
minimum reserve. If it is, then there's a risk
that the fees won't get paid, so drop this
transaction with a telCAN_NOT_QUEUE result.
transaction with a telCAN_NOT_QUEUE_BALANCE result.
TODO: Decide whether to count the current txn fee
in this limit if it's the last transaction for
this account. Currently, it will not count,
Expand Down Expand Up @@ -866,7 +868,7 @@ TxQ::apply(Application& app, OpenView& view,
"Ignoring transaction " <<
transactionID <<
". Total fees in flight too high.";
return{ telCAN_NOT_QUEUE, false };
return{ telCAN_NOT_QUEUE_BALANCE, false };
}

// Create the test view from the current view
Expand Down Expand Up @@ -970,8 +972,7 @@ TxQ::apply(Application& app, OpenView& view,
JLOG(j_.trace()) << "Transaction " <<
transactionID <<
" can not be held";
return { feeLevelPaid >= requiredFeeLevel ?
telCAN_NOT_QUEUE : telINSUF_FEE_P, false };
return { telCAN_NOT_QUEUE, false };
}

// If the queue is full, decide whether to drop the current
Expand All @@ -986,7 +987,7 @@ TxQ::apply(Application& app, OpenView& view,
transactionID <<
" would kick a transaction from the same account (" <<
account << ") out of the queue.";
return { telCAN_NOT_QUEUE, false };
return { telCAN_NOT_QUEUE_FULL, false };
}
auto const& endAccount = byAccount_.at(lastRIter->account);
auto endEffectiveFeeLevel = [&]()
Expand Down Expand Up @@ -1038,7 +1039,7 @@ TxQ::apply(Application& app, OpenView& view,
JLOG(j_.warn()) << "Queue is full, and transaction " <<
transactionID <<
" fee is lower than end item's account average fee";
return { telINSUF_FEE_P, false };
return { telCAN_NOT_QUEUE_FULL, false };
}
}

Expand Down Expand Up @@ -1105,7 +1106,8 @@ TxQ::processClosedLedger(Application& app,
auto ledgerSeq = view.info().seq;

if (!timeLeap)
maxSize_ = snapshot.txnsExpected * setup_.ledgersInQueue;
maxSize_ = std::max (snapshot.txnsExpected * setup_.ledgersInQueue,
setup_.queueSizeMin);

// Remove any queued candidates whose LastLedgerSequence has gone by.
for(auto candidateIter = byFee_.begin(); candidateIter != byFee_.end(); )
Expand Down
11 changes: 11 additions & 0 deletions src/ripple/protocol/TER.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ enum TER
telINSUF_FEE_P,
telNO_DST_PARTIAL,
telCAN_NOT_QUEUE,
telCAN_NOT_QUEUE_BALANCE,
telCAN_NOT_QUEUE_BLOCKS,
telCAN_NOT_QUEUE_BLOCKED,
telCAN_NOT_QUEUE_FEE,
telCAN_NOT_QUEUE_FULL,
// update istelCanNotQueue if any similar codes are added

// -299 .. -200: M Malformed (bad signature)
// Causes:
Expand Down Expand Up @@ -217,6 +223,11 @@ inline bool isTelLocal(TER x)
return ((x) >= telLOCAL_ERROR && (x) < temMALFORMED);
}

inline bool isTelCanNotQueue(TER x)
{
return ((x) >= telCAN_NOT_QUEUE && (x) <= telCAN_NOT_QUEUE_FULL);
}

inline bool isTemMalformed(TER x)
{
return ((x) >= temMALFORMED && (x) < tefFAILURE);
Expand Down
5 changes: 5 additions & 0 deletions src/ripple/protocol/impl/TER.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ transResults()
{ telINSUF_FEE_P, { "telINSUF_FEE_P", "Fee insufficient." } },
{ telNO_DST_PARTIAL, { "telNO_DST_PARTIAL", "Partial payment to create account not allowed." } },
{ telCAN_NOT_QUEUE, { "telCAN_NOT_QUEUE", "Can not queue at this time." } },
{ telCAN_NOT_QUEUE_BALANCE, { "telCAN_NOT_QUEUE_BALANCE", "Can not queue at this time: insufficient balance to pay all queued fees." } },
{ telCAN_NOT_QUEUE_BLOCKS, { "telCAN_NOT_QUEUE_BLOCKS", "Can not queue at this time: would block later queued transaction(s)." } },
{ telCAN_NOT_QUEUE_BLOCKED, { "telCAN_NOT_QUEUE_BLOCKED", "Can not queue at this time: blocking transaction in queue." } },
{ telCAN_NOT_QUEUE_FEE, { "telCAN_NOT_QUEUE_FEE", "Can not queue at this time: fee insufficient to replace queued transaction." } },
{ telCAN_NOT_QUEUE_FULL, { "telCAN_NOT_QUEUE_FULL", "Can not queue at this time: queue is full." } },

{ temMALFORMED, { "temMALFORMED", "Malformed transaction." } },
{ temBAD_AMOUNT, { "temBAD_AMOUNT", "Can only send positive amounts." } },
Expand Down
Loading

0 comments on commit 8f30526

Please sign in to comment.