Skip to content

Commit

Permalink
Control transaction dispatch rate:
Browse files Browse the repository at this point in the history
Do not process a transaction received from a peer if it has
been processed within the past ten seconds.

Increase the number of transaction handlers that can be in
flight in the job queue and decrease the relative cost for
peers to share transaction and ledger data.

Additionally, make better use of resources by adjusting the
number of threads we initialize, by reverting commit
68b8ffd.

Performance counter modifications:
  * Create and display counters to track:
    1) Pending transaction limit overruns.
    2) Total peer disconnections.
    3) Peers disconnections due to resource consumption.

Avoid a potential double-free in Json library.
  • Loading branch information
mtrippled committed Jan 13, 2018
1 parent 090d813 commit 77c5f7a
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 48 deletions.
4 changes: 1 addition & 3 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,7 @@ class ApplicationImp
bool ApplicationImp::setup()
{
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone(),
config_->exists (SECTION_VALIDATOR_TOKEN) ||
config_->exists (SECTION_VALIDATION_SEED));
m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone());

// We want to intercept and wait for CTRL-C to terminate the process
m_signals.add (SIGINT);
Expand Down
12 changes: 12 additions & 0 deletions src/ripple/app/misc/HashRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int&
return result.second;
}

bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, std::chrono::seconds tx_interval)
{
std::lock_guard <std::mutex> lock (mutex_);

auto result = emplace(key);
auto& s = result.first;
s.addPeer (peer);
flags = s.getFlags ();
return s.shouldProcess (std::chrono::steady_clock::now(), tx_interval);
}

int HashRouter::getFlags (uint256 const& key)
{
std::lock_guard <std::mutex> lock (mutex_);
Expand Down
13 changes: 13 additions & 0 deletions src/ripple/app/misc/HashRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,21 @@ class HashRouter
return ++recoveries_ % limit != 0;
}

bool shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval)
{
if (processed_ && ((*processed_ + interval) > now))
return false;
processed_.emplace (now);
return true;
}

private:
int flags_ = 0;
std::set <PeerShortID> peers_;
// This could be generalized to a map, if more
// than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_;
boost::optional<Stopwatch::time_point> processed_;
std::uint32_t recoveries_ = 0;
};

Expand Down Expand Up @@ -161,6 +170,10 @@ class HashRouter
bool addSuppressionPeer (uint256 const& key, PeerShortID peer,
int& flags);

// Add a peer suppression and return whether the entry should be processed
bool shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
std::chrono::seconds tx_interval);

/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
Expand Down
8 changes: 7 additions & 1 deletion src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2367,6 +2367,12 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)

info[jss::state_accounting] = accounting_.json();
info[jss::uptime] = UptimeTimer::getInstance ().getElapsedSeconds ();
info[jss::jq_trans_overflow] = std::to_string(
app_.overlay().getJqTransOverflow());
info[jss::peer_disconnects] = std::to_string(
app_.overlay().getPeerDisconnect());
info[jss::peer_disconnects_resources] = std::to_string(
app_.overlay().getPeerDisconnectCharges());

return info;
}
Expand Down Expand Up @@ -3365,7 +3371,7 @@ Json::Value NetworkOPsImp::StateAccounting::json() const
ret[states_[i]] = Json::objectValue;
auto& state = ret[states_[i]];
state[jss::transitions] = counters[i].transitions;
state[jss::duration_us] = std::to_string (counters[i].dur.count());
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}

return ret;
Expand Down
3 changes: 1 addition & 2 deletions src/ripple/core/JobQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ class JobQueue

/** Set the number of thread serving the job queue to precisely this number.
*/
void setThreadCount (int c, bool const standaloneMode,
bool const validator=true);
void setThreadCount (int c, bool const standaloneMode);

/** Return a scoped LoadEvent.
*/
Expand Down
11 changes: 3 additions & 8 deletions src/ripple/core/impl/JobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ JobQueue::getJobCountGE (JobType t) const
}

