Skip to content

Commit

Permalink
Improve shard concurrency
Browse files Browse the repository at this point in the history
* Reduce lock scope on all public functions
* Use TaskQueue to process shard finalization in separate thread
* Store shard last ledger hash and other info in backend
* Use temp SQLite DB versus control file when acquiring
* Remove boost serialization from cmake files
  • Loading branch information
miguelportilla committed Mar 25, 2020
1 parent b54d672 commit 78c9abb
Show file tree
Hide file tree
Showing 48 changed files with 1,911 additions and 1,302 deletions.
1 change: 0 additions & 1 deletion Builds/CMake/RippleConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ find_dependency (Boost 1.70
filesystem
program_options
regex
serialization
system
thread)
#[=========================================================[
Expand Down
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:
subdir: overlay
Expand Down
4 changes: 1 addition & 3 deletions Builds/CMake/RippledInterface.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ target_compile_definitions (opts
>
$<$<BOOL:${beast_no_unit_test_inline}>:BEAST_NO_UNIT_TEST_INLINE=1>
$<$<BOOL:${beast_disable_autolink}>:BEAST_DONT_AUTOLINK_TO_WIN32_LIBRARIES=1>
$<$<BOOL:${single_io_service_thread}>:RIPPLE_SINGLE_IO_SERVICE_THREAD=1>
# doesn't currently compile ? :
$<$<BOOL:${verify_nodeobject_keys}>:RIPPLE_VERIFY_NODEOBJECT_KEYS=1>)
$<$<BOOL:${single_io_service_thread}>:RIPPLE_SINGLE_IO_SERVICE_THREAD=1>)
target_compile_options (opts
INTERFACE
$<$<AND:$<BOOL:${is_gcc}>,$<COMPILE_LANGUAGE:CXX>>:-Wsuggest-override>
Expand Down
6 changes: 0 additions & 6 deletions Builds/CMake/RippledSettings.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ option (have_package_container
option (beast_no_unit_test_inline
"Prevents unit test definitions from being inserted into global table"
OFF)
# NOTE - THIS OPTION CURRENTLY DOES NOT COMPILE :
# TODO: fix or remove
option (verify_nodeobject_keys
"This verifies that the hash of node objects matches the payload. \
This check is expensive - use with caution."
OFF)
option (single_io_service_thread
"Restricts the number of threads calling io_service::run to one. \
This can be useful when debugging."
Expand Down
2 changes: 0 additions & 2 deletions Builds/CMake/deps/Boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ find_package (Boost 1.70 REQUIRED
filesystem
program_options
regex
serialization
system
thread)

Expand All @@ -69,7 +68,6 @@ target_link_libraries (ripple_boost
Boost::filesystem
Boost::program_options
Boost::regex
Boost::serialization
Boost::system
Boost::thread)
if (Boost_COMPILER)
Expand Down
1 change: 0 additions & 1 deletion Builds/containers/shared/install_boost.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ else
BLDARGS+=(--with-filesystem)
BLDARGS+=(--with-program_options)
BLDARGS+=(--with-regex)
BLDARGS+=(--with-serialization)
BLDARGS+=(--with-system)
BLDARGS+=(--with-atomic)
BLDARGS+=(--with-thread)
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ InboundLedger::init(ScopedLockType& collectionLock)
if (mFailed)
return;
}
else if (shardStore && mSeq >= shardStore->earliestSeq())
else if (shardStore && mSeq >= shardStore->earliestLedgerSeq())
{
if (auto l = shardStore->fetchLedger(mHash, mSeq))
{
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class InboundLedgersImp
if (reason == InboundLedger::Reason::HISTORY)
{
if (inbound->getLedger()->stateMap().family().isShardBacked())
app_.getNodeStore().copyLedger(inbound->getLedger());
app_.getNodeStore().storeLedger(inbound->getLedger());
}
else if (reason == InboundLedger::Reason::SHARD)
{
Expand All @@ -120,7 +120,7 @@ class InboundLedgersImp
if (inbound->getLedger()->stateMap().family().isShardBacked())
shardStore->setStored(inbound->getLedger());
else
shardStore->copyLedger(inbound->getLedger());
shardStore->storeLedger(inbound->getLedger());
}
return inbound->getLedger();
}
Expand Down
8 changes: 4 additions & 4 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1742,7 +1742,7 @@ LedgerMaster::fetchForHistory(
*hash, missing, reason);
if (!ledger &&
missing != fetch_seq_ &&
missing > app_.getNodeStore().earliestSeq())
missing > app_.getNodeStore().earliestLedgerSeq())
{
JLOG(m_journal.trace())
<< "fetchForHistory want fetch pack " << missing;
Expand Down Expand Up @@ -1771,7 +1771,7 @@ LedgerMaster::fetchForHistory(
mShardLedger = ledger;
}
if (!ledger->stateMap().family().isShardBacked())
app_.getShardStore()->copyLedger(ledger);
app_.getShardStore()->storeLedger(ledger);
}
else
{
Expand Down Expand Up @@ -1807,7 +1807,7 @@ LedgerMaster::fetchForHistory(
else
// Do not fetch ledger sequences lower
// than the earliest ledger sequence
fetchSz = app_.getNodeStore().earliestSeq();
fetchSz = app_.getNodeStore().earliestLedgerSeq();
fetchSz = missing >= fetchSz ?
std::min(ledger_fetch_size_, (missing - fetchSz) + 1) : 0;
try
Expand Down Expand Up @@ -1867,7 +1867,7 @@ void LedgerMaster::doAdvance (std::unique_lock<std::recursive_mutex>& sl)
std::lock_guard sll(mCompleteLock);
missing = prevMissing(mCompleteLedgers,
mPubLedger->info().seq,
app_.getNodeStore().earliestSeq());
app_.getNodeStore().earliestLedgerSeq());
}
if (missing)
{
Expand Down
63 changes: 32 additions & 31 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ class ApplicationImp
// These are Stoppable-related
std::unique_ptr <JobQueue> m_jobQueue;
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
detail::AppFamily family_;
std::unique_ptr <detail::AppFamily> sFamily_;
std::unique_ptr <NodeStore::DatabaseShard> shardStore_;
std::unique_ptr <detail::AppFamily> shardFamily_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
Expand Down Expand Up @@ -463,18 +463,18 @@ class ApplicationImp
m_collectorManager->group ("jobq"), m_nodeStoreScheduler,
logs_->journal("JobQueue"), *logs_, *perfLog_))

, m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4))
, m_nodeStore (m_shaMapStore->makeNodeStore ("NodeStore.main", 4))

