Skip to content

Commit

Permalink
Improve shard concurrency
Browse files Browse the repository at this point in the history
* Reduce lock scope on all public functions
* Use TaskQueue to process shard finalization in separate thread
* Store shard last ledger hash and other info in backend
* Use temp SQLite DB versus control file when acquiring
* Remove boost serialization from cmake files
  • Loading branch information
miguelportilla committed Mar 3, 2020
1 parent f76a5a3 commit efba284
Show file tree
Hide file tree
Showing 35 changed files with 1,712 additions and 1,148 deletions.
1 change: 0 additions & 1 deletion Builds/CMake/RippleConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ find_dependency (Boost 1.70
filesystem
program_options
regex
serialization
system
thread)
#[=========================================================[
Expand Down
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:
subdir: overlay
Expand Down
2 changes: 0 additions & 2 deletions Builds/CMake/deps/Boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ find_package (Boost 1.70 REQUIRED
filesystem
program_options
regex
serialization
system
thread)

Expand All @@ -69,7 +68,6 @@ target_link_libraries (ripple_boost
Boost::filesystem
Boost::program_options
Boost::regex
Boost::serialization
Boost::system
Boost::thread)
if (Boost_COMPILER)
Expand Down
1 change: 0 additions & 1 deletion Builds/containers/shared/install_boost.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ else
BLDARGS+=(--with-filesystem)
BLDARGS+=(--with-program_options)
BLDARGS+=(--with-regex)
BLDARGS+=(--with-serialization)
BLDARGS+=(--with-system)
BLDARGS+=(--with-atomic)
BLDARGS+=(--with-thread)
Expand Down
63 changes: 32 additions & 31 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ class ApplicationImp
// These are Stoppable-related
std::unique_ptr <JobQueue> m_jobQueue;
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
detail::AppFamily family_;
std::unique_ptr <detail::AppFamily> sFamily_;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
std::unique_ptr <detail::AppFamily> shardFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
Expand Down Expand Up @@ -463,18 +463,18 @@ class ApplicationImp
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))

, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4))

, family_ (*this, *m_nodeStore, *m_collectorManager)

// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
, shardStore_ (make_ShardStore (
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))

, family_ (*this, *m_nodeStore, *m_collectorManager)

, m_orderBookDB (*this, *m_jobQueue)

, m_pathRequests (std::make_unique<PathRequests> (
Expand Down Expand Up @@ -558,14 +558,6 @@ class ApplicationImp
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this))
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -626,7 +618,7 @@ class ApplicationImp

Family* shardFamily() override
{
return sFamily_.get();
return shardFamily_.get();
}

TimeKeeper&
Expand Down Expand Up @@ -943,7 +935,7 @@ class ApplicationImp
}

bool
initNodeStoreDBs()
initNodeStore()
{
if (config_->doImport)
{
Expand All @@ -961,12 +953,12 @@ class ApplicationImp

JLOG(j.warn()) <<
"Starting node import from '" << source->getName() <<
"' to '" << getNodeStore().getName() << "'.";
"' to '" << m_nodeStore->getName() << "'.";

using namespace std::chrono;
auto const start = steady_clock::now();

getNodeStore().import(*source);
m_nodeStore->import(*source);

auto const elapsed = duration_cast <seconds>
(steady_clock::now() - start);
Expand All @@ -990,14 +982,6 @@ class ApplicationImp
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (sFamily_)
{
sFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
sFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
}

return true;
}

Expand Down Expand Up @@ -1252,8 +1236,8 @@ class ApplicationImp
// have listeners register for "onSweep ()" notification.

family().fullbelow().sweep();
if (sFamily_)
sFamily_->fullbelow().sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
Expand All @@ -1264,8 +1248,8 @@ class ApplicationImp
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (sFamily_)
sFamily_->treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();

// Set timer to do another sweep later.
Expand Down Expand Up @@ -1350,9 +1334,26 @@ bool ApplicationImp::setup()
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);

if (!initSQLiteDBs() || !initNodeStoreDBs())
if (!initSQLiteDBs() || !initNodeStore())
return false;

if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);

using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (!shardStore_->init())
return false;
}

if (!peerReservations_->load(getWalletDB()))
{
JLOG(m_journal.fatal()) << "Cannot find peer reservations!";
Expand Down
23 changes: 22 additions & 1 deletion src/ripple/app/main/DBInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,30 @@ std::array<char const*, 8> TxDBInit {{

////////////////////////////////////////////////////////////////////////////////

// Temporary database used with an incomplete shard that is being acquired
static constexpr auto AcquireDBName {"acquire.db"};

static constexpr
std::array<char const*, 3> AcquireDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

static constexpr
std::array<char const*, 1> AcquireDBInit {{
"CREATE TABLE IF NOT EXISTS Shard ( \
ShardIndex INTEGER PRIMARY KEY, \
LastLedgerHash CHARACTER(64), \
StoredLedgerSeqs BLOB \
);"
}};

////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
static constexpr
std::array<char const*, 2> CompleteShardDBPragma {{
std::array<char const*, 2> CompltDBPragma {{
"PRAGMA synchronous=OFF;",
"PRAGMA journal_mode=OFF;"
}};
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void printHelp (const po::options_description& desc)
" connect <ip> [<port>]\n"
" consensus_info\n"
" deposit_authorized <source_account> <destination_account> [<ledger>]\n"
" download_shard [[<index> <url>]] <validate>\n"
" download_shard [[<index> <url>]]\n"
" feature [<feature> [accept|reject]]\n"
" fetch_info [clear]\n"
" gateway_balances [<ledger>] <issuer_account> [ <hotwallet> [ <hotwallet> ]]\n"
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ SHAMapStoreImp::run()
std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
std::unique_ptr<NodeStore::Backend> oldBackend;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

Expand Down
Loading

0 comments on commit efba284

Please sign in to comment.