void
JobQueue::setThreadCount (int c, bool const standaloneMode,
bool const validator)
JobQueue::setThreadCount (int c, bool const standaloneMode)
{
if (standaloneMode)
{
Expand All @@ -163,13 +162,9 @@ JobQueue::setThreadCount (int c, bool const standaloneMode,
else if (c == 0)
{
c = static_cast<int>(std::thread::hardware_concurrency());
if (validator)
c = 2 + std::min(c, 4); // I/O will bottleneck
else
c *= 2; // Tested to improve stability under high RPC load.
c = 2 + std::min(c, 4); // I/O will bottleneck
JLOG (m_journal.info()) << "Auto-tuning to " << c <<
" validation/transaction/proposal threads for " <<
(validator ? "" : "non-") << "validator.";
" validation/transaction/proposal threads.";
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/json/impl/json_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ Value::~Value ()

case arrayValue:
case objectValue:
delete value_.map_;
if (value_.map_)
delete value_.map_;
break;

default:
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/json/json_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ class Value
double real_;
bool bool_;
char* string_;
ObjectValues* map_;
ObjectValues* map_ {nullptr};
} value_;
ValueType type_ : 8;
int allocated_ : 1; // Notes: if declared as bool, bitfield is useless.
Expand Down
12 changes: 12 additions & 0 deletions src/ripple/overlay/Overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,18 @@ class Overlay
std::size_t
selectPeers (PeerSet& set, std::size_t limit, std::function<
bool(std::shared_ptr<Peer> const&)> score) = 0;

/** Increment and retrieve counter for transaction job queue overflows. */
virtual void incJqTransOverflow() = 0;
virtual std::uint64_t getJqTransOverflow() const = 0;

/** Increment and retrieve counters for total peer disconnects, and
* disconnects we initiate for excessive resource consumption.
*/
virtual void incPeerDisconnect() = 0;
virtual std::uint64_t getPeerDisconnect() const = 0;
virtual void incPeerDisconnectCharges() = 0;
virtual std::uint64_t getPeerDisconnectCharges() const = 0;
};

struct ScoreHasLedger
Expand Down
39 changes: 39 additions & 0 deletions src/ripple/overlay/impl/OverlayImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class OverlayImpl : public Overlay
Resolver& m_resolver;
std::atomic <Peer::id_t> next_id_;
int timer_count_;
std::atomic <uint64_t> jqTransOverflow_ {0};
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};

//--------------------------------------------------------------------------

Expand Down Expand Up @@ -301,6 +304,42 @@ class OverlayImpl : public Overlay
bool isInbound,
int bytes);

void
incJqTransOverflow() override
{
++jqTransOverflow_;
}

std::uint64_t
getJqTransOverflow() const override
{
return jqTransOverflow_;
}

void
incPeerDisconnect() override
{
++peerDisconnects_;
}

std::uint64_t
getPeerDisconnect() const override
{
return peerDisconnects_;
}

void
incPeerDisconnectCharges() override
{
++peerDisconnectsCharges_;
}

std::uint64_t
getPeerDisconnectCharges() const override
{
return peerDisconnectsCharges_;
};

private:
std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,
Expand Down
15 changes: 11 additions & 4 deletions src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ PeerImp::charge (Resource::Charge const& fee)
usage_.disconnect() && strand_.running_in_this_thread())
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
}
}
Expand Down Expand Up @@ -414,6 +415,7 @@ PeerImp::close()
error_code ec;
timer_.cancel(ec);
socket_.close(ec);
overlay_.incPeerDisconnect();
if(m_inbound)
{
JLOG(journal_.debug()) << "Closed";
Expand Down Expand Up @@ -1054,18 +1056,20 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
uint256 txID = stx->getTransactionID ();

int flags;
constexpr std::chrono::seconds tx_interval = 10s;

if (! app_.getHashRouter ().addSuppressionPeer (
txID, id_, flags))
if (! app_.getHashRouter ().shouldProcess (txID, id_, flags,
tx_interval))
{
// we have seen this transaction recently
if (flags & SF_BAD)
{
fee_ = Resource::feeInvalidSignature;
JLOG(p_journal_.debug()) << "Ignoring known bad tx " <<
txID;
return;
}

return;
}

JLOG(p_journal_.debug()) << "Got tx " << txID;
Expand All @@ -1088,8 +1092,11 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
}
}

