From f2d4241392d556552fdd3c38574322c672596a0b Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Wed, 18 Aug 2021 18:55:20 -0500 Subject: [PATCH 1/3] Forward validations and manifests in reporting mode --- src/ripple/app/misc/NetworkOPs.cpp | 44 +++++++++++++++++++ src/ripple/app/misc/NetworkOPs.h | 4 ++ src/ripple/app/reporting/ETLSource.cpp | 24 +++++++++- src/ripple/app/reporting/ETLSource.h | 2 +- .../nodestore/backend/CassandraFactory.cpp | 4 +- src/ripple/rpc/handlers/Subscribe.cpp | 4 -- 6 files changed, 74 insertions(+), 8 deletions(-) diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index cae1b6163f4..11365372b38 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -449,6 +449,10 @@ class NetworkOPsImp final : public NetworkOPs void pubValidation(std::shared_ptr const& val) override; + void + forwardValidation(Json::Value const& jvObj) override; + void + forwardManifest(Json::Value const& jvObj) override; void forwardProposedTransaction(Json::Value const& jvObj) override; void @@ -2590,6 +2594,46 @@ NetworkOPsImp::forwardProposedTransaction(Json::Value const& jvObj) forwardProposedAccountTransaction(jvObj); } +void +NetworkOPsImp::forwardValidation(Json::Value const& jvObj) +{ + std::lock_guard sl(mSubLock); + + for (auto i = mStreamMaps[sValidations].begin(); + i != mStreamMaps[sValidations].end();) + { + if (auto p = i->second.lock()) + { + p->send(jvObj, true); + ++i; + } + else + { + i = mStreamMaps[sValidations].erase(i); + } + } +} + +void +NetworkOPsImp::forwardManifest(Json::Value const& jvObj) +{ + std::lock_guard sl(mSubLock); + + for (auto i = mStreamMaps[sManifests].begin(); + i != mStreamMaps[sManifests].end();) + { + if (auto p = i->second.lock()) + { + p->send(jvObj, true); + ++i; + } + else + { + i = mStreamMaps[sManifests].erase(i); + } + } +} + static void getAccounts(Json::Value const& jvObj, std::vector& accounts) { diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index 444da2cc8c4..5e00c4cc6ae 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -261,6 +261,10 @@ class NetworkOPs : public InfoSub::Source virtual void pubValidation(std::shared_ptr const& val) = 0; + virtual void + forwardValidation(Json::Value const& jvObj) = 0; + virtual void + forwardManifest(Json::Value const& jvObj) = 0; virtual void forwardProposedTransaction(Json::Value const& jvObj) = 0; virtual void diff --git a/src/ripple/app/reporting/ETLSource.cpp b/src/ripple/app/reporting/ETLSource.cpp index 5036ca9102b..76a1f432f44 100644 --- a/src/ripple/app/reporting/ETLSource.cpp +++ b/src/ripple/app/reporting/ETLSource.cpp @@ -261,6 +261,10 @@ ETLSource::onHandshake(boost::beast::error_code ec) jv["streams"].append(ledgerStream); Json::Value txnStream("transactions_proposed"); jv["streams"].append(txnStream); + Json::Value validationStream("validations"); + jv["streams"].append(validationStream); + Json::Value manifestStream("manifests"); + jv["streams"].append(manifestStream); Json::FastWriter fastWriter; JLOG(journal_.trace()) << "Sending subscribe stream message"; @@ -354,12 +358,30 @@ ETLSource::handleMessage() { if (response.isMember(jss::transaction)) { - if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) + if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) { etl_.getApplication().getOPs().forwardProposedTransaction( response); } } + else if ( + response.isMember("type") && + response["type"] == "validationReceived") + { + if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) + { + etl_.getApplication().getOPs().forwardValidation(response); + } + } + else if ( + response.isMember("type") && + response["type"] == "manifestReceived") + { + if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) + { + etl_.getApplication().getOPs().forwardManifest(response); + } + } else { JLOG(journal_.debug()) diff --git a/src/ripple/app/reporting/ETLSource.h b/src/ripple/app/reporting/ETLSource.h index b4bb5ccf921..16224b97c18 100644 --- a/src/ripple/app/reporting/ETLSource.h +++ b/src/ripple/app/reporting/ETLSource.h @@ -374,7 +374,7 @@ class ETLLoadBalancer /// @param in ETLSource in question /// @return true if messages should be forwarded bool - shouldPropagateTxnStream(ETLSource* in) const + shouldPropagateStream(ETLSource* in) const { for (auto& src : sources_) { diff --git a/src/ripple/nodestore/backend/CassandraFactory.cpp b/src/ripple/nodestore/backend/CassandraFactory.cpp index 26850e7a600..10282d94b70 100644 --- a/src/ripple/nodestore/backend/CassandraFactory.cpp +++ b/src/ripple/nodestore/backend/CassandraFactory.cpp @@ -370,7 +370,7 @@ class CassandraBackend : public Backend continue; } - query = {}; + query.str(""); query << "SELECT * FROM " << tableName << " LIMIT 1"; statement = makeStatement(query.str().c_str(), 0); fut = cass_session_execute(session_.get(), statement); @@ -433,7 +433,7 @@ class CassandraBackend : public Backend */ cass_future_free(prepare_future); - query = {}; + query.str(""); query << "SELECT object FROM " << tableName << " WHERE hash = ?"; prepare_future = cass_session_prepare(session_.get(), query.str().c_str()); diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index c11fa7e4db9..3cbea6ab6f0 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -138,8 +138,6 @@ doSubscribe(RPC::JsonContext& context) } else if (streamName == "manifests") { - if (context.app.config().reporting()) - return rpcError(rpcREPORTING_UNSUPPORTED); context.netOps.subManifests(ispSub); } else if (streamName == "transactions") @@ -154,8 +152,6 @@ doSubscribe(RPC::JsonContext& context) } else if (streamName == "validations") { - if (context.app.config().reporting()) - return rpcError(rpcREPORTING_UNSUPPORTED); context.netOps.subValidations(ispSub); } else if (streamName == "peer_status") From aa04be78cc2aadd88115c7946f37340a7803a0a0 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Wed, 8 Sep 2021 14:58:16 -0700 Subject: [PATCH 2/3] [FOLD] Address comments --- src/ripple/app/reporting/ETLSource.cpp | 27 ++++++++++++-------------- src/ripple/app/reporting/ETLSource.h | 6 +++--- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/ripple/app/reporting/ETLSource.cpp b/src/ripple/app/reporting/ETLSource.cpp index 76a1f432f44..5476c2a80b1 100644 --- a/src/ripple/app/reporting/ETLSource.cpp +++ b/src/ripple/app/reporting/ETLSource.cpp @@ -356,33 +356,30 @@ ETLSource::handleMessage() } else { - if (response.isMember(jss::transaction)) + if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) { - if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) + if (response.isMember(jss::transaction)) { etl_.getApplication().getOPs().forwardProposedTransaction( response); } - } - else if ( - response.isMember("type") && - response["type"] == "validationReceived") - { - if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) + else if ( + response.isMember("type") && + response["type"] == "validationReceived") { etl_.getApplication().getOPs().forwardValidation(response); } - } - else if ( - response.isMember("type") && - response["type"] == "manifestReceived") - { - if (etl_.getETLLoadBalancer().shouldPropagateStream(this)) + else if ( + response.isMember("type") && + response["type"] == "manifestReceived") { etl_.getApplication().getOPs().forwardManifest(response); } } - else + + if ( + response.isMember("type") && + response["type"] == "ledgerClosed") { JLOG(journal_.debug()) << __func__ << " : " diff --git a/src/ripple/app/reporting/ETLSource.h b/src/ripple/app/reporting/ETLSource.h index 16224b97c18..66cb61bcc7a 100644 --- a/src/ripple/app/reporting/ETLSource.h +++ b/src/ripple/app/reporting/ETLSource.h @@ -368,9 +368,9 @@ class ETLLoadBalancer /// Determine whether messages received on the transactions_proposed stream /// should be forwarded to subscribing clients. The server subscribes to - /// transactions_proposed on multiple ETLSources, yet only forwards messages - /// from one source at any given time (to avoid sending duplicate messages - /// to clients). + /// transactions_proposed, validations, and manifests on multiple + /// ETLSources, yet only forwards messages from one source at any given time + /// (to avoid sending duplicate messages to clients). /// @param in ETLSource in question /// @return true if messages should be forwarded bool From 2d1f6514aa616c1e1ee12b4717377d7bc3c29e8d Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Wed, 8 Sep 2021 15:01:36 -0700 Subject: [PATCH 3/3] [FOLD] run formatter --- src/ripple/app/reporting/ETLSource.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ripple/app/reporting/ETLSource.cpp b/src/ripple/app/reporting/ETLSource.cpp index 5476c2a80b1..de7e8d4797b 100644 --- a/src/ripple/app/reporting/ETLSource.cpp +++ b/src/ripple/app/reporting/ETLSource.cpp @@ -376,10 +376,8 @@ ETLSource::handleMessage() etl_.getApplication().getOPs().forwardManifest(response); } } - - if ( - response.isMember("type") && - response["type"] == "ledgerClosed") + + if (response.isMember("type") && response["type"] == "ledgerClosed") { JLOG(journal_.debug()) << __func__ << " : "