Skip to content

Commit

Permalink
[FOLD] Improve shard shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Feb 24, 2020
1 parent 96810ea commit fb7ee53
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 22 deletions.
24 changes: 15 additions & 9 deletions src/ripple/nodestore/impl/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,23 @@ Database::copyLedger(
batch.reserve(batchWritePreallocationSize);
};
bool error = false;
auto f = [&](SHAMapAbstractNode& node) {
auto visit = [&](SHAMapAbstractNode& node)
{
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), srcLedger.info().seq))
{
batch.emplace_back(std::move(nObj));
if (batch.size() >= batchWritePreallocationSize)
storeBatch();
if (batch.size() < batchWritePreallocationSize)
return true;

storeBatch();

if (!isStopping())
return true;
}
else
error = true;
return !error;

error = true;
return false;
};

// Store ledger header
Expand All @@ -319,10 +325,10 @@ Database::copyLedger(
{
auto have = next->stateMap().snapShot(false);
srcLedger.stateMap().snapShot(
false)->visitDifferences(&(*have), f);
false)->visitDifferences(&(*have), visit);
}
else
srcLedger.stateMap().snapShot(false)->visitNodes(f);
srcLedger.stateMap().snapShot(false)->visitNodes(visit);
if (error)
return false;
}
Expand All @@ -337,7 +343,7 @@ Database::copyLedger(
" transaction map invalid";
return false;
}
srcLedger.txMap().snapShot(false)->visitNodes(f);
srcLedger.txMap().snapShot(false)->visitNodes(visit);
if (error)
return false;
}
Expand Down
37 changes: 33 additions & 4 deletions src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ DatabaseShardImp::DatabaseShardImp(

DatabaseShardImp::~DatabaseShardImp()
{
// Stop threads before data members are destroyed
stopThreads();
if (!isStopping())
onStop();

// Close backend databases before destroying the context
std::lock_guard lock(mutex_);
Expand Down Expand Up @@ -673,6 +673,19 @@ DatabaseShardImp::validate()
app_.shardFamily()->reset();
}

void
DatabaseShardImp::onStop()
{
{
std::lock_guard lock(mutex_);
for (auto const& e : shards_)
if (e.second.shard)
e.second.shard->stop();
}

Database::onStop();
}

void
DatabaseShardImp::import(Database& source)
{
Expand Down Expand Up @@ -972,6 +985,9 @@ DatabaseShardImp::asyncFetch(
bool
DatabaseShardImp::copyLedger(std::shared_ptr<Ledger const> const& srcLedger)
{
if (isStopping())
return false;

auto const seq {srcLedger->info().seq};
auto const shardIndex {seqToShardIndex(seq)};
std::shared_ptr<Shard> shard;
Expand Down Expand Up @@ -1195,6 +1211,9 @@ DatabaseShardImp::finalizeShard(
shardInfo.state = ShardInfo::State::finalize;
taskQueue_->addTask([this, shardIndex, writeSQLite]() mutable
{
if (isStopping())
return;

std::shared_ptr<Shard> shard;
{
std::lock_guard lock(mutex_);
Expand All @@ -1210,6 +1229,9 @@ DatabaseShardImp::finalizeShard(

if (!shard->finalize(writeSQLite))
{
if (isStopping())
return;

// Finalize failed, remove shard
{
std::lock_guard lock(mutex_);
Expand All @@ -1234,6 +1256,9 @@ DatabaseShardImp::finalizeShard(
return;
}

if (isStopping())
return;

{
std::lock_guard lock(mutex_);
auto const it {shards_.find(shardIndex)};
Expand Down Expand Up @@ -1277,7 +1302,8 @@ DatabaseShardImp::setFileStats()
return;

for (auto const& e : shards_)
wptrShards.push_back(e.second.shard);
if (e.second.shard)
wptrShards.push_back(e.second.shard);
}

std::uint64_t sumSz {0};
Expand Down Expand Up @@ -1318,7 +1344,8 @@ DatabaseShardImp::updateStatus(std::lock_guard<std::mutex>&)
{
RangeSet<std::uint32_t> rs;
for (auto const& e : shards_)
rs.insert(e.second.shard->index());
if (e.second.state == ShardInfo::State::final)
rs.insert(e.second.shard->index());
status_ = to_string(rs);
}
else
Expand All @@ -1336,7 +1363,9 @@ DatabaseShardImp::getCache(std::uint32_t seq)

if (auto const it {shards_.find(shardIndex)};
it != shards_.end() && it->second.shard)
{
shard = it->second.shard;
}
else
return {};
}
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/nodestore/impl/DatabaseShardImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ class DatabaseShardImp : public DatabaseShard
return backendName_;
}

void
onStop() override;

/** Import the application local node store
@param source The application node store.
Expand Down
9 changes: 7 additions & 2 deletions src/ripple/nodestore/impl/Shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,8 @@ Shard::valLedger(
bool error {false};
auto visit = [this, &error](SHAMapAbstractNode& node)
{
if (stop_)
return false;
if (!valFetch(node.getNodeHash().as_uint256()))
error = true;
return !error;
Expand All @@ -957,6 +959,8 @@ Shard::valLedger(
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
if (stop_)
return false;
if (error)
return fail("Invalid state map");
}
Expand All @@ -976,6 +980,8 @@ Shard::valLedger(
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
if (stop_)
return false;
if (error)
return fail("Invalid transaction map");
}
Expand All @@ -1001,7 +1007,7 @@ Shard::valFetch(uint256 const& hash)
switch (backend_->fetch(hash.begin(), &nObj))
{
case ok:
break;
return nObj;
case notFound:
return fail("Missing node object");
case dataCorrupt:
Expand All @@ -1015,7 +1021,6 @@ Shard::valFetch(uint256 const& hash)
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
return nObj;
}

} // NodeStore
Expand Down
21 changes: 14 additions & 7 deletions src/ripple/nodestore/impl/Shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <boost/filesystem.hpp>
#include <nudb/nudb.hpp>

#include <atomic>
#include <tuple>

namespace ripple {
Expand Down Expand Up @@ -122,6 +123,9 @@ class Shard final
bool
finalize(const bool writeSQLite);

void
stop() {stop_ = true;}

private:
// Current shard version
static constexpr std::uint32_t version_ {1};
Expand All @@ -147,10 +151,6 @@ class Shard final
// The earliest shard may store less ledgers than subsequent shards
std::uint32_t const maxLedgers_;

// Older shard without an acquire database or final key
// Eventually this should be removed
bool legacy_ {false};

// Database positive cache
std::shared_ptr<PCache> pCache_;

Expand Down Expand Up @@ -183,15 +183,22 @@ class Shard final
// True if backend has stored all ledgers pertaining to the shard
bool backendComplete_ {false};

// True if the shard is complete, validated, and immutable
bool final_ {false};

// Tracks ledger sequences stored in the backend when building a shard
RangeSet<std::uint32_t> storedSeqs_;

// Used as an optimization for visitDifferences
std::shared_ptr<Ledger const> lastStored_;

// Older shard without an acquire database or final key
// Eventually there will be no need for this and should be removed
bool legacy_ {false};

// True if the backend has a final key stored
bool final_ {false};

// Determines if the shard needs to stop processing for shutdown
std::atomic<bool> stop_ {false};

// Set the backend cache
// Lock over mutex_ required
void
Expand Down

0 comments on commit fb7ee53

Please sign in to comment.