if (app_.getJobQueue().getJobCount(jtTRANSACTION) > 100)
// The maximum number of transactions to have in the job queue.
constexpr int max_transactions = 250;
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions)
{
overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full";
}
else if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/protocol/JsonFields.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe,
// Unsubscribe, BookOffers
// out: paths/Node, STPathSet, STAmount
JSS ( jsonrpc ); // json version
JSS ( jq_trans_overflow ); // JobQueue transaction limit overflow.
JSS ( key ); // out: WalletSeed
JSS ( key_type ); // in/out: WalletPropose, TransactionSign
JSS ( latency ); // out: PeerImp
Expand Down Expand Up @@ -326,6 +327,9 @@ JSS ( peer ); // in: AccountLines
JSS ( peer_authorized ); // out: AccountLines
JSS ( peer_id ); // out: RCLCxPeerPos
JSS ( peers ); // out: InboundLedger, handlers/Peers, Overlay
JSS ( peer_disconnects ); // Severed peer connection counter.
JSS ( peer_disconnects_resources ); // Severed peer connections because of
// excess resource consumption.
JSS ( port ); // in: Connect
JSS ( previous_ledger ); // out: LedgerPropose
JSS ( proof ); // in: BookOffers
Expand Down
43 changes: 18 additions & 25 deletions src/ripple/resource/impl/Fees.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,24 @@
namespace ripple {
namespace Resource {

Charge const feeInvalidRequest ( 10, "malformed request" );
Charge const feeRequestNoReply ( 1, "unsatisfiable request" );
Charge const feeInvalidSignature ( 100, "invalid signature" );
Charge const feeUnwantedData ( 15, "useless data" );
Charge const feeBadData ( 20, "invalid data" );

Charge const feeInvalidRPC ( 10, "malformed RPC" );
Charge const feeReferenceRPC ( 2, "reference RPC" );
Charge const feeExceptionRPC ( 10, "exceptioned RPC" );
Charge const feeLightRPC ( 5, "light RPC" ); // DAVID: Check the cost
Charge const feeLowBurdenRPC ( 20, "low RPC" );
Charge const feeMediumBurdenRPC ( 40, "medium RPC" );
Charge const feeHighBurdenRPC ( 300, "heavy RPC" );

Charge const feeLightPeer (1, "trivial peer request" );
Charge const feeLowBurdenPeer (2, "simple peer request" );
Charge const feeMediumBurdenPeer (50, "moderate peer request" );
Charge const feeHighBurdenPeer (250, "heavy peer request" );

Charge const feeNewTrustedNote ( 10, "trusted note" );
Charge const feeNewValidTx ( 10, "valid tx" );
Charge const feeSatisfiedRequest ( 10, "needed data" );

Charge const feeWarning ( 200, "received warning" );
Charge const feeDrop ( 300, "dropped" );
Charge const feeInvalidRequest ( 100, "malformed request" );
Charge const feeRequestNoReply ( 10, "unsatisfiable request" );
Charge const feeInvalidSignature ( 1000, "invalid signature" );
Charge const feeUnwantedData ( 150, "useless data" );
Charge const feeBadData ( 200, "invalid data" );

Charge const feeInvalidRPC ( 100, "malformed RPC" );
Charge const feeReferenceRPC ( 20, "reference RPC" );
Charge const feeExceptionRPC ( 100, "exceptioned RPC" );
Charge const feeMediumBurdenRPC ( 400, "medium RPC" );
Charge const feeHighBurdenRPC ( 3000, "heavy RPC" );

Charge const feeLightPeer ( 1, "trivial peer request" );
Charge const feeMediumBurdenPeer ( 250, "moderate peer request" );
Charge const feeHighBurdenPeer ( 2000, "heavy peer request" );

Charge const feeWarning ( 2000, "received warning" );
Charge const feeDrop ( 3000, "dropped" );

}
}
6 changes: 3 additions & 3 deletions src/ripple/resource/impl/Tuning.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ namespace Resource {
enum
{
// Balance at which a warning is issued
warningThreshold = 500
warningThreshold = 5000

// Balance at which the consumer is disconnected
,dropThreshold = 1500
,dropThreshold = 15000

// The number of seconds in the exponential decay window
// (This should be a power of two)
,decayWindowSeconds = 32

// The minimum balance required in order to include a load source in gossip
,minimumGossipBalance = 100
,minimumGossipBalance = 1000
};

// The number of seconds until an inactive table item is removed
Expand Down
18 changes: 18 additions & 0 deletions src/test/app/HashRouter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,23 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(!router.shouldRecover(key1));
}

void
testProcess()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s, 5);
uint256 const key(1);
HashRouter::PeerShortID peer = 1;
int flags;

BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
BEAST_EXPECT(! router.shouldProcess(key, peer, flags, 1s));
std::this_thread::sleep_for(2s);
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}


public:

void
Expand All @@ -275,6 +292,7 @@ class HashRouter_test : public beast::unit_test::suite
testSetFlags();
testRelay();
testRecover();
testProcess();
}
};

Expand Down

0 comments on commit 77c5f7a

Please sign in to comment.