Skip to content

Commit

Permalink
WIP: Factor out multivariate Json
Browse files Browse the repository at this point in the history
  • Loading branch information
Bronek committed Oct 5, 2023
1 parent 8a74181 commit dd9a258
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 91 deletions.
9 changes: 3 additions & 6 deletions src/ripple/app/ledger/BookListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ BookListeners::removeSubscriber(std::uint64_t seq)

void
BookListeners::publish(
Json::Value const& jvObj,
Json::Value const& jvObjApiVer2,
MultiApiJson const& jvObj,
hash_set<std::uint64_t>& havePublished)
{
std::lock_guard sl(mLock);
Expand All @@ -55,10 +54,8 @@ BookListeners::publish(
// Only publish jvObj if this is the first occurence
if (havePublished.emplace(p->getSeq()).second)
{
if (p->getApiVersion() == 1)
p->send(jvObj, true);
else
p->send(jvObjApiVer2, true);
p->send(
jvObj.select(apiVersionSelector(p->getApiVersion())), true);
}
++it;
}
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/app/ledger/BookListeners.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#ifndef RIPPLE_APP_LEDGER_BOOKLISTENERS_H_INCLUDED
#define RIPPLE_APP_LEDGER_BOOKLISTENERS_H_INCLUDED

#include <ripple/basics/MultivarJson.h>
#include <ripple/net/InfoSub.h>

#include <memory>
#include <mutex>

Expand Down Expand Up @@ -58,7 +60,7 @@ class BookListeners
*/
void
publish(Json::Value const& jvObj, Json::Value const& jvObjApiVer2, hash_set<std::uint64_t>& havePublished);
publish(MultiApiJson const& jvObj, hash_set<std::uint64_t>& havePublished);

private:
std::recursive_mutex mLock;
Expand Down
5 changes: 2 additions & 3 deletions src/ripple/app/ledger/OrderBookDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ void
OrderBookDB::processTxn(
std::shared_ptr<ReadView const> const& ledger,
const AcceptedLedgerTx& alTx,
Json::Value const& jvObj,
Json::Value const& jvObjApiVer2)
MultiApiJson const& jvObj)
{
std::lock_guard sl(mLock);

Expand All @@ -278,7 +277,7 @@ OrderBookDB::processTxn(
{data->getFieldAmount(sfTakerGets).issue(),
data->getFieldAmount(sfTakerPays).issue()});
if (listeners)
listeners->publish(jvObj, jvObjApiVer2, havePublished);
listeners->publish(jvObj, havePublished);
}
};

Expand Down
5 changes: 3 additions & 2 deletions src/ripple/app/ledger/OrderBookDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <ripple/app/ledger/AcceptedLedgerTx.h>
#include <ripple/app/ledger/BookListeners.h>
#include <ripple/app/main/Application.h>
#include <ripple/basics/MultivarJson.h>

#include <mutex>

namespace ripple {
Expand Down Expand Up @@ -63,8 +65,7 @@ class OrderBookDB
processTxn(
std::shared_ptr<ReadView const> const& ledger,
const AcceptedLedgerTx& alTx,
Json::Value const& jvObj,
Json::Value const& jvObjApiVer2);
MultiApiJson const& jvObj);

