diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 40b3311b57c..f1bf083ca7e 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -1042,9 +1042,7 @@ class ApplicationImp bool ApplicationImp::setup() { // VFALCO NOTE: 0 means use heuristics to determine the thread count. - m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone(), - config_->exists (SECTION_VALIDATOR_TOKEN) || - config_->exists (SECTION_VALIDATION_SEED)); + m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone()); // We want to intercept and wait for CTRL-C to terminate the process m_signals.add (SIGINT); diff --git a/src/ripple/app/misc/HashRouter.cpp b/src/ripple/app/misc/HashRouter.cpp index 8e407126058..42a846a416e 100644 --- a/src/ripple/app/misc/HashRouter.cpp +++ b/src/ripple/app/misc/HashRouter.cpp @@ -71,6 +71,18 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int& return result.second; } +bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags, + Stopwatch::time_point now, std::chrono::seconds interval) +{ + std::lock_guard lock (mutex_); + + auto result = emplace(key); + auto& s = result.first; + s.addPeer (peer); + flags = s.getFlags (); + return s.shouldProcess (now, interval); +} + int HashRouter::getFlags (uint256 const& key) { std::lock_guard lock (mutex_); diff --git a/src/ripple/app/misc/HashRouter.h b/src/ripple/app/misc/HashRouter.h index a40d3558bf9..aa27b255ba5 100644 --- a/src/ripple/app/misc/HashRouter.h +++ b/src/ripple/app/misc/HashRouter.h @@ -118,12 +118,21 @@ class HashRouter return ++recoveries_ % limit != 0; } + bool shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval) + { + if (processed_ && ((*processed_ + interval) > now)) + return false; + processed_.emplace (now); + return true; + } + private: int flags_ = 0; std::set peers_; // This could be generalized to a map, if more // than one flag needs to expire independently. boost::optional relayed_; + boost::optional processed_; std::uint32_t recoveries_ = 0; }; @@ -161,6 +170,10 @@ class HashRouter bool addSuppressionPeer (uint256 const& key, PeerShortID peer, int& flags); + // Add a peer suppression and return whether the entry should be processed + bool shouldProcess (uint256 const& key, PeerShortID peer, + int& flags, Stopwatch::time_point now, std::chrono::seconds interval); + /** Set the flags on a hash. @return `true` if the flags were changed. `false` if unchanged. diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index b5c9228e898..303a5542b9f 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -180,8 +180,7 @@ class JobQueue /** Set the number of thread serving the job queue to precisely this number. */ - void setThreadCount (int c, bool const standaloneMode, - bool const validator=true); + void setThreadCount (int c, bool const standaloneMode); /** Return a scoped LoadEvent. */ diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index 414624bbc99..1ff74e34288 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -153,8 +153,7 @@ JobQueue::getJobCountGE (JobType t) const } void -JobQueue::setThreadCount (int c, bool const standaloneMode, - bool const validator) +JobQueue::setThreadCount (int c, bool const standaloneMode) { if (standaloneMode) { @@ -163,13 +162,9 @@ JobQueue::setThreadCount (int c, bool const standaloneMode, else if (c == 0) { c = static_cast(std::thread::hardware_concurrency()); - if (validator) - c = 2 + std::min(c, 4); // I/O will bottleneck - else - c *= 2; // Tested to improve stability under high RPC load. + c = 2 + std::min(c, 4); // I/O will bottleneck JLOG (m_journal.info()) << "Auto-tuning to " << c << - " validation/transaction/proposal threads for " << - (validator ? "" : "non-") << "validator."; + " validation/transaction/proposal threads."; } else { diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index e05736309cb..0cdc63465b4 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1055,8 +1055,9 @@ PeerImp::onMessage (std::shared_ptr const& m) int flags; - if (! app_.getHashRouter ().addSuppressionPeer ( - txID, id_, flags)) + constexpr std::chrono::seconds tx_interval = 10s; + if (! app_.getHashRouter ().shouldProcess ( + txID, id_, flags, clock_type::now(), tx_interval)) { // we have seen this transaction recently if (flags & SF_BAD) @@ -1064,8 +1065,9 @@ PeerImp::onMessage (std::shared_ptr const& m) fee_ = Resource::feeInvalidSignature; JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; - return; } + + return; } JLOG(p_journal_.debug()) << "Got tx " << txID; @@ -1088,7 +1090,9 @@ PeerImp::onMessage (std::shared_ptr const& m) } } - if (app_.getJobQueue().getJobCount(jtTRANSACTION) > 100) + // The maximum number of transactions to have in the job queue. + constexpr int max_transactions = 250; + if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions) { JLOG(p_journal_.info()) << "Transaction queue is full"; } diff --git a/src/ripple/resource/impl/Fees.cpp b/src/ripple/resource/impl/Fees.cpp index b1d03526be8..0ca4e4b41df 100644 --- a/src/ripple/resource/impl/Fees.cpp +++ b/src/ripple/resource/impl/Fees.cpp @@ -23,31 +23,24 @@ namespace ripple { namespace Resource { -Charge const feeInvalidRequest ( 10, "malformed request" ); -Charge const feeRequestNoReply ( 1, "unsatisfiable request" ); -Charge const feeInvalidSignature ( 100, "invalid signature" ); -Charge const feeUnwantedData ( 15, "useless data" ); -Charge const feeBadData ( 20, "invalid data" ); - -Charge const feeInvalidRPC ( 10, "malformed RPC" ); -Charge const feeReferenceRPC ( 2, "reference RPC" ); -Charge const feeExceptionRPC ( 10, "exceptioned RPC" ); -Charge const feeLightRPC ( 5, "light RPC" ); // DAVID: Check the cost -Charge const feeLowBurdenRPC ( 20, "low RPC" ); -Charge const feeMediumBurdenRPC ( 40, "medium RPC" ); -Charge const feeHighBurdenRPC ( 300, "heavy RPC" ); - -Charge const feeLightPeer (1, "trivial peer request" ); -Charge const feeLowBurdenPeer (2, "simple peer request" ); -Charge const feeMediumBurdenPeer (50, "moderate peer request" ); -Charge const feeHighBurdenPeer (250, "heavy peer request" ); - -Charge const feeNewTrustedNote ( 10, "trusted note" ); -Charge const feeNewValidTx ( 10, "valid tx" ); -Charge const feeSatisfiedRequest ( 10, "needed data" ); - -Charge const feeWarning ( 200, "received warning" ); -Charge const feeDrop ( 300, "dropped" ); +Charge const feeInvalidRequest ( 100, "malformed request" ); +Charge const feeRequestNoReply ( 10, "unsatisfiable request" ); +Charge const feeInvalidSignature ( 1000, "invalid signature" ); +Charge const feeUnwantedData ( 150, "useless data" ); +Charge const feeBadData ( 200, "invalid data" ); + +Charge const feeInvalidRPC ( 100, "malformed RPC" ); +Charge const feeReferenceRPC ( 20, "reference RPC" ); +Charge const feeExceptionRPC ( 100, "exceptioned RPC" ); +Charge const feeMediumBurdenRPC ( 400, "medium RPC" ); +Charge const feeHighBurdenRPC ( 3000, "heavy RPC" ); + +Charge const feeLightPeer ( 1, "trivial peer request" ); +Charge const feeMediumBurdenPeer ( 250, "moderate peer request" ); +Charge const feeHighBurdenPeer ( 2000, "heavy peer request" ); + +Charge const feeWarning ( 2000, "received warning" ); +Charge const feeDrop ( 3000, "dropped" ); } } diff --git a/src/ripple/resource/impl/Tuning.h b/src/ripple/resource/impl/Tuning.h index 894bd8d09fd..0752b8ad158 100644 --- a/src/ripple/resource/impl/Tuning.h +++ b/src/ripple/resource/impl/Tuning.h @@ -29,17 +29,17 @@ namespace Resource { enum { // Balance at which a warning is issued - warningThreshold = 500 + warningThreshold = 5000 // Balance at which the consumer is disconnected - ,dropThreshold = 1500 + ,dropThreshold = 15000 // The number of seconds in the exponential decay window // (This should be a power of two) ,decayWindowSeconds = 32 // The minimum balance required in order to include a load source in gossip - ,minimumGossipBalance = 100 + ,minimumGossipBalance = 1000 }; // The number of seconds until an inactive table item is removed