, family_ (*this, *m_nodeStore, *m_collectorManager)

// The shard store is optional and make_ShardStore can return null.
, shardStore_(make_ShardStore(
, shardStore_ (make_ShardStore (
*this,
*m_jobQueue,
m_nodeStoreScheduler,
4,
logs_->journal("ShardStore")))

, family_ (*this, *m_nodeStore, *m_collectorManager)

, m_orderBookDB (*this, *m_jobQueue)

, m_pathRequests (std::make_unique<PathRequests> (
Expand Down Expand Up @@ -558,14 +558,6 @@ class ApplicationImp
logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this))
{
if (shardStore_)
{
sFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);
}

add (m_resourceManager.get ());

//
Expand Down Expand Up @@ -626,7 +618,7 @@ class ApplicationImp

Family* shardFamily() override
{
return sFamily_.get();
return shardFamily_.get();
}

TimeKeeper&
Expand Down Expand Up @@ -943,7 +935,7 @@ class ApplicationImp
}

bool
initNodeStoreDBs()
initNodeStore()
{
if (config_->doImport)
{
Expand All @@ -961,12 +953,12 @@ class ApplicationImp

JLOG(j.warn()) <<
"Starting node import from '" << source->getName() <<
"' to '" << getNodeStore().getName() << "'.";
"' to '" << m_nodeStore->getName() << "'.";

using namespace std::chrono;
auto const start = steady_clock::now();

getNodeStore().import(*source);
m_nodeStore->import(*source);

auto const elapsed = duration_cast <seconds>
(steady_clock::now() - start);
Expand All @@ -990,14 +982,6 @@ class ApplicationImp
family().treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (sFamily_)
{
sFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
sFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});
}

return true;
}

Expand Down Expand Up @@ -1252,8 +1236,8 @@ class ApplicationImp
// have listeners register for "onSweep ()" notification.

family().fullbelow().sweep();
if (sFamily_)
sFamily_->fullbelow().sweep();
if (shardFamily_)
shardFamily_->fullbelow().sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
if (shardStore_)
Expand All @@ -1264,8 +1248,8 @@ class ApplicationImp
getInboundLedgers().sweep();
m_acceptedLedgerCache.sweep();
family().treecache().sweep();
if (sFamily_)
sFamily_->treecache().sweep();
if (shardFamily_)
shardFamily_->treecache().sweep();
cachedSLEs_.expire();

// Set timer to do another sweep later.
Expand Down Expand Up @@ -1350,9 +1334,26 @@ bool ApplicationImp::setup()
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);

