Skip to content

Commit

Permalink
[FOLD] Address feedback from Scott Schurr
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Mar 10, 2021
1 parent 00efbcf commit 58bffc7
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 131 deletions.
5 changes: 5 additions & 0 deletions src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ class DatabaseShard : public Database
*/
virtual boost::filesystem::path const&
getRootDir() const = 0;

/** Returns the number of queued tasks
*/
[[nodiscard]] virtual size_t
getNumTasks() const = 0;
};

extern std::unique_ptr<DatabaseShard>
Expand Down
239 changes: 115 additions & 124 deletions src/ripple/nodestore/impl/DatabaseShardImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1310,8 +1310,8 @@ DatabaseShardImp::finalizeShard(
relocateOutdatedShards(lock);

// Set the appropriate recent shard index
if (boundaryIndex == shard->index())
secondLatestShardIndex_ = boundaryIndex;
if (shard->index() == boundaryIndex)
secondLatestShardIndex_ = shard->index();
else
latestShardIndex_ = shard->index();

Expand Down Expand Up @@ -1533,151 +1533,135 @@ void
DatabaseShardImp::relocateOutdatedShards(
std::lock_guard<std::mutex> const& lock)
{
if (auto& cur = latestShardIndex_, &prev = secondLatestShardIndex_;
cur || prev)
{
auto const latestShardIndex =
seqToShardIndex(app_.getLedgerMaster().getValidLedgerIndex());
auto& cur{latestShardIndex_};
auto& prev{secondLatestShardIndex_};
if (!cur && !prev)
return;

auto const separateHistoricalPath = !historicalPaths_.empty();
auto const latestShardIndex =
seqToShardIndex(app_.getLedgerMaster().getValidLedgerIndex());
auto const separateHistoricalPath = !historicalPaths_.empty();

auto const removeShard =
[this](std::uint32_t const shardIndex) -> void {
canAdd_ = false;
auto const removeShard = [this](std::uint32_t const shardIndex) -> void {
canAdd_ = false;

if (auto it = shards_.find(shardIndex); it != shards_.end())
{
if (it->second)
removeFailedShard(it->second);
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
}
if (auto it = shards_.find(shardIndex); it != shards_.end())
{
if (it->second)
removeFailedShard(it->second);
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
};
}
else
{
JLOG(j_.warn()) << "can't find shard to remove";
}
};

auto const keepShard =
[this, &lock, removeShard, separateHistoricalPath](
std::uint32_t const shardIndex) -> bool {
if (numHistoricalShards(lock) >= maxHistoricalShards_)
{
JLOG(j_.error())
<< "maximum number of historical shards reached";
auto const keepShard = [this, &lock, removeShard, separateHistoricalPath](
std::uint32_t const shardIndex) -> bool {
if (numHistoricalShards(lock) >= maxHistoricalShards_)
{
JLOG(j_.error()) << "maximum number of historical shards reached";
removeShard(shardIndex);
return false;
}
if (separateHistoricalPath &&
!sufficientStorage(1, PathDesignation::historical, lock))
{
JLOG(j_.error()) << "insufficient storage space available";
removeShard(shardIndex);
return false;
}

removeShard(shardIndex);
return false;
}
if (separateHistoricalPath &&
!sufficientStorage(1, PathDesignation::historical, lock))
{
JLOG(j_.error()) << "insufficient storage space available";
return true;
};

removeShard(shardIndex);
return false;
}
// Move a shard from the main shard path to a historical shard
// path by copying the contents, and creating a new shard.
auto const moveShard = [this,
&lock](std::uint32_t const shardIndex) -> void {
auto it{shards_.find(shardIndex)};
if (it == shards_.end())
{
JLOG(j_.warn()) << "can't find shard to move to historical path";
return;
}

return true;
};
auto& shard{it->second};

// Move a shard from the main shard path to a historical shard
// path by copying the contents, and creating a new shard.
auto const moveShard = [this,
&lock](std::uint32_t const shardIndex) -> void {
auto const dst = chooseHistoricalPath(lock);
// Close any open file descriptors before moving the shard
// directory. Don't call removeOnDestroy since that would
// attempt to close the fds after the directory has been moved.
if (!shard->tryClose())
{
JLOG(j_.warn()) << "can't close shard to move to historical path";
return;
}

if (auto it = shards_.find(shardIndex); it != shards_.end())
{
auto& shard{it->second};
auto const dst{chooseHistoricalPath(lock)};
try
{
// Move the shard directory to the new path
boost::filesystem::rename(
shard->getDir().string(), dst / std::to_string(shardIndex));
}
catch (...)
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to move to historical storage";
return;
}

// Close any open file descriptors before moving the shard
// directory. Don't call removeOnDestroy since that would
// attempt to close the fds after the directory has been moved.
if (!shard->tryClose())
{
JLOG(j_.warn())
<< "can't close shard to move to historical path";
return;
}
// Create a shard instance at the new location
shard = std::make_shared<Shard>(app_, *this, shardIndex, dst, j_);

try
{
// Move the shard directory to the new path
boost::filesystem::rename(
shard->getDir().string(),
dst / std::to_string(shardIndex));
}
catch (...)
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to move to historical storage";
return;
}
// Open the new shard
if (!shard->init(scheduler_, *ctx_))
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to open in historical storage";
shard->removeOnDestroy();
shard.reset();
}
};

// Create a shard instance at the new location
shard =
std::make_shared<Shard>(app_, *this, shardIndex, dst, j_);
// See if either of the recent shards needs to be updated
bool const curNotSynched =
latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
bool const prevNotSynched = secondLatestShardIndex_ &&
*secondLatestShardIndex_ != latestShardIndex - 1;

// Open the new shard
if (!shard->init(scheduler_, *ctx_))
{
JLOG(j_.error()) << "shard " << shardIndex
<< " failed to open in historical storage";
shard->removeOnDestroy();
shard.reset();
}
}
else
{
JLOG(j_.warn())
<< "can't find shard to move to historical path";
}
};
// A new shard has been published. Move outdated
// shards to historical storage as needed
if (curNotSynched || prevNotSynched)
{
if (prev)
{
// Move the formerly second latest shard to historical storage
if (keepShard(*prev) && separateHistoricalPath)
moveShard(*prev);

// See if either of the recent shards needs to be updated
bool const curNotSynched =
latestShardIndex_ && *latestShardIndex_ != latestShardIndex;
bool const prevNotSynched = secondLatestShardIndex_ &&
*secondLatestShardIndex_ != latestShardIndex - 1;
prev = boost::none;
}

// A new shard has been published. Move outdated
// shards to historical storage as needed
if (curNotSynched || prevNotSynched)
if (cur)
{
if (prev)
{
// Move the formerly second latest shard to historical storage
if (keepShard(*prev) && separateHistoricalPath)
{
moveShard(*prev);
}

prev = boost::none;
}
// The formerly latest shard is now the second latest
if (cur == latestShardIndex - 1)
prev = cur;

if (cur)
// The formerly latest shard is no longer a 'recent' shard
else
{
// The formerly latest shard is now the second latest
if (cur == latestShardIndex - 1)
{
prev = cur;
}

// The formerly latest shard is no longer a 'recent' shard
else
{
// Move the formerly latest shard to historical storage
if (keepShard(*cur) && separateHistoricalPath)
{
moveShard(*cur);
}
}

cur = boost::none;
// Move the formerly latest shard to historical storage
if (keepShard(*cur) && separateHistoricalPath)
moveShard(*cur);
}

cur = boost::none;
}
}
}
Expand Down Expand Up @@ -1843,6 +1827,13 @@ DatabaseShardImp::getShardInfo(std::lock_guard<std::mutex> const&) const
return shardInfo;
}

size_t
DatabaseShardImp::getNumTasks() const
{
std::lock_guard lock(mutex_);
return taskQueue_->size();
}

void
DatabaseShardImp::updatePeers(std::lock_guard<std::mutex> const& lock) const
{
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/nodestore/impl/DatabaseShardImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class DatabaseShardImp : public DatabaseShard
std::unique_ptr<ShardInfo>
getShardInfo() const override;

[[nodiscard]] size_t
getNumTasks() const override;

boost::filesystem::path const&
getRootDir() const override
{
Expand Down
2 changes: 1 addition & 1 deletion src/ripple/nodestore/impl/Shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,6 @@ Shard::finalize(
}

lastAccess_ = std::chrono::steady_clock::now();
state_ = ShardState::finalized;

if (!initSQLite(lock))
return fail("failed to initialize SQLite databases");
Expand All @@ -797,6 +796,7 @@ Shard::finalize(
". Error: " + e.what());
}

state_ = ShardState::finalized;
return true;
}

Expand Down
8 changes: 8 additions & 0 deletions src/ripple/nodestore/impl/TaskQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class TaskQueue : public Stoppable, private Workers::Callback
void
addTask(std::function<void()> task);

/** Return the queue size
*/
size_t
size() const
{
return tasks_.size();
}

private:
std::mutex mutex_;
Workers workers_;
Expand Down
12 changes: 6 additions & 6 deletions src/test/nodestore/DatabaseShard_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,17 +471,17 @@ class DatabaseShard_test : public TestBase

std::optional<std::uint32_t>
waitShard(
DatabaseShard& db,
DatabaseShard& shardStore,
std::uint32_t shardIndex,
std::chrono::seconds timeout = shardStoreTimeout)
{
auto const end{std::chrono::system_clock::now() + timeout};
while (
!boost::icl::contains(db.getShardInfo()->finalized(), shardIndex))
while (shardStore.getNumTasks() || !boost::icl::contains(
shardStore.getShardInfo()->finalized(), shardIndex))
{
if (!BEAST_EXPECT(std::chrono::system_clock::now() < end))
return std::nullopt;
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

return shardIndex;
Expand All @@ -492,7 +492,7 @@ class DatabaseShard_test : public TestBase
TestData& data,
DatabaseShard& shardStore,
int maxShardIndex = 1,
int ledgerOffset = 0)
int shardOffset = 0)
{
int shardIndex{-1};

Expand All @@ -505,7 +505,7 @@ class DatabaseShard_test : public TestBase

shardIndex = shardStore.seqToShardIndex(*ledgerSeq);

int const arrInd = *ledgerSeq - (ledgersPerShard * ledgerOffset) -
int const arrInd = *ledgerSeq - (ledgersPerShard * shardOffset) -
ledgersPerShard - 1;
BEAST_EXPECT(
arrInd >= 0 && arrInd < maxShardIndex * ledgersPerShard);
Expand Down

0 comments on commit 58bffc7

Please sign in to comment.