Skip to content

Commit

Permalink
Implement node-to-shard RPC control
Browse files Browse the repository at this point in the history
  • Loading branch information
undertome committed Oct 15, 2021
1 parent b1db666 commit 811ad96
Show file tree
Hide file tree
Showing 14 changed files with 640 additions and 85 deletions.
3 changes: 2 additions & 1 deletion Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ target_sources (rippled PRIVATE
src/ripple/rpc/handlers/LogLevel.cpp
src/ripple/rpc/handlers/LogRotate.cpp
src/ripple/rpc/handlers/Manifest.cpp
src/ripple/rpc/handlers/NodeToShardStatus.cpp
src/ripple/rpc/handlers/NodeToShard.cpp
src/ripple/rpc/handlers/NoRippleCheck.cpp
src/ripple/rpc/handlers/OwnerInfo.cpp
src/ripple/rpc/handlers/PathFind.cpp
Expand Down Expand Up @@ -934,6 +934,7 @@ if (tests)
src/test/rpc/LedgerRPC_test.cpp
src/test/rpc/LedgerRequestRPC_test.cpp
src/test/rpc/ManifestRPC_test.cpp
src/test/rpc/NodeToShardRPC_test.cpp
src/test/rpc/NoRippleCheck_test.cpp
src/test/rpc/NoRipple_test.cpp
src/test/rpc/OwnerInfo_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/main/Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ printHelp(const po::options_description& desc)
" ledger_request <ledger>\n"
" log_level [[<partition>] <severity>]\n"
" logrotate\n"
" nodetoshard_status\n"
" node_to_shard [status|start|stop]\n"
" peers\n"
" ping\n"
" random\n"
Expand Down
9 changes: 9 additions & 0 deletions src/ripple/app/main/NodeStoreScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ NodeStoreScheduler::NodeStoreScheduler(JobQueue& jobQueue) : jobQueue_(jobQueue)
void
NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
{
if (jobQueue_.isStopped())
return;

if (!jobQueue_.addJob(jtWRITE, "NodeObject::store", [&task](Job&) {
task.performScheduledTask();
}))
Expand All @@ -42,6 +45,9 @@ NodeStoreScheduler::scheduleTask(NodeStore::Task& task)
void
NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
{
if (jobQueue_.isStopped())
return;

jobQueue_.addLoadEvents(
report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ
: jtNS_SYNC_READ,
Expand All @@ -52,6 +58,9 @@ NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report)
void
NodeStoreScheduler::onBatchWrite(NodeStore::BatchWriteReport const& report)
{
if (jobQueue_.isStopped())
return;

jobQueue_.addLoadEvents(jtNS_WRITE, report.writeCount, report.elapsed);
}

Expand Down
11 changes: 10 additions & 1 deletion src/ripple/net/impl/RPCCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,15 @@ class RPCParser
return jvRequest;
}

Json::Value
parseNodeToShard(Json::Value const& jvParams)
{
Json::Value jvRequest;
jvRequest[jss::action] = jvParams[0u].asString();

return jvRequest;
}

// peer_reservations_add <public_key> [<name>]
Json::Value
parsePeerReservationsAdd(Json::Value const& jvParams)
Expand Down Expand Up @@ -1257,7 +1266,7 @@ class RPCParser
{"log_level", &RPCParser::parseLogLevel, 0, 2},
{"logrotate", &RPCParser::parseAsIs, 0, 0},
{"manifest", &RPCParser::parseManifest, 1, 1},
{"nodetoshard_status", &RPCParser::parseAsIs, 0, 0},
{"node_to_shard", &RPCParser::parseNodeToShard, 1, 1},
{"owner_info", &RPCParser::parseAccountItems, 1, 3},
{"peers", &RPCParser::parseAsIs, 0, 0},
{"ping", &RPCParser::parseAsIs, 0, 0},
Expand Down
16 changes: 16 additions & 0 deletions src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,25 @@ class DatabaseShard : public Database
[[nodiscard]] virtual boost::filesystem::path const&
getRootDir() const = 0;

/** Returns a JSON object detailing the status of an ongoing
database import if one is running, otherwise an error
object.
*/
virtual Json::Value
getDatabaseImportStatus() const = 0;

/** Initiates a NodeStore to ShardStore import and returns
the result in a JSON object.
*/
virtual Json::Value
startNodeToShard() = 0;

/** Terminates a NodeStore to ShardStore import and returns
the result in a JSON object.
*/
virtual Json::Value
stopNodeToShard() = 0;

/** Returns the first ledger sequence of the shard currently being imported
from the NodeStore
Expand Down
139 changes: 116 additions & 23 deletions src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,8 @@ DatabaseShardImp::stop()
}
}

std::unique_lock lock(mutex_);

// Notify the shard being imported
// from the node store to stop
if (databaseImportStatus_)
Expand All @@ -735,7 +737,28 @@ DatabaseShardImp::stop()
// Wait for the node store import thread
// if necessary
if (databaseImporter_.joinable())
databaseImporter_.join();
{
// Tells the import function to halt
haltDatabaseImport_ = true;

// Wait for the function to exit
while (databaseImportStatus_)
{
// Unlock just in case the import
// function is waiting on the mutex
lock.unlock();

std::this_thread::sleep_for(std::chrono::milliseconds(100));
lock.lock();
}

// Calling join while holding the mutex_ without
// first making sure that doImportDatabase has
// exited could lead to deadlock via the mutex
// acquisition that occurs in that function
if (databaseImporter_.joinable())
databaseImporter_.join();
}
}

void
Expand All @@ -754,15 +777,19 @@ DatabaseShardImp::importDatabase(Database& source)
return;
}

// Run the lengthy node store import process in the background
// on a dedicated thread.
databaseImporter_ = std::thread([this] { doImportDatabase(); });
startDatabaseImportThread(lock);
}

void
DatabaseShardImp::doImportDatabase()
{
if (isStopping())
auto shouldHalt = [this] {
bool expected = true;
return haltDatabaseImport_.compare_exchange_strong(expected, false) ||
isStopping();
};

if (shouldHalt())
return;

auto loadLedger =
Expand Down Expand Up @@ -848,7 +875,7 @@ DatabaseShardImp::doImportDatabase()
for (std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
++shardIndex)
{
if (isStopping())
if (shouldHalt())
return;

auto const pathDesignation = [this, shardIndex] {
Expand Down Expand Up @@ -920,7 +947,7 @@ DatabaseShardImp::doImportDatabase()
continue;
}

if (isStopping())
if (shouldHalt())
return;

bool const needsHistoricalPath =
Expand All @@ -938,7 +965,7 @@ DatabaseShardImp::doImportDatabase()
{
std::lock_guard lock(mutex_);

if (isStopping())
if (shouldHalt())
return;

databaseImportStatus_->currentIndex = shardIndex;
Expand Down Expand Up @@ -967,7 +994,7 @@ DatabaseShardImp::doImportDatabase()

while (auto const ledgerSeq = shard->prepare())
{
if (isStopping())
if (shouldHalt())
return;

// Not const so it may be moved later
Expand All @@ -989,7 +1016,7 @@ DatabaseShardImp::doImportDatabase()
recentStored = std::move(ledger);
}

if (isStopping())
if (shouldHalt())
return;

using namespace boost::filesystem;
Expand Down Expand Up @@ -1044,17 +1071,14 @@ DatabaseShardImp::doImportDatabase()
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to import from the NodeStore";
shard->removeOnDestroy();

if (shard)
shard->removeOnDestroy();
}
}

{
std::lock_guard lock(mutex_);
if (isStopping())
return;

databaseImportStatus_.reset();
}
if (shouldHalt())
return;

updateFileStats();
}
Expand Down Expand Up @@ -1200,10 +1224,10 @@ DatabaseShardImp::sweep()
Json::Value
DatabaseShardImp::getDatabaseImportStatus() const
{
Json::Value ret(Json::objectValue);

if (std::lock_guard lock(mutex_); databaseImportStatus_)
{
Json::Value ret(Json::objectValue);

ret[jss::firstShardIndex] = databaseImportStatus_->earliestIndex;
ret[jss::lastShardIndex] = databaseImportStatus_->latestIndex;
ret[jss::currentShardIndex] = databaseImportStatus_->currentIndex;
Expand All @@ -1216,11 +1240,59 @@ DatabaseShardImp::getDatabaseImportStatus() const
currentShard[jss::storedSeqs] = shard->getStoredSeqs();

ret[jss::currentShard] = currentShard;

if (haltDatabaseImport_)
ret[jss::message] = "Database import halt initiated...";

return ret;
}
else
ret = "Database import not running";

return ret;
return RPC::make_error(rpcINTERNAL, "Database import not running");
}

Json::Value
DatabaseShardImp::startNodeToShard()
{
std::lock_guard lock(mutex_);

if (!init_)
return RPC::make_error(rpcINTERNAL, "Shard store not initialized");

if (databaseImporter_.joinable())
return RPC::make_error(
rpcINTERNAL, "Database import already in progress");

if (isStopping())
return RPC::make_error(rpcINTERNAL, "Node is shutting down");

startDatabaseImportThread(lock);

Json::Value result(Json::objectValue);
result[jss::message] = "Database import initiated...";

return result;
}

Json::Value
DatabaseShardImp::stopNodeToShard()
{
std::lock_guard lock(mutex_);

if (!init_)
return RPC::make_error(rpcINTERNAL, "Shard store not initialized");

if (!databaseImporter_.joinable())
return RPC::make_error(rpcINTERNAL, "Database import not running");

if (isStopping())
return RPC::make_error(rpcINTERNAL, "Node is shutting down");

haltDatabaseImport_ = true;

Json::Value result(Json::objectValue);
result[jss::message] = "Database import halt initiated...";

return result;
}

std::optional<std::uint32_t>
Expand Down Expand Up @@ -2131,6 +2203,27 @@ DatabaseShardImp::updatePeers(std::lock_guard<std::mutex> const& lock) const
}
}

void
DatabaseShardImp::startDatabaseImportThread(std::lock_guard<std::mutex> const&)
{
// Run the lengthy node store import process in the background
// on a dedicated thread.
databaseImporter_ = std::thread([this] {
doImportDatabase();

std::lock_guard lock(mutex_);

// Make sure to clear this in case the import
// exited early.
databaseImportStatus_.reset();

// Detach the thread so subsequent attempts
// to start the import won't get held up by
// the old thread of execution
databaseImporter_.detach();
});
}

//------------------------------------------------------------------------------

std::unique_ptr<DatabaseShard>
Expand Down
13 changes: 13 additions & 0 deletions src/ripple/nodestore/impl/DatabaseShardImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ class DatabaseShardImp : public DatabaseShard
Json::Value
getDatabaseImportStatus() const override;

Json::Value
startNodeToShard() override;

Json::Value
stopNodeToShard() override;

std::optional<std::uint32_t>
getDatabaseImportSequence() const override;

Expand Down Expand Up @@ -285,6 +291,9 @@ class DatabaseShardImp : public DatabaseShard
// Thread for running node store import
std::thread databaseImporter_;

// Indicates whether the import should stop
std::atomic_bool haltDatabaseImport_{false};

// Initialize settings from the configuration file
// Lock must be held
bool
Expand Down Expand Up @@ -407,6 +416,10 @@ class DatabaseShardImp : public DatabaseShard
// Update peers with the status of every complete and incomplete shard
void
updatePeers(std::lock_guard<std::mutex> const& lock) const;

// Start the node store import process
void
startDatabaseImportThread(std::lock_guard<std::mutex> const&);
};

} // namespace NodeStore
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/rpc/handlers/Handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ doLogRotate(RPC::JsonContext&);
Json::Value
doManifest(RPC::JsonContext&);
Json::Value
doNodeToShardStatus(RPC::JsonContext&);
doNodeToShard(RPC::JsonContext&);
Json::Value
doNoRippleCheck(RPC::JsonContext&);
Json::Value
Expand Down
Loading

0 comments on commit 811ad96

Please sign in to comment.