Skip to content

Commit

Permalink
Improve shard concurrency
Browse files Browse the repository at this point in the history
* Reduce lock scope on all public functions
* Use TaskQueue to process shard finalization in separate thread
* Store shard last ledger hash and other info in backend
* Use temp SQLite DB versus control file when acquiring
* Remove boost serialization from cmake files
  • Loading branch information
miguelportilla committed Feb 7, 2020
1 parent cc4cefa commit 46d85c4
Show file tree
Hide file tree
Showing 31 changed files with 1,416 additions and 972 deletions.
1 change: 0 additions & 1 deletion Builds/CMake/RippleConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ find_dependency (Boost 1.70
filesystem
program_options
regex
serialization
system
thread)
#[=========================================================[
Expand Down
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ else ()
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
nounity, main sources:
subdir: overlay
Expand Down
2 changes: 0 additions & 2 deletions Builds/CMake/deps/Boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ find_package (Boost 1.70 REQUIRED
filesystem
program_options
regex
serialization
system
thread)

Expand All @@ -69,7 +68,6 @@ target_link_libraries (ripple_boost
Boost::filesystem
Boost::program_options
Boost::regex
Boost::serialization
Boost::system
Boost::thread)
if (Boost_COMPILER)
Expand Down
1 change: 0 additions & 1 deletion Builds/containers/shared/install_boost.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ else
BLDARGS+=(--with-filesystem)
BLDARGS+=(--with-program_options)
BLDARGS+=(--with-regex)
BLDARGS+=(--with-serialization)
BLDARGS+=(--with-system)
BLDARGS+=(--with-atomic)
BLDARGS+=(--with-thread)
Expand Down
63 changes: 32 additions & 31 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ class ApplicationImp
// These are Stoppable-related
std::unique_ptr <JobQueue> m_jobQueue;
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
detail::AppFamily family_;
std::unique_ptr <detail::AppFamily> sFamily_;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
std::unique_ptr <detail::AppFamily> shardFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
Expand Down Expand Up @@ -463,18 +463,18 @@ class ApplicationImp
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))

, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4))

, family_ (*this, *m_nodeStore, *m_collectorManager)

// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
, shardStore_ (make_ShardStore (
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))

, family_ (*this, *m_nodeStore, *m_collectorManager)

, m_orderBookDB (*this, *m_jobQueue)

, m_pathRequests (std::make_unique<PathRequests> (
Expand Down Expand Up @@ -557,14 +557,6 @@ class ApplicationImp
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this))
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -625,7 +617,7 @@ class ApplicationImp

Family* shardFamily() override
{
return sFamily_.get();
return shardFamily_.get();
}

TimeKeeper&
Expand Down Expand Up @@ -941,7 +933,7 @@ class ApplicationImp
}

bool
initNodeStoreDBs()
initNodeStore()
{
if (config_->doImport)
{
Expand All @@ -959,12 +951,12 @@ class ApplicationImp

JLOG(j.warn()) <<
"Starting node import from '" << source->getName() <<
"' to '" << getNodeStore().getName() << "'.";
"' to '" << m_nodeStore->getName() << "'.";

using namespace std::chrono;
auto const start = steady_clock::now();

getNodeStore().import(*source);
m_nodeStore->import(*source);

auto const elapsed = duration_cast <seconds>
(steady_clock::now() - start);
Expand All @@ -988,14 +980,6 @@ class ApplicationImp
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (sFamily_)
{
sFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
sFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
}

return true;
}

Expand Down Expand Up @@ -1250,8 +1234,8 @@ class ApplicationImp
// have listeners register for "onSweep ()" notification.

family().fullbelow().sweep();
if (sFamily_)
sFamily_->fullbelow().sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
Expand All @@ -1262,8 +1246,8 @@ class ApplicationImp
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (sFamily_)
sFamily_->treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();

// Set timer to do another sweep later.
Expand Down Expand Up @@ -1348,9 +1332,26 @@ bool ApplicationImp::setup()
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);

if (!initSQLiteDBs() || !initNodeStoreDBs())
if (!initSQLiteDBs() || !initNodeStore())
return false;

if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);

using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (!shardStore_->init())
return false;
}

if (!peerReservations_->load(getWalletDB()))
{
JLOG(m_journal.fatal()) << "Cannot find peer reservations!";
Expand Down
23 changes: 22 additions & 1 deletion src/ripple/app/main/DBInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,30 @@ std::array<char const*, 8> TxDBInit {{

////////////////////////////////////////////////////////////////////////////////

// Temporary database used with an incomplete shard that is being acquired
static constexpr auto AcquireDBName {"acquire.db"};

static constexpr
std::array<char const*, 3> AcquireDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

static constexpr
std::array<char const*, 1> AcquireDBInit {{
"CREATE TABLE IF NOT EXISTS Shard ( \
ShardIndex INTEGER PRIMARY KEY, \
LastLedgerHash CHARACTER(64), \
StoredLedgerSeqs BLOB \
);"
}};

////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
static constexpr
std::array<char const*, 2> CompleteShardDBPragma {{
std::array<char const*, 2> CompltDBPragma {{
"PRAGMA synchronous=OFF;",
"PRAGMA journal_mode=OFF;"
}};
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ SHAMapStoreImp::run()
std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
std::unique_ptr<NodeStore::Backend> oldBackend;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

Expand Down
Loading

0 comments on commit 46d85c4

Please sign in to comment.