Skip to content

Commit

Permalink
Improve shard concurrency
Browse files Browse the repository at this point in the history
* Reduce lock scope on all public functions
* Use TaskQueue to process shard finalization in separate thread
* Store shard last ledger hash and other info in backend
* Use temp SQLite DB versus control file when acquiring
  • Loading branch information
miguelportilla committed Feb 6, 2020
1 parent cc4cefa commit 855f633
Show file tree
Hide file tree
Showing 28 changed files with 1,414 additions and 968 deletions.
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ else ()
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
nounity, main sources:
subdir: overlay
Expand Down
63 changes: 32 additions & 31 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ class ApplicationImp
// These are Stoppable-related
std::unique_ptr <JobQueue> m_jobQueue;
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
detail::AppFamily family_;
std::unique_ptr <detail::AppFamily> sFamily_;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
std::unique_ptr <detail::AppFamily> shardFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
Expand Down Expand Up @@ -463,18 +463,18 @@ class ApplicationImp
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))

, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4))

, family_ (*this, *m_nodeStore, *m_collectorManager)

// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
, shardStore_ (make_ShardStore (
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))

, family_ (*this, *m_nodeStore, *m_collectorManager)

, m_orderBookDB (*this, *m_jobQueue)

, m_pathRequests (std::make_unique<PathRequests> (
Expand Down Expand Up @@ -557,14 +557,6 @@ class ApplicationImp
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this))
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -625,7 +617,7 @@ class ApplicationImp

Family* shardFamily() override
{
return sFamily_.get();
return shardFamily_.get();
}

TimeKeeper&
Expand Down Expand Up @@ -941,7 +933,7 @@ class ApplicationImp
}

bool
initNodeStoreDBs()
initNodeStore()
{
if (config_->doImport)
{
Expand All @@ -959,12 +951,12 @@ class ApplicationImp

JLOG(j.warn()) <<
"Starting node import from '" << source->getName() <<
"' to '" << getNodeStore().getName() << "'.";
"' to '" << m_nodeStore->getName() << "'.";

using namespace std::chrono;
auto const start = steady_clock::now();

getNodeStore().import(*source);
m_nodeStore->import(*source);

auto const elapsed = duration_cast <seconds>
(steady_clock::now() - start);
Expand All @@ -988,14 +980,6 @@ class ApplicationImp
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (sFamily_)
{
sFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
sFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
}

return true;
}

Expand Down Expand Up @@ -1250,8 +1234,8 @@ class ApplicationImp
// have listeners register for "onSweep ()" notification.

family().fullbelow().sweep();
if (sFamily_)
sFamily_->fullbelow().sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
Expand All @@ -1262,8 +1246,8 @@ class ApplicationImp
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (sFamily_)
sFamily_->treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();

// Set timer to do another sweep later.
Expand Down Expand Up @@ -1348,9 +1332,26 @@ bool ApplicationImp::setup()
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);

if (!initSQLiteDBs() || !initNodeStoreDBs())
if (!initSQLiteDBs() || !initNodeStore())
return false;

if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);

using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (!shardStore_->init())
return false;
}