private:
Application& app_;
Expand Down
130 changes: 51 additions & 79 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/MultivarJson.h>
#include <ripple/basics/PerfLog.h>
#include <ripple/basics/SubmitSync.h>
#include <ripple/basics/UptimeClock.h>
Expand Down Expand Up @@ -3148,22 +3149,20 @@ NetworkOPsImp::pubValidatedTransaction(
{
auto const& stTxn = transaction.getTxn();

Json::Value jvObj =
transJson(*stTxn, transaction.getResult(), true, ledger);
Json::Value jvObjApi2 =
Json::Value jvSeed =
transJson(*stTxn, transaction.getResult(), true, ledger);

{
auto const& meta = transaction.getMeta();
jvObj[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, stTxn, meta);
RPC::insertDeliverMax(jvObj[jss::meta], stTxn->getTxnType(), 1);

jvObjApi2[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvObjApi2[jss::meta], *ledger, stTxn, meta);
RPC::insertDeliverMax(jvObjApi2[jss::meta], stTxn->getTxnType(), 2);
jvSeed[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvSeed[jss::meta], *ledger, stTxn, meta);
}

// Create two different Json objects, for different API versions
MultiApiJson jvObj = MultiApiJson::fill(jvSeed);
RPC::insertDeliverMax(jvObj[0][jss::meta], stTxn->getTxnType(), 1);
RPC::insertDeliverMax(jvObj[1][jss::meta], stTxn->getTxnType(), 2);

{
std::lock_guard sl(mSubLock);

Expand All @@ -3174,10 +3173,8 @@ NetworkOPsImp::pubValidatedTransaction(

if (p)
{
if (p->getApiVersion() == 1)
p->send(jvObj, true);
else
p->send(jvObjApi2, true);
p->send(
jvObj.select(apiVersionSelector(p->getApiVersion())), true);
++it;
}
else
Expand All @@ -3192,10 +3189,8 @@ NetworkOPsImp::pubValidatedTransaction(

if (p)
{
if (p->getApiVersion() == 1)
p->send(jvObj, true);
else
p->send(jvObjApi2, true);
p->send(
jvObj.select(apiVersionSelector(p->getApiVersion())), true);
++it;
}
else
Expand All @@ -3204,7 +3199,7 @@ NetworkOPsImp::pubValidatedTransaction(
}

if (transaction.getResult() == tesSUCCESS)
app_.getOrderBookDB().processTxn(ledger, transaction, jvObj, jvObjApi2);
app_.getOrderBookDB().processTxn(ledger, transaction, jvObj);

pubAccountTransaction(ledger, transaction, last);
}
Expand Down Expand Up @@ -3308,55 +3303,44 @@ NetworkOPsImp::pubAccountTransaction(
{
auto const& stTxn = transaction.getTxn();

Json::Value jvObj =
transJson(*stTxn, transaction.getResult(), true, ledger);
Json::Value jvObjApi2 =
Json::Value jvSeed =
transJson(*stTxn, transaction.getResult(), true, ledger);

{
auto const& meta = transaction.getMeta();

jvObj[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, stTxn, meta);
RPC::insertDeliverMax(jvObj[jss::meta], stTxn->getTxnType(), 1);

jvObjApi2[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(
jvObjApi2[jss::meta], *ledger, stTxn, meta);
RPC::insertDeliverMax(jvObjApi2[jss::meta], stTxn->getTxnType(), 2);
jvSeed[jss::meta] = meta.getJson(JsonOptions::none);
RPC::insertDeliveredAmount(jvSeed[jss::meta], *ledger, stTxn, meta);
}

// Create two different Json objects, for different API versions
MultiApiJson jvObj = MultiApiJson::fill(jvSeed);
RPC::insertDeliverMax(jvObj[0][jss::meta], stTxn->getTxnType(), 1);
RPC::insertDeliverMax(jvObj[1][jss::meta], stTxn->getTxnType(), 2);

for (InfoSub::ref isrListener : notify)
{
if (isrListener->getApiVersion() == 1)
isrListener->send(jvObj, true);
else
isrListener->send(jvObjApi2, true);
isrListener->send(
jvObj.select(apiVersionSelector(isrListener->getApiVersion())),
true);
}

if (last)
{
jvObj[jss::account_history_boundary] = true;
jvObjApi2[jss::account_history_boundary] = true;
}
jvObj.set(jss::account_history_boundary, true);

assert(!jvObj.isMember(jss::account_history_tx_stream));
assert(!jvObj[0].isMember(jss::account_history_tx_stream));
for (auto& info : accountHistoryNotify)
{
auto& index = info.index_;
if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
{
jvObj[jss::account_history_tx_first] = true;
jvObjApi2[jss::account_history_tx_first] = true;
}
jvObj[jss::account_history_tx_index] = index->forwardTxIndex_;
jvObjApi2[jss::account_history_tx_index] = index->forwardTxIndex_;
jvObj.set(jss::account_history_tx_first, true);

jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_);
index->forwardTxIndex_ += 1;

if (info.sink_->getApiVersion() == 1)
info.sink_->send(jvObj, true);
else
info.sink_->send(jvObjApi2, true);
info.sink_->send(
jvObj.select(apiVersionSelector(info.sink_->getApiVersion())),
true);
}
}
}
Expand Down Expand Up @@ -3624,15 +3608,13 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
return false;
};

auto send2 = [&](Json::Value const& jvObj,
Json::Value const& jvObjApi2,
bool unsubscribe) -> bool {
auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
bool unsubscribe) -> bool {
if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
{
if (sptr->getApiVersion() == 1)
sptr->send(jvObj, true);
else
sptr->send(jvObjApi2, true);
sptr->send(
jvObj.select(apiVersionSelector(sptr->getApiVersion())),
true);

if (unsubscribe)
unsubAccountHistory(sptr, accountId, false);
Expand Down Expand Up @@ -3801,40 +3783,30 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
return;
}

Json::Value jvTx = transJson(
Json::Value jvSeed = transJson(
*stTxn, meta->getResultTER(), true, curTxLedger);
jvTx[jss::meta] = meta->getJson(JsonOptions::none);
jvTx[jss::account_history_tx_index] = txHistoryIndex;

Json::Value jvTxApi2 = transJson(
*stTxn, meta->getResultTER(), true, curTxLedger);
jvTxApi2[jss::meta] = meta->getJson(JsonOptions::none);
jvTxApi2[jss::account_history_tx_index] =
txHistoryIndex;
jvSeed[jss::meta] = meta->getJson(JsonOptions::none);
jvSeed[jss::account_history_tx_index] = txHistoryIndex;
txHistoryIndex -= 1;

if (i + 1 == num_txns ||
txns[i + 1].first->getLedger() != tx->getLedger())
{
jvTx[jss::account_history_boundary] = true;
jvTxApi2[jss::account_history_boundary] = true;
}
jvSeed[jss::account_history_boundary] = true;

RPC::insertDeliveredAmount(
jvTx[jss::meta], *curTxLedger, stTxn, *meta);
RPC::insertDeliverMax(
jvTx[jss::meta], stTxn->getTxnType(), 1);
jvSeed[jss::meta], *curTxLedger, stTxn, *meta);

RPC::insertDeliveredAmount(
jvTxApi2[jss::meta], *curTxLedger, stTxn, *meta);
// Create two Json objects, for different API versions
MultiApiJson jvTx = MultiApiJson::fill(jvSeed);
RPC::insertDeliverMax(
jvTx.val[0][jss::meta], stTxn->getTxnType(), 1);
RPC::insertDeliverMax(
jvTxApi2[jss::meta], stTxn->getTxnType(), 2);
jvTx.val[1][jss::meta], stTxn->getTxnType(), 2);

if (isFirstTx(tx, meta))
{
jvTx[jss::account_history_tx_first] = true;
jvTxApi2[jss::account_history_tx_first] = true;
send2(jvTx, jvTxApi2, false);
jvTx.set(jss::account_history_tx_first, true);
sendMultiApiJson(jvTx, false);

JLOG(m_journal.trace())
<< "AccountHistory job for account "
Expand All @@ -3844,7 +3816,7 @@ NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
}
else
{
send2(jvTx, jvTxApi2, false);
sendMultiApiJson(jvTx, false);
}
}

Expand Down
Loading

0 comments on commit dd9a258

Please sign in to comment.