Skip to content

Commit

Permalink
* Decrease the relative cost for peers to share transaction and ledge…
Browse files Browse the repository at this point in the history
…r data.

* Increase number of transactions allowed in the job queue.

Control transaction process rate:

Do not process a transaction received from a peer
if it has been processed within the past ten seconds.

Set worker threads to be consistent for validators and non-validators.
Resource consumption is more efficient this way.
  • Loading branch information
mtrippled committed Dec 14, 2017
1 parent 090d813 commit c73fc8e
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 45 deletions.
4 changes: 1 addition & 3 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,7 @@ class ApplicationImp
bool ApplicationImp::setup()
{
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone(),
config_->exists (SECTION_VALIDATOR_TOKEN) ||
config_->exists (SECTION_VALIDATION_SEED));
m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone());

// We want to intercept and wait for CTRL-C to terminate the process
m_signals.add (SIGINT);
Expand Down
12 changes: 12 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int&
return result.second;
}

bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
Stopwatch::time_point now, std::chrono::seconds interval)
{
std::lock_guard <std::mutex> lock (mutex_);

auto result = emplace(key);
auto& s = result.first;
s.addPeer (peer);
flags = s.getFlags ();
return s.shouldProcess (now, interval);
}

int HashRouter::getFlags (uint256 const& key)
{
std::lock_guard <std::mutex> lock (mutex_);
Expand Down
13 changes: 13 additions & 0 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,21 @@ class HashRouter
return ++recoveries_ % limit != 0;
}

bool shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval)
{
if (processed_ && ((*processed_ + interval) > now))
return false;
processed_.emplace (now);
return true;
}

private:
int flags_ = 0;
std::set <PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
boost::optional<Stopwatch::time_point> processed_;
std::uint32_t recoveries_ = 0;
};

Expand Down Expand Up @@ -161,6 +170,10 @@ class HashRouter
bool addSuppressionPeer (uint256 const& key, PeerShortID peer,
int& flags);

// Add a peer suppression and return whether the entry should be processed
bool shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, Stopwatch::time_point now, std::chrono::seconds interval);

/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
Expand Down
3 changes: 1 addition & 2 deletions src/ripple/core/JobQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ class JobQueue

/** Set the number of thread serving the job queue to precisely this number.
*/
void setThreadCount (int c, bool const standaloneMode,
bool const validator=true);
void setThreadCount (int c, bool const standaloneMode);

/** Return a scoped LoadEvent.
*/
Expand Down
11 changes: 3 additions & 8 deletions src/ripple/core/impl/JobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ JobQueue::getJobCountGE (JobType t) const
}

void
JobQueue::setThreadCount (int c, bool const standaloneMode,
bool const validator)
JobQueue::setThreadCount (int c, bool const standaloneMode)
{
if (standaloneMode)
{
Expand All @@ -163,13 +162,9 @@ JobQueue::setThreadCount (int c, bool const standaloneMode,
else if (c == 0)
{
c = static_cast<int>(std::thread::hardware_concurrency());
if (validator)
c = 2 + std::min(c, 4); // I/O will bottleneck
else
c *= 2; // Tested to improve stability under high RPC load.
c = 2 + std::min(c, 4); // I/O will bottleneck
JLOG (m_journal.info()) << "Auto-tuning to " << c <<
" validation/transaction/proposal threads for " <<
(validator ? "" : "non-") << "validator.";
" validation/transaction/proposal threads.";
}
else
{
Expand Down
12 changes: 8 additions & 4 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,17 +1055,19 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)

int flags;

if (! app_.getHashRouter ().addSuppressionPeer (
txID, id_, flags))
constexpr std::chrono::seconds tx_interval = 10s;
if (! app_.getHashRouter ().shouldProcess (
txID, id_, flags, clock_type::now(), tx_interval))
{
// we have seen this transaction recently
if (flags & SF_BAD)
{
fee_ = Resource::feeInvalidSignature;
JLOG(p_journal_.debug()) << "Ignoring known bad tx " <<
txID;
return;
}

return;
}

JLOG(p_journal_.debug()) << "Got tx " << txID;
Expand All @@ -1088,7 +1090,9 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
}
}

if (app_.getJobQueue().getJobCount(jtTRANSACTION) > 100)
// The maximum number of transactions to have in the job queue.
constexpr int max_transactions = 250;
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions)
{
JLOG(p_journal_.info()) << "Transaction queue is full";
}
Expand Down
43 changes: 18 additions & 25 deletions src/ripple/resource/impl/Fees.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,24 @@
namespace ripple {
namespace Resource {

Charge const feeInvalidRequest ( 10, "malformed request" );
Charge const feeRequestNoReply ( 1, "unsatisfiable request" );
Charge const feeInvalidSignature ( 100, "invalid signature" );
Charge const feeUnwantedData ( 15, "useless data" );
Charge const feeBadData ( 20, "invalid data" );

Charge const feeInvalidRPC ( 10, "malformed RPC" );
Charge const feeReferenceRPC ( 2, "reference RPC" );
Charge const feeExceptionRPC ( 10, "exceptioned RPC" );
Charge const feeLightRPC ( 5, "light RPC" ); // DAVID: Check the cost
Charge const feeLowBurdenRPC ( 20, "low RPC" );
Charge const feeMediumBurdenRPC ( 40, "medium RPC" );
Charge const feeHighBurdenRPC ( 300, "heavy RPC" );

Charge const feeLightPeer (1, "trivial peer request" );
Charge const feeLowBurdenPeer (2, "simple peer request" );
Charge const feeMediumBurdenPeer (50, "moderate peer request" );
Charge const feeHighBurdenPeer (250, "heavy peer request" );

Charge const feeNewTrustedNote ( 10, "trusted note" );
Charge const feeNewValidTx ( 10, "valid tx" );
Charge const feeSatisfiedRequest ( 10, "needed data" );

Charge const feeWarning ( 200, "received warning" );
Charge const feeDrop ( 300, "dropped" );
Charge const feeInvalidRequest ( 100, "malformed request" );
Charge const feeRequestNoReply ( 10, "unsatisfiable request" );
Charge const feeInvalidSignature ( 1000, "invalid signature" );
Charge const feeUnwantedData ( 150, "useless data" );
Charge const feeBadData ( 200, "invalid data" );

Charge const feeInvalidRPC ( 100, "malformed RPC" );
Charge const feeReferenceRPC ( 20, "reference RPC" );
Charge const feeExceptionRPC ( 100, "exceptioned RPC" );
Charge const feeMediumBurdenRPC ( 400, "medium RPC" );
Charge const feeHighBurdenRPC ( 3000, "heavy RPC" );

Charge const feeLightPeer ( 1, "trivial peer request" );
Charge const feeMediumBurdenPeer ( 250, "moderate peer request" );
Charge const feeHighBurdenPeer ( 2000, "heavy peer request" );

Charge const feeWarning ( 2000, "received warning" );
Charge const feeDrop ( 3000, "dropped" );

}
}
6 changes: 3 additions & 3 deletions src/ripple/resource/impl/Tuning.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ namespace Resource {
enum
{
// Balance at which a warning is issued
warningThreshold = 500
warningThreshold = 5000

// Balance at which the consumer is disconnected
,dropThreshold = 1500
,dropThreshold = 15000

// The number of seconds in the exponential decay window
// (This should be a power of two)
,decayWindowSeconds = 32

// The minimum balance required in order to include a load source in gossip
,minimumGossipBalance = 100
,minimumGossipBalance = 1000
};

// The number of seconds until an inactive table item is removed
Expand Down

0 comments on commit c73fc8e

Please sign in to comment.