if (!peerReservations_->load(getWalletDB()))
{
JLOG(m_journal.fatal()) << "Cannot find peer reservations!";
Expand Down
23 changes: 22 additions & 1 deletion src/ripple/app/main/DBInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,30 @@ std::array<char const*, 8> TxDBInit {{

////////////////////////////////////////////////////////////////////////////////

// Temporary database used with an incomplete shard that is being acquired
static constexpr auto AcquireDBName {"acquire.db"};

static constexpr
std::array<char const*, 3> AcquireDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

static constexpr
std::array<char const*, 1> AcquireDBInit {{
"CREATE TABLE IF NOT EXISTS Shard ( \
ShardIndex INTEGER PRIMARY KEY, \
LastLedgerHash CHARACTER(64), \
StoredLedgerSeqs BLOB \
);"
}};

////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
static constexpr
std::array<char const*, 2> CompleteShardDBPragma {{
std::array<char const*, 2> CompltDBPragma {{
"PRAGMA synchronous=OFF;",
"PRAGMA journal_mode=OFF;"
}};
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ SHAMapStoreImp::run()
std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
std::unique_ptr<NodeStore::Backend> oldBackend;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

Expand Down
143 changes: 70 additions & 73 deletions src/ripple/basics/RangeSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#ifndef RIPPLE_BASICS_RANGESET_H_INCLUDED
#define RIPPLE_BASICS_RANGESET_H_INCLUDED

#include <string>
#include <boost/optional.hpp>
#include <ripple/beast/core/LexicalCast.h>

#include <boost/algorithm/string.hpp>
#include <boost/icl/closed_interval.hpp>
#include <boost/icl/interval_set.hpp>
#include <boost/serialization/split_free.hpp>
#include <boost/optional.hpp>

#include <string>

namespace ripple
{
Expand Down Expand Up @@ -86,8 +89,8 @@ std::string to_string(ClosedInterval<T> const & ci)

/** Convert the given RangeSet to a styled string.
The styled string represention is the set of disjoint intervals joined by
commas. The string "empty" is returned if the set is empty.
The styled string representation is the set of disjoint intervals joined
by commas. The string "empty" is returned if the set is empty.
@param rs The rangeset to convert
@return The styled string
Expand All @@ -109,6 +112,67 @@ std::string to_string(RangeSet<T> const & rs)
return res;
}

/** Convert the given styled string to a RangeSet.
The styled string representation is the set
of disjoint intervals joined by commas.
@param rs The set to be populated
@param s The styled string to convert
@return True on successfully converting styled string
*/
template <class T>
bool
from_string(RangeSet<T>& rs, std::string const& s)
{
std::vector<std::string> intervals;
std::vector<std::string> tokens;
bool result {true};

boost::split(tokens, s, boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
boost::split(intervals, t, boost::algorithm::is_any_of("-"));
switch (intervals.size())
{
case 1:
{
T front;
if (!beast::lexicalCastChecked(front, intervals.front()))
result = false;
else
rs.insert(front);
break;
}
case 2:
{
T front;
if (!beast::lexicalCastChecked(front, intervals.front()))
result = false;
else
{
T back;
if (!beast::lexicalCastChecked(back, intervals.back()))
result = false;
else
rs.insert(range(front, back));
}
break;
}
default:
result = false;
}

if (!result)
break;
intervals.clear();
}

if (!result)
rs.clear();
return result;
}

/** Find the largest value not in the set that is less than a given value.
@param rs The set of interest
Expand All @@ -129,75 +193,8 @@ prevMissing(RangeSet<T> const & rs, T t, T minVal = 0)
return boost::none;
return boost::icl::last(tgt);
}
} // namespace ripple


// The boost serialization documents recommended putting free-function helpers
// in the boost serialization namespace

namespace boost {
namespace serialization {
template <class Archive, class T>
void
save(Archive& ar,
ripple::ClosedInterval<T> const& ci,
const unsigned int version)
{
auto l = ci.lower();
auto u = ci.upper();
ar << l << u;
}

template <class Archive, class T>
void
load(Archive& ar, ripple::ClosedInterval<T>& ci, const unsigned int version)
{
T low, up;
ar >> low >> up;
ci = ripple::ClosedInterval<T>{low, up};
}

template <class Archive, class T>
void
serialize(Archive& ar,
ripple::ClosedInterval<T>& ci,
const unsigned int version)
{
split_free(ar, ci, version);
}

template <class Archive, class T>
void
save(Archive& ar, ripple::RangeSet<T> const& rs, const unsigned int version)
{
auto s = rs.iterative_size();
ar << s;
for (auto const& r : rs)
ar << r;
}

template <class Archive, class T>
void
load(Archive& ar, ripple::RangeSet<T>& rs, const unsigned int version)
{
rs.clear();
std::size_t intervals;
ar >> intervals;
for (std::size_t i = 0; i < intervals; ++i)
{
ripple::ClosedInterval<T> ci;
ar >> ci;
rs.insert(ci);
}
}
} // namespace ripple

template <class Archive, class T>
void
serialize(Archive& ar, ripple::RangeSet<T>& rs, const unsigned int version)
{
split_free(ar, rs, version);
}

} // serialization
} // boost
#endif
2 changes: 1 addition & 1 deletion src/ripple/core/impl/JobQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ JobQueue::JobQueue (beast::insight::Collector::ptr const& collector,
, m_lastJob (0)
, m_invalidJobData (JobTypes::instance().getInvalid (), collector, logs)
, m_processCount (0)
, m_workers (*this, perfLog, "JobQueue", 0)
, m_workers (*this, &perfLog, "JobQueue", 0)
, m_cancelCallback (std::bind (&Stoppable::isStopping, this))
, perfLog_ (perfLog)
, m_collector (collector)
Expand Down
5 changes: 3 additions & 2 deletions src/ripple/core/impl/Workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace ripple {

Workers::Workers (
Callback& callback,
perf::PerfLog& perfLog,
perf::PerfLog* perfLog,
std::string const& threadNames,
int numberOfThreads)
: m_callback (callback)
Expand Down Expand Up @@ -63,7 +63,8 @@ void Workers::setNumberOfThreads (int numberOfThreads)
static int instance {0};
if (m_numberOfThreads != numberOfThreads)
{
perfLog_.resizeJobs(numberOfThreads);
if (perfLog_)
perfLog_->resizeJobs(numberOfThreads);

if (numberOfThreads > m_numberOfThreads)
{
Expand Down
Loading

0 comments on commit 855f633

Please sign in to comment.