From 62127d725d801641bfaa61dee7d88c95e48820c5 Mon Sep 17 00:00:00 2001 From: Edward Hennis Date: Tue, 12 Sep 2017 18:32:31 -0400 Subject: [PATCH] Recover open ledger transactions to the queue (RIPD-1530): * If the transaction can't be queued, recover to the open ledger once, and drop it on the next attempt. * New result codes for transactions that can not queue. * Add minimum queue size. * Remove the obsolete and incorrect SF_RETRY flag. * fix #2215 --- doc/rippled-example.cfg | 6 ++ src/ripple/app/consensus/RCLConsensus.cpp | 9 ++- src/ripple/app/ledger/OpenLedger.h | 17 ++-- src/ripple/app/ledger/impl/OpenLedger.cpp | 69 ++++++++++++++-- src/ripple/app/main/Application.cpp | 3 +- src/ripple/app/misc/HashRouter.cpp | 10 +++ src/ripple/app/misc/HashRouter.h | 35 +++++++- src/ripple/app/misc/NetworkOPs.cpp | 8 +- src/ripple/app/misc/TxQ.h | 1 + src/ripple/app/misc/impl/TxQ.cpp | 97 +++++++++++++++-------- src/ripple/ledger/ApplyView.h | 5 ++ src/ripple/overlay/impl/PeerImp.cpp | 7 +- src/ripple/protocol/TER.h | 5 ++ src/ripple/protocol/impl/TER.cpp | 7 +- src/test/app/HashRouter_test.cpp | 46 +++++++++-- src/test/app/TxQ_test.cpp | 39 ++++----- 16 files changed, 276 insertions(+), 88 deletions(-) diff --git a/doc/rippled-example.cfg b/doc/rippled-example.cfg index c645802343f..5eeb418dd19 100644 --- a/doc/rippled-example.cfg +++ b/doc/rippled-example.cfg @@ -475,6 +475,12 @@ # time a transaction with a higher fee level is added. # Default: 20. # +# minimum_queue_size = +# +# The queue will always be able to hold at least this of +# transactions, regardless of recent ledger sizes or the value of +# ledgers_in_queue. Default: 2000. +# # retry_sequence_percent = # # If a client replaces a transaction in the queue (same sequence diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 55323ff86d2..f5b240ebc95 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -154,6 +154,7 @@ RCLConsensus::relay(RCLCxTx const& tx) // If we didn't relay this transaction recently, relay it to all peers if (app_.getHashRouter().shouldRelay(tx.id())) { + JLOG(j_.debug()) << "Relaying disputed tx " << tx.id(); auto const slice = tx.tx_.slice(); protocol::TMTransaction msg; msg.set_rawtransaction(slice.data(), slice.size()); @@ -163,6 +164,10 @@ RCLConsensus::relay(RCLCxTx const& tx) app_.overlay().foreach (send_always( std::make_shared(msg, protocol::mtTRANSACTION))); } + else + { + JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id(); + } } void RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal) @@ -303,6 +308,8 @@ RCLConsensus::onClose( // Build SHAMap containing all transactions in our open ledger for (auto const& tx : initialLedger->txs) { + JLOG(j_.trace()) << "Adding open ledger TX " << + tx.first->getTransactionID(); Serializer s(2048); tx.first->add(s); initialSet->addItem( @@ -474,7 +481,7 @@ RCLConsensus::doAccept( { JLOG(j_.debug()) << "Test applying disputed transaction that did" - << " not get in"; + << " not get in " << it.second.tx().id(); SerialIter sit(it.second.tx().tx_.slice()); auto txn = std::make_shared(sit); diff --git a/src/ripple/app/ledger/OpenLedger.h b/src/ripple/app/ledger/OpenLedger.h index a6fa7d5e193..1f5a4feff4f 100644 --- a/src/ripple/app/ledger/OpenLedger.h +++ b/src/ripple/app/ledger/OpenLedger.h @@ -167,6 +167,7 @@ class OpenLedger std::string const& suffix = "", modify_type const& f = {}); +private: /** Algorithm for applying transactions. This has the retry logic and ordering semantics @@ -178,9 +179,9 @@ class OpenLedger apply (Application& app, OpenView& view, ReadView const& check, FwdRange const& txs, OrderedTxs& retries, ApplyFlags flags, - beast::Journal j); + std::map& shouldRecover, + beast::Journal j); -private: enum Result { success, @@ -197,7 +198,7 @@ class OpenLedger apply_one (Application& app, OpenView& view, std::shared_ptr< STTx const> const& tx, bool retry, ApplyFlags flags, - beast::Journal j); + bool shouldRecover, beast::Journal j); }; //------------------------------------------------------------------------------ @@ -207,7 +208,8 @@ void OpenLedger::apply (Application& app, OpenView& view, ReadView const& check, FwdRange const& txs, OrderedTxs& retries, ApplyFlags flags, - beast::Journal j) + std::map& shouldRecover, + beast::Journal j) { for (auto iter = txs.begin(); iter != txs.end(); ++iter) @@ -217,10 +219,11 @@ OpenLedger::apply (Application& app, OpenView& view, // Dereferencing the iterator can // throw since it may be transformed. auto const tx = *iter; - if (check.txExists(tx->getTransactionID())) + auto const txId = tx->getTransactionID(); + if (check.txExists(txId)) continue; auto const result = apply_one(app, view, - tx, true, flags, j); + tx, true, flags, shouldRecover[txId], j); if (result == Result::retry) retries.insert(tx); } @@ -241,7 +244,7 @@ OpenLedger::apply (Application& app, OpenView& view, { switch (apply_one(app, view, iter->second, retry, flags, - j)) + shouldRecover[iter->second->getTransactionID()], j)) { case Result::success: ++changes; diff --git a/src/ripple/app/ledger/impl/OpenLedger.cpp b/src/ripple/app/ledger/impl/OpenLedger.cpp index 955584b72f2..08493f14094 100644 --- a/src/ripple/app/ledger/impl/OpenLedger.cpp +++ b/src/ripple/app/ledger/impl/OpenLedger.cpp @@ -20,9 +20,13 @@ #include #include #include +#include #include #include #include +#include +#include +#include #include #include @@ -84,14 +88,20 @@ OpenLedger::accept(Application& app, Rules const& rules, JLOG(j_.trace()) << "accept ledger " << ledger->seq() << " " << suffix; auto next = create(rules, ledger); + std::map shouldRecover; if (retriesFirst) { + for (auto const& tx : retries) + { + auto const txID = tx.second->getTransactionID(); + shouldRecover[txID] = app.getHashRouter().shouldRecover(txID); + } // Handle disputed tx, outside lock using empty = std::vector>; apply (app, *next, *ledger, empty{}, - retries, flags, j_); + retries, flags, shouldRecover, j_); } // Block calls to modify, otherwise // new tx going into the open ledger @@ -100,6 +110,19 @@ OpenLedger::accept(Application& app, Rules const& rules, std::mutex> lock1(modify_mutex_); // Apply tx from the current open view if (! current_->txs.empty()) + { + for (auto const& tx : current_->txs) + { + auto const txID = tx.first->getTransactionID(); + auto iter = shouldRecover.lower_bound(txID); + if (iter != shouldRecover.end() + && iter->first == txID) + // already had a chance via disputes + iter->second = false; + else + shouldRecover.emplace_hint(iter, txID, + app.getHashRouter().shouldRecover(txID)); + } apply (app, *next, *ledger, boost::adaptors::transform( current_->txs, @@ -109,7 +132,8 @@ OpenLedger::accept(Application& app, Rules const& rules, { return p.first; }), - retries, flags, j_); + retries, flags, shouldRecover, j_); + } // Call the modifier if (f) f(*next, j_); @@ -117,6 +141,29 @@ OpenLedger::accept(Application& app, Rules const& rules, for (auto const& item : locals) app.getTxQ().apply(app, *next, item.second, flags, j_); + + // If we didn't relay this transaction recently, relay it to all peers + for (auto const& txpair : next->txs) + { + auto const& tx = txpair.first; + auto const txId = tx->getTransactionID(); + if (auto const toSkip = app.getHashRouter().shouldRelay(txId)) + { + JLOG(j_.debug()) << "Relaying recovered tx " << txId; + protocol::TMTransaction msg; + Serializer s; + + tx->add(s); + msg.set_rawtransaction(s.data(), s.size()); + msg.set_status(protocol::tsNEW); + msg.set_receivetimestamp( + app.timeKeeper().now().time_since_epoch().count()); + app.overlay().foreach(send_if_not( + std::make_shared(msg, protocol::mtTRANSACTION), + peer_in_set(*toSkip))); + } + } + // Switch to the new open view std::lock_guard< std::mutex> lock2(current_mutex_); @@ -138,14 +185,24 @@ OpenLedger::create (Rules const& rules, auto OpenLedger::apply_one (Application& app, OpenView& view, std::shared_ptr const& tx, - bool retry, ApplyFlags flags, + bool retry, ApplyFlags flags, bool shouldRecover, beast::Journal j) -> Result { if (retry) flags = flags | tapRETRY; - auto const result = ripple::apply( - app, view, *tx, flags, j); - if (result.second) + auto const result = [&] + { + auto const queueResult = app.getTxQ().apply( + app, view, tx, flags | tapPREFER_QUEUE, j); + // If the transaction can't get into the queue for intrinsic + // reasons, and it can still be recovered, try to put it + // directly into the open ledger, else drop it. + if (queueResult.first == telCAN_NOT_QUEUE && shouldRecover) + return ripple::apply(app, view, *tx, flags, j); + return queueResult; + }(); + if (result.second || + result.first == terQUEUED) return Result::success; if (isTefFailure (result.first) || isTemMalformed (result.first) || diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 7f3675a7be8..73308950fc3 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -469,7 +469,8 @@ class ApplicationImp , mFeeTrack (std::make_unique(logs_->journal("LoadManager"))) , mHashRouter (std::make_unique( - stopwatch(), HashRouter::getDefaultHoldTime ())) + stopwatch(), HashRouter::getDefaultHoldTime (), + HashRouter::getDefaultRecoverLimit ())) , mValidations (make_Validations (*this)) diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 7a5397f4103..8e407126058 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -107,4 +107,14 @@ HashRouter::shouldRelay (uint256 const& key) return s.releasePeerSet(); } +bool +HashRouter::shouldRecover(uint256 const& key) +{ + std::lock_guard lock(mutex_); + + auto& s = emplace(key).first; + + return s.shouldRecover(recoverLimit_); +} + } // ripple diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index 742f49e3efb..a40d3558bf9 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -34,7 +34,6 @@ namespace ripple { // VFALCO NOTE How can both bad and good be set on a hash? #define SF_BAD 0x02 // Temporarily bad #define SF_SAVED 0x04 -#define SF_RETRY 0x08 // Transaction can be retried #define SF_TRUSTED 0x10 // comes from trusted source // Private flags, used internally in apply.cpp. // Do not attempt to read, set, or reuse. @@ -66,7 +65,6 @@ class HashRouter static char const* getCountedObjectName () { return "HashRouterEntry"; } Entry () - : flags_ (0) { } @@ -107,12 +105,26 @@ class HashRouter return true; } + /** Determines if this item should be recovered from the open ledger. + + Counts the number of times the item has been recovered. + Every `limit` times the function is called, return false. + Else return true. + + @note The limit must be > 0 + */ + bool shouldRecover(std::uint32_t limit) + { + return ++recoveries_ % limit != 0; + } + private: - int flags_; + int flags_ = 0; std::set peers_; // This could be generalized to a map, if more // than one flag needs to expire independently. boost::optional relayed_; + std::uint32_t recoveries_ = 0; }; public: @@ -123,9 +135,16 @@ class HashRouter return 300s; } - HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds) + static inline std::uint32_t getDefaultRecoverLimit() + { + return 1; + } + + HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds, + std::uint32_t recoverLimit) : suppressionMap_(clock) , holdTime_ (entryHoldTimeInSeconds) + , recoverLimit_ (recoverLimit + 1u) { } @@ -164,6 +183,12 @@ class HashRouter */ boost::optional> shouldRelay(uint256 const& key); + /** Determines whether the hashed item should be recovered + + @return `bool` indicates whether the item should be relayed + */ + bool shouldRecover(uint256 const& key); + private: // pair.second indicates whether the entry was created std::pair emplace (uint256 const&); @@ -175,6 +200,8 @@ class HashRouter hardened_hash> suppressionMap_; std::chrono::seconds const holdTime_; + + std::uint32_t const recoverLimit_; }; } // ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 231f31a5bfa..6af2675abeb 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -788,12 +788,6 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr const& iTrans auto const txid = trans->getTransactionID (); auto const flags = app_.getHashRouter().getFlags(txid); - if ((flags & SF_RETRY) != 0) - { - JLOG(m_journal.warn()) << "Redundant transactions submitted"; - return; - } - if ((flags & SF_BAD) != 0) { JLOG(m_journal.warn()) << "Submitted transaction cached bad"; @@ -1102,7 +1096,7 @@ void NetworkOPsImp::apply (std::unique_lock& batchLock) Serializer s; e.transaction->getSTransaction()->add (s); - tx.set_rawtransaction (&s.getData().front(), s.getLength()); + tx.set_rawtransaction (s.data(), s.size()); tx.set_status (protocol::tsCURRENT); tx.set_receivetimestamp (app_.timeKeeper().now().time_since_epoch().count()); tx.set_deferred(e.result == terQUEUED); diff --git a/src/ripple/app/misc/TxQ.h b/src/ripple/app/misc/TxQ.h index dd54f7ca81a..27fc524a517 100644 --- a/src/ripple/app/misc/TxQ.h +++ b/src/ripple/app/misc/TxQ.h @@ -55,6 +55,7 @@ class TxQ struct Setup { std::size_t ledgersInQueue = 20; + std::size_t queueSizeMin = 2000; std::uint32_t retrySequencePercent = 25; // TODO: eahennis. Can we remove the multi tx factor? std::int32_t multiTxnPercent = -90; diff --git a/src/ripple/app/misc/impl/TxQ.cpp b/src/ripple/app/misc/impl/TxQ.cpp index 0db98865e3e..39900e62844 100644 --- a/src/ripple/app/misc/impl/TxQ.cpp +++ b/src/ripple/app/misc/impl/TxQ.cpp @@ -339,7 +339,7 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view, promise to stick around for long enough that it has a realistic chance of getting into a ledger. */ - auto lastValid = getLastLedgerSequence(tx); + auto const lastValid = getLastLedgerSequence(tx); canBeHeld = !lastValid || *lastValid >= view.info().seq + setup_.minimumLastLedgerBuffer; } @@ -349,10 +349,31 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view, can queue. Mitigates the lost cost of relaying should an early one fail or get dropped. */ - canBeHeld = accountIter == byAccount_.end() || - replacementIter || - accountIter->second.getTxnCount() < - setup_.maximumTxnPerAccount; + + // Allow if the account is not in the queue at all + canBeHeld = accountIter == byAccount_.end(); + + if(!canBeHeld) + { + // Allow this tx to replace another one + canBeHeld = replacementIter.is_initialized(); + } + + if (!canBeHeld) + { + // Allow if there are fewer than the limit + canBeHeld = accountIter->second.getTxnCount() < + setup_.maximumTxnPerAccount; + } + + if (!canBeHeld) + { + // Allow if the transaction goes in front of any + // queued transactions. Enables recovery of open + // ledger transactions, and stuck transactions. + auto const tSeq = tx.getSequence(); + canBeHeld = tSeq < accountIter->second.transactions.rbegin()->first; + } } return canBeHeld; } @@ -512,8 +533,7 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view, non-blockers? Yes: Remove the queued transaction. Continue to next step. - No: Reject `txn` with `telINSUF_FEE_P` or - `telCAN_NOT_QUEUE`. Stop. + No: Reject `txn` with `telCAN_NOT_QUEUE_FEE`. Stop. No: Continue to next step. 3. Does this tx have the expected sequence number for the account? @@ -527,11 +547,11 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view, than the previous tx? No: Reject with `telINSUF_FEE_P`. Stop. Yes: Are any of the prior sequence txs blockers? - Yes: Reject with `telCAN_NOT_QUEUE`. Stop. + Yes: Reject with `telCAN_NOT_QUEUE_BLOCKED`. Stop. No: Are the fees in-flight of the other queued txs >= than the account balance or minimum account reserve? - Yes: Reject with `telCAN_NOT_QUEUE`. Stop. + Yes: Reject with `telCAN_NOT_QUEUE_BALANCE`. Stop. No: Create a throwaway sandbox `View`. Modify the account's sequence number to match the tx (avoid `terPRE_SEQ`), and decrease @@ -550,8 +570,7 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view, it to `doApply()` and return that result. No: Continue to the next step. 6. Can the tx be held in the queue? (See TxQ::canBeHeld). - No: Reject `txn` with `telINSUF_FEE_P` if this tx - has the current sequence, or `telCAN_NOT_QUEUE` + No: Reject `txn` with `telCAN_NOT_QUEUE_FULL` if not. Stop. Yes: Continue to the next step. 7. Is the queue full? @@ -613,8 +632,15 @@ TxQ::apply(Application& app, OpenView& view, auto const baseFee = calculateBaseFee(app, view, *tx, j); auto const feeLevelPaid = getFeeLevelPaid(*tx, baseLevel, baseFee, setup_); - auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel( - metricsSnapshot, view); + auto const requiredFeeLevel = [&]() + { + auto feeLevel = FeeMetrics::scaleFeeLevel(metricsSnapshot, view); + if ((flags & tapPREFER_QUEUE) && byFee_.size()) + { + return std::max(feeLevel, byFee_.begin()->feeLevel); + } + return feeLevel; + }(); auto accountIter = byAccount_.find(account); bool const accountExists = accountIter != byAccount_.end(); @@ -679,8 +705,7 @@ TxQ::apply(Application& app, OpenView& view, transactionID << " in favor of normal queued " << existingIter->second.txID; - return{existingIter == txQAcct.transactions.begin() ? - telINSUF_FEE_P : telCAN_NOT_QUEUE, false }; + return {telCAN_NOT_QUEUE_BLOCKS, false }; } } } @@ -710,7 +735,7 @@ TxQ::apply(Application& app, OpenView& view, transactionID << " in favor of queued " << existingIter->second.txID; - return{ telINSUF_FEE_P, false }; + return{ telCAN_NOT_QUEUE_FEE, false }; } } } @@ -806,7 +831,7 @@ TxQ::apply(Application& app, OpenView& view, transactionID << ". A blocker-type transaction " << "is in the queue."; - return{ telCAN_NOT_QUEUE, false }; + return{ telCAN_NOT_QUEUE_BLOCKED, false }; } multiTxn->fee += workingIter->second.consequences->fee; @@ -824,7 +849,7 @@ TxQ::apply(Application& app, OpenView& view, than the account's current balance, or the minimum reserve. If it is, then there's a risk that the fees won't get paid, so drop this - transaction with a telCAN_NOT_QUEUE result. + transaction with a telCAN_NOT_QUEUE_BALANCE result. TODO: Decide whether to count the current txn fee in this limit if it's the last transaction for this account. Currently, it will not count, @@ -866,7 +891,7 @@ TxQ::apply(Application& app, OpenView& view, "Ignoring transaction " << transactionID << ". Total fees in flight too high."; - return{ telCAN_NOT_QUEUE, false }; + return{ telCAN_NOT_QUEUE_BALANCE, false }; } // Create the test view from the current view @@ -904,24 +929,27 @@ TxQ::apply(Application& app, OpenView& view, /* Quick heuristic check to see if it's worth checking that this tx has a high enough fee to clear all the txs in the queue. - 1) Must be an account already in the queue. - 2) Must be have passed the multiTxn checks (tx is not the next + 1) Transaction is trying to get into the open ledger + 2) Must be an account already in the queue. + 3) Must be have passed the multiTxn checks (tx is not the next account seq, the skipped seqs are in the queue, the reserve doesn't get exhausted, etc). - 3) The next transaction must not have previously tried and failed + 4) The next transaction must not have previously tried and failed to apply to an open ledger. - 4) Tx must be paying more than just the required fee level to + 5) Tx must be paying more than just the required fee level to get itself into the queue. - 5) Fee level must be escalated above the default (if it's not, + 6) Fee level must be escalated above the default (if it's not, then the first tx _must_ have failed to process in `accept` for some other reason. Tx is allowed to queue in case conditions change, but don't waste the effort to clear). - 6) Tx is not a 0-fee / free transaction, regardless of fee level. + 7) Tx is not a 0-fee / free transaction, regardless of fee level. */ - if (accountExists && multiTxn.is_initialized() && - multiTxn->nextTxIter->second.retriesRemaining == MaybeTx::retriesAllowed && - feeLevelPaid > requiredFeeLevel && - requiredFeeLevel > baseLevel && baseFee != 0) + if (!(flags & tapPREFER_QUEUE) && accountExists && + multiTxn.is_initialized() && + multiTxn->nextTxIter->second.retriesRemaining == + MaybeTx::retriesAllowed && + feeLevelPaid > requiredFeeLevel && + requiredFeeLevel > baseLevel && baseFee != 0) { OpenView sandbox(open_ledger, &view, view.rules()); @@ -970,8 +998,7 @@ TxQ::apply(Application& app, OpenView& view, JLOG(j_.trace()) << "Transaction " << transactionID << " can not be held"; - return { feeLevelPaid >= requiredFeeLevel ? - telCAN_NOT_QUEUE : telINSUF_FEE_P, false }; + return { telCAN_NOT_QUEUE, false }; } // If the queue is full, decide whether to drop the current @@ -986,7 +1013,7 @@ TxQ::apply(Application& app, OpenView& view, transactionID << " would kick a transaction from the same account (" << account << ") out of the queue."; - return { telCAN_NOT_QUEUE, false }; + return { telCAN_NOT_QUEUE_FULL, false }; } auto const& endAccount = byAccount_.at(lastRIter->account); auto endEffectiveFeeLevel = [&]() @@ -1038,7 +1065,7 @@ TxQ::apply(Application& app, OpenView& view, JLOG(j_.warn()) << "Queue is full, and transaction " << transactionID << " fee is lower than end item's account average fee"; - return { telINSUF_FEE_P, false }; + return { telCAN_NOT_QUEUE_FULL, false }; } } @@ -1105,7 +1132,8 @@ TxQ::processClosedLedger(Application& app, auto ledgerSeq = view.info().seq; if (!timeLeap) - maxSize_ = snapshot.txnsExpected * setup_.ledgersInQueue; + maxSize_ = std::max (snapshot.txnsExpected * setup_.ledgersInQueue, + setup_.queueSizeMin); // Remove any queued candidates whose LastLedgerSequence has gone by. for(auto candidateIter = byFee_.begin(); candidateIter != byFee_.end(); ) @@ -1454,6 +1482,7 @@ setup_TxQ(Config const& config) TxQ::Setup setup; auto const& section = config.section("transaction_queue"); set(setup.ledgersInQueue, "ledgers_in_queue", section); + set(setup.queueSizeMin, "minimum_queue_size", section); set(setup.retrySequencePercent, "retry_sequence_percent", section); set(setup.multiTxnPercent, "multi_txn_percent", section); set(setup.minimumEscalationMultiplier, diff --git a/src/ripple/ledger/ApplyView.h b/src/ripple/ledger/ApplyView.h index 40c83e4b505..5e555e84309 100644 --- a/src/ripple/ledger/ApplyView.h +++ b/src/ripple/ledger/ApplyView.h @@ -36,6 +36,11 @@ enum ApplyFlags // Transaction can be retried, soft failures allowed tapRETRY = 0x20, + // Transaction must pay more than both the open ledger + // fee and all transactions in the queue to get into the + // open ledger + tapPREFER_QUEUE = 0x40, + // Transaction came from a privileged source tapUNLIMITED = 0x400, }; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index e2ec14eeea9..f31fe60b573 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1056,6 +1056,8 @@ PeerImp::onMessage (std::shared_ptr const& m) { // If we've never been in synch, there's nothing we can do // with a transaction + JLOG(p_journal_.debug()) << "Ignoring incoming transaction: " << + "Need network ledger"; return; } @@ -1075,11 +1077,10 @@ PeerImp::onMessage (std::shared_ptr const& m) if (flags & SF_BAD) { fee_ = Resource::feeInvalidSignature; + JLOG(p_journal_.debug()) << "Ignoring known bad tx " << + txID; return; } - - if (!(flags & SF_RETRY)) - return; } JLOG(p_journal_.debug()) << "Got tx " << txID; diff --git a/src/ripple/protocol/TER.h b/src/ripple/protocol/TER.h index 45cd2903fb2..55e6900032f 100644 --- a/src/ripple/protocol/TER.h +++ b/src/ripple/protocol/TER.h @@ -47,6 +47,11 @@ enum TER telINSUF_FEE_P, telNO_DST_PARTIAL, telCAN_NOT_QUEUE, + telCAN_NOT_QUEUE_BALANCE, + telCAN_NOT_QUEUE_BLOCKS, + telCAN_NOT_QUEUE_BLOCKED, + telCAN_NOT_QUEUE_FEE, + telCAN_NOT_QUEUE_FULL, // -299 .. -200: M Malformed (bad signature) // Causes: diff --git a/src/ripple/protocol/impl/TER.cpp b/src/ripple/protocol/impl/TER.cpp index 0ee0bc27e4e..bd4618c4c68 100644 --- a/src/ripple/protocol/impl/TER.cpp +++ b/src/ripple/protocol/impl/TER.cpp @@ -99,7 +99,12 @@ transResults() { telFAILED_PROCESSING, { "telFAILED_PROCESSING", "Failed to correctly process transaction." } }, { telINSUF_FEE_P, { "telINSUF_FEE_P", "Fee insufficient." } }, { telNO_DST_PARTIAL, { "telNO_DST_PARTIAL", "Partial payment to create account not allowed." } }, - { telCAN_NOT_QUEUE, { "telCAN_NOT_QUEUE", "Can not queue at this time." } }, + { telCAN_NOT_QUEUE, { "telCAN_NOT_QUEUE", "Can not queue at this time." } }, + { telCAN_NOT_QUEUE_BALANCE, { "telCAN_NOT_QUEUE_BALANCE", "Can not queue at this time: insufficient balance to pay all queued fees." } }, + { telCAN_NOT_QUEUE_BLOCKS, { "telCAN_NOT_QUEUE_BLOCKS", "Can not queue at this time: would block later queued transaction(s)." } }, + { telCAN_NOT_QUEUE_BLOCKED, { "telCAN_NOT_QUEUE_BLOCKED", "Can not queue at this time: blocking transaction in queue." } }, + { telCAN_NOT_QUEUE_FEE, { "telCAN_NOT_QUEUE_FEE", "Can not queue at this time: fee insufficient to replace queued transaction." } }, + { telCAN_NOT_QUEUE_FULL, { "telCAN_NOT_QUEUE_FULL", "Can not queue at this time: queue is full." } }, { temMALFORMED, { "temMALFORMED", "Malformed transaction." } }, { temBAD_AMOUNT, { "temBAD_AMOUNT", "Can only send positive amounts." } }, diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 1cd52b9734a..85178e08a16 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -32,7 +32,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 2); uint256 const key1(1); uint256 const key2(2); @@ -69,7 +69,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 2); uint256 const key1(1); uint256 const key2(2); @@ -148,7 +148,7 @@ class HashRouter_test : public beast::unit_test::suite // Normal HashRouter using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 2); uint256 const key1(1); uint256 const key2(2); @@ -178,7 +178,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 2s); + HashRouter router(stopwatch, 2s, 2); uint256 const key1(1); BEAST_EXPECT(router.setFlags(key1, 10)); @@ -191,7 +191,7 @@ class HashRouter_test : public beast::unit_test::suite { using namespace std::chrono_literals; TestStopwatch stopwatch; - HashRouter router(stopwatch, 1s); + HashRouter router(stopwatch, 1s, 2); uint256 const key1(1); @@ -229,6 +229,41 @@ class HashRouter_test : public beast::unit_test::suite BEAST_EXPECT(peers && peers->size() == 0); } + void + testRecover() + { + using namespace std::chrono_literals; + TestStopwatch stopwatch; + HashRouter router(stopwatch, 1s, 5); + + uint256 const key1(1); + + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(!router.shouldRecover(key1)); + // Expire, but since the next search will + // be for this entry, it will get refreshed + // instead. + ++stopwatch; + BEAST_EXPECT(router.shouldRecover(key1)); + // Expire, but since the next search will + // be for this entry, it will get refreshed + // instead. + ++stopwatch; + // Recover again. Recovery is independent of + // time as long as the entry doesn't expire. + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(router.shouldRecover(key1)); + // Expire again + ++stopwatch; + BEAST_EXPECT(router.shouldRecover(key1)); + BEAST_EXPECT(!router.shouldRecover(key1)); + } + public: void @@ -239,6 +274,7 @@ class HashRouter_test : public beast::unit_test::suite testSuppression(); testSetFlags(); testRelay(); + testRecover(); } }; diff --git a/src/test/app/TxQ_test.cpp b/src/test/app/TxQ_test.cpp index 8adb6cca617..1d92bf0dbad 100644 --- a/src/test/app/TxQ_test.cpp +++ b/src/test/app/TxQ_test.cpp @@ -109,6 +109,7 @@ class TxQ_test : public beast::unit_test::suite auto p = test::jtx::envconfig(); auto& section = p->section("transaction_queue"); section.set("ledgers_in_queue", "2"); + section.set("minimum_queue_size", "2"); section.set("min_ledgers_to_compute_size_limit", "3"); section.set("max_ledger_counts_to_store", "100"); section.set("retry_sequence_percent", "25"); @@ -240,7 +241,7 @@ class TxQ_test : public beast::unit_test::suite // Hank sees his txn got held and bumps the fee, // but doesn't even bump it enough to requeue - env(noop(hank), fee(11), ter(telINSUF_FEE_P)); + env(noop(hank), fee(11), ter(telCAN_NOT_QUEUE_FEE)); checkMetrics(env, 2, 12, 7, 6, 256); // Hank sees his txn got held and bumps the fee, @@ -303,7 +304,7 @@ class TxQ_test : public beast::unit_test::suite // Try to add another transaction with the default (low) fee, // it should fail because the queue is full. - env(noop(charlie), ter(telINSUF_FEE_P)); + env(noop(charlie), ter(telCAN_NOT_QUEUE_FULL)); // Add another transaction, with a higher fee, // Not high enough to get into the ledger, but high @@ -441,7 +442,7 @@ class TxQ_test : public beast::unit_test::suite BEAST_EXPECT(env.current()->info().seq == 6); // Fail to queue an item with a low LastLedgerSeq env(noop(alice), json(R"({"LastLedgerSequence":7})"), - ter(telINSUF_FEE_P)); + ter(telCAN_NOT_QUEUE)); // Queue an item with a sufficient LastLedgerSeq. env(noop(alice), json(R"({"LastLedgerSequence":8})"), queued); @@ -599,7 +600,7 @@ class TxQ_test : public beast::unit_test::suite // average fee. (Which is ~144,115,188,075,855,907 // because of the zero fee txn.) env(noop(carol), fee(feeCarol), - seq(seqCarol), ter(telINSUF_FEE_P)); + seq(seqCarol), ter(telCAN_NOT_QUEUE_FULL)); env.close(); // Some of Bob's transactions stay in the queue, @@ -820,13 +821,13 @@ class TxQ_test : public beast::unit_test::suite // queue. env(noop(alice), seq(aliceSeq), json(jss::LastLedgerSequence, lastLedgerSeq + 7), - fee(aliceFee), ter(telCAN_NOT_QUEUE)); + fee(aliceFee), ter(telCAN_NOT_QUEUE_FULL)); checkMetrics(env, 8, 8, 5, 4, 513); // Charlie - try to add another item to the queue, // which fails because fee is lower than Alice's // queued average. - env(noop(charlie), fee(19), ter(telINSUF_FEE_P)); + env(noop(charlie), fee(19), ter(telCAN_NOT_QUEUE_FULL)); checkMetrics(env, 8, 8, 5, 4, 513); // Charlie - add another item to the queue, which @@ -845,7 +846,7 @@ class TxQ_test : public beast::unit_test::suite // so resubmits with higher fee, but the queue // is full, and her account is the cheapest. env(noop(alice), seq(aliceSeq - 1), - fee(aliceFee), ter(telCAN_NOT_QUEUE)); + fee(aliceFee), ter(telCAN_NOT_QUEUE_FULL)); checkMetrics(env, 8, 8, 5, 4, 513); // Try to replace a middle item in the queue @@ -853,7 +854,7 @@ class TxQ_test : public beast::unit_test::suite aliceSeq = env.seq(alice) + 2; aliceFee = 25; env(noop(alice), seq(aliceSeq), - fee(aliceFee), ter(telINSUF_FEE_P)); + fee(aliceFee), ter(telCAN_NOT_QUEUE_FEE)); checkMetrics(env, 8, 8, 5, 4, 513); // Replace a middle item from the queue successfully @@ -877,7 +878,7 @@ class TxQ_test : public beast::unit_test::suite aliceFee = env.le(alice)->getFieldAmount(sfBalance).xrp().drops() - (59); env(noop(alice), seq(aliceSeq), - fee(aliceFee), ter(telCAN_NOT_QUEUE)); + fee(aliceFee), ter(telCAN_NOT_QUEUE_BALANCE)); checkMetrics(env, 4, 10, 6, 5, 256); // Try to spend more than Alice can afford with all the other txs. @@ -899,7 +900,7 @@ class TxQ_test : public beast::unit_test::suite aliceFee /= 5; ++aliceSeq; env(noop(alice), seq(aliceSeq), - fee(aliceFee), ter(telCAN_NOT_QUEUE)); + fee(aliceFee), ter(telCAN_NOT_QUEUE_BALANCE)); checkMetrics(env, 4, 10, 6, 5, 256); env.close(); @@ -1005,7 +1006,7 @@ class TxQ_test : public beast::unit_test::suite // Try to add another transaction with the default (low) fee, // it should fail because it can't replace the one already // there. - env(noop(charlie), ter(telINSUF_FEE_P)); + env(noop(charlie), ter(telCAN_NOT_QUEUE_FEE)); // Add another transaction, with a higher fee, // Not high enough to get into the ledger, but high @@ -1115,7 +1116,7 @@ class TxQ_test : public beast::unit_test::suite // is still uninitialized, so preflight succeeds here, // and this txn fails because it can't be stored in the queue. env(noop(alice), json(R"({"AccountTxnID": "0"})"), - ter(telINSUF_FEE_P)); + ter(telCAN_NOT_QUEUE)); checkMetrics(env, 0, boost::none, 2, 1, 256); env.close(); @@ -1218,7 +1219,7 @@ class TxQ_test : public beast::unit_test::suite // Try adding a new transaction. // Too many fees in flight. env(noop(alice), fee(drops(200)), seq(aliceSeq+1), - ter(telCAN_NOT_QUEUE)); + ter(telCAN_NOT_QUEUE_BALANCE)); checkMetrics(env, 4, 6, 5, 3, 256); // Close the ledger. All of Alice's transactions @@ -1230,7 +1231,7 @@ class TxQ_test : public beast::unit_test::suite // Still can't add a new transaction for Alice, // no matter the fee. env(noop(alice), fee(drops(200)), seq(aliceSeq + 1), - ter(telCAN_NOT_QUEUE)); + ter(telCAN_NOT_QUEUE_BALANCE)); checkMetrics(env, 1, 10, 3, 5, 256); /* At this point, Alice's transaction is indefinitely @@ -1289,12 +1290,12 @@ class TxQ_test : public beast::unit_test::suite env(noop(alice), seq(aliceSeq + 2), queued); // Can't replace the first tx with a blocker - env(fset(alice, asfAccountTxnID), fee(20), ter(telINSUF_FEE_P)); + env(fset(alice, asfAccountTxnID), fee(20), ter(telCAN_NOT_QUEUE_BLOCKS)); // Can't replace the second / middle tx with a blocker env(regkey(alice, bob), seq(aliceSeq + 1), fee(20), - ter(telCAN_NOT_QUEUE)); + ter(telCAN_NOT_QUEUE_BLOCKS)); env(signers(alice, 2, { {bob}, {charlie}, {daria} }), fee(20), - seq(aliceSeq + 1), ter(telCAN_NOT_QUEUE)); + seq(aliceSeq + 1), ter(telCAN_NOT_QUEUE_BLOCKS)); // CAN replace the last tx with a blocker env(signers(alice, 2, { { bob },{ charlie },{ daria } }), fee(20), seq(aliceSeq + 2), queued); @@ -1302,7 +1303,7 @@ class TxQ_test : public beast::unit_test::suite queued); // Can't queue up any more transactions after the blocker - env(noop(alice), seq(aliceSeq + 3), ter(telCAN_NOT_QUEUE)); + env(noop(alice), seq(aliceSeq + 3), ter(telCAN_NOT_QUEUE_BLOCKED)); // Other accounts are not affected env(noop(bob), queued); @@ -2109,7 +2110,7 @@ class TxQ_test : public beast::unit_test::suite } } - envs(noop(alice), fee(none), seq(none), ter(telCAN_NOT_QUEUE))(submitParams); + envs(noop(alice), fee(none), seq(none), ter(telCAN_NOT_QUEUE_BLOCKED))(submitParams); checkMetrics(env, 5, 6, 4, 3, 256); {