if (!initSQLiteDBs() || !initNodeStoreDBs())
if (!initSQLiteDBs() || !initNodeStore())
return false;

if (shardStore_)
{
shardFamily_ = std::make_unique<detail::AppFamily>(
*this,
*shardStore_,
*m_collectorManager);

using namespace std::chrono;
shardFamily_->treecache().setTargetSize(
config_->getValueFor(SizedItem::treeCacheSize));
shardFamily_->treecache().setTargetAge(
seconds{config_->getValueFor(SizedItem::treeCacheAge)});

if (!shardStore_->init())
return false;
}

if (!peerReservations_->load(getWalletDB()))
{
JLOG(m_journal.fatal()) << "Cannot find peer reservations!";
Expand Down
41 changes: 31 additions & 10 deletions src/ripple/app/main/DBInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ namespace ripple {
////////////////////////////////////////////////////////////////////////////////

// Ledger database holds ledgers and ledger confirmations
static constexpr auto LgrDBName {"ledger.db"};
inline constexpr auto LgrDBName {"ledger.db"};

static constexpr
inline constexpr
std::array<char const*, 3> LgrDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

static constexpr
inline constexpr
std::array<char const*, 5> LgrDBInit {{
"BEGIN TRANSACTION;",

Expand All @@ -63,9 +63,9 @@ std::array<char const*, 5> LgrDBInit {{
////////////////////////////////////////////////////////////////////////////////

// Transaction database holds transactions and public keys
static constexpr auto TxDBName {"transaction.db"};
inline constexpr auto TxDBName {"transaction.db"};

static constexpr
inline constexpr
#if (ULONG_MAX > UINT_MAX) && !defined (NO_SQLITE_MMAP)
std::array<char const*, 6> TxDBPragma {{
#else
Expand All @@ -81,7 +81,7 @@ static constexpr
#endif
}};

static constexpr
inline constexpr
std::array<char const*, 8> TxDBInit {{
"BEGIN TRANSACTION;",

Expand Down Expand Up @@ -116,18 +116,39 @@ std::array<char const*, 8> TxDBInit {{

////////////////////////////////////////////////////////////////////////////////

// Temporary database used with an incomplete shard that is being acquired
inline constexpr auto AcquireShardDBName {"acquire.db"};

inline constexpr
std::array<char const*, 3> AcquireShardDBPragma {{
"PRAGMA synchronous=NORMAL;",
"PRAGMA journal_mode=WAL;",
"PRAGMA journal_size_limit=1582080;"
}};

inline constexpr
std::array<char const*, 1> AcquireShardDBInit {{
"CREATE TABLE IF NOT EXISTS Shard ( \
ShardIndex INTEGER PRIMARY KEY, \
LastLedgerHash CHARACTER(64), \
StoredLedgerSeqs BLOB \
);"
}};

////////////////////////////////////////////////////////////////////////////////

// Pragma for Ledger and Transaction databases with complete shards
static constexpr
std::array<char const*, 2> CompleteShardDBPragma {{
inline constexpr
std::array<char const*, 2> CompleteShardDBPragma{{
"PRAGMA synchronous=OFF;",
"PRAGMA journal_mode=OFF;"
}};

////////////////////////////////////////////////////////////////////////////////

static constexpr auto WalletDBName {"wallet.db"};
inline constexpr auto WalletDBName {"wallet.db"};

static constexpr
inline constexpr
std::array<char const*, 6> WalletDBInit {{
"BEGIN TRANSACTION;",

Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void printHelp (const po::options_description& desc)
" connect <ip> [<port>]\n"
" consensus_info\n"
" deposit_authorized <source_account> <destination_account> [<ledger>]\n"
" download_shard [[<index> <url>]] <validate>\n"
" download_shard [[<index> <url>]]\n"
" feature [<feature> [accept|reject]]\n"
" fetch_info [clear]\n"
" gateway_balances [<ledger>] <issuer_account> [ <hotwallet> [ <hotwallet> ]]\n"
Expand Down
5 changes: 3 additions & 2 deletions src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,15 +449,16 @@ SHAMapStoreImp::run()
std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
std::unique_ptr<NodeStore::Backend> oldBackend;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

state_db_.setState (SavedState {newBackend->getName(),
nextArchiveDir, lastRotated});
clearCaches (validatedSeq);
oldBackend = dbRotating_->rotateBackends(
std::move(newBackend));
std::move(newBackend),
lock);
}
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;

Expand Down
Loading

0 comments on commit 78c9abb

Please sign in to comment.