Skip to content

Commit

Permalink
[FOLD] Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Jan 29, 2021
1 parent 0e2cdb6 commit 2a0b097
Show file tree
Hide file tree
Showing 19 changed files with 512 additions and 458 deletions.
2 changes: 1 addition & 1 deletion Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
"../../src/ripple/nodestore/impl/ShardInfo.cpp"
src/ripple/nodestore/impl/ShardInfo.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:
Expand Down
92 changes: 86 additions & 6 deletions src/ripple/nodestore/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,79 @@ class Database : public Stoppable
void
onChildrenStopped() override;

/** @return The maximum number of ledgers stored in a shard
*/
[[nodiscard]] std::uint32_t
ledgersPerShard() const noexcept
{
return ledgersPerShard_;
}

/** @return The earliest ledger sequence allowed
*/
std::uint32_t
earliestLedgerSeq() const
[[nodiscard]] std::uint32_t
earliestLedgerSeq() const noexcept
{
return earliestLedgerSeq_;
}

/** @return The earliest shard index
*/
[[nodiscard]] std::uint32_t
earliestShardIndex() const noexcept
{
return earliestShardIndex_;
}

/** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index
*/
[[nodiscard]] std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const noexcept
{
assert(shardIndex >= earliestShardIndex_);
if (shardIndex <= earliestShardIndex_)
return earliestLedgerSeq_;
return 1 + (shardIndex * ledgersPerShard_);
}

/** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index
*/
[[nodiscard]] std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const noexcept
{
assert(shardIndex >= earliestShardIndex_);
return (shardIndex + 1) * ledgersPerShard_;
}

/** Calculates the shard index for a given ledger sequence
@param ledgerSeq ledger sequence
@return The shard index of the ledger sequence
*/
[[nodiscard]] std::uint32_t
seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
{
assert(ledgerSeq >= earliestLedgerSeq_);
return (ledgerSeq - 1) / ledgersPerShard_;
}

/** Calculates the maximum ledgers for a given shard index
@param shardIndex The shard index considered
@return The maximum ledgers pertaining to the shard index
@note The earliest shard may store less if the earliest ledger
sequence truncates its beginning
*/
[[nodiscard]] std::uint32_t
maxLedgers(std::uint32_t shardIndex) const noexcept;

protected:
beast::Journal const j_;
Scheduler& scheduler_;
Expand All @@ -247,6 +312,25 @@ class Database : public Stoppable
std::atomic<std::uint32_t> fetchHitCount_{0};
std::atomic<std::uint32_t> fetchSz_{0};

// The default is DEFAULT_LEDGERS_PER_SHARD (16384) to match the XRP ledger
// network. Can be set through the configuration file using the
// 'ledgers_per_shard' field under the 'node_db' and 'shard_db' stanzas.
// If specified, the value must be a multiple of 256 and equally assigned
// in both stanzas. Only unit tests or alternate networks should change
// this value.
std::uint32_t const ledgersPerShard_;

// The default is XRP_LEDGER_EARLIEST_SEQ (32570) to match the XRP ledger
// network's earliest allowed ledger sequence. Can be set through the
// configuration file using the 'earliest_seq' field under the 'node_db'
// and 'shard_db' stanzas. If specified, the value must be greater than zero
// and equally assigned in both stanzas. Only unit tests or alternate
// networks should change this value.
std::uint32_t const earliestLedgerSeq_;

// The earliest shard index
std::uint32_t const earliestShardIndex_;

void
stopReadThreads();

Expand Down Expand Up @@ -302,10 +386,6 @@ class Database : public Stoppable
std::vector<std::thread> readThreads_;
bool readShut_{false};

// The default is 32570 to match the XRP ledger network's earliest
// allowed sequence. Alternate networks may set this value.
std::uint32_t const earliestLedgerSeq_;

virtual std::shared_ptr<NodeObject>
fetchNodeObject(
uint256 const& hash,
Expand Down
55 changes: 5 additions & 50 deletions src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class DatabaseShard : public Database
@return `true` if the database initialized without error
*/
virtual [[nodiscard]] bool
[[nodiscard]] virtual bool
init() = 0;

/** Prepare to store a new ledger in the shard being acquired
Expand All @@ -75,7 +75,7 @@ class DatabaseShard : public Database
between requests.
@implNote adds a new writable shard if necessary
*/
virtual [[nodiscard]] boost::optional<std::uint32_t>
[[nodiscard]] virtual boost::optional<std::uint32_t>
prepareLedger(std::uint32_t validLedgerSeq) = 0;

/** Prepare one or more shard indexes to be imported into the database
Expand Down Expand Up @@ -118,7 +118,7 @@ class DatabaseShard : public Database
@param seq The sequence of the ledger
@return The ledger if found, nullptr otherwise
*/
virtual [[nodiscard]] std::shared_ptr<Ledger>
[[nodiscard]] virtual std::shared_ptr<Ledger>
fetchLedger(uint256 const& hash, std::uint32_t seq) = 0;

/** Notifies the database that the given ledger has been
Expand All @@ -133,61 +133,16 @@ class DatabaseShard : public Database
@return Information about shards held by this node
*/
virtual [[nodiscard]] std::unique_ptr<ShardInfo>
[[nodiscard]] virtual std::unique_ptr<ShardInfo>
getShardInfo() = 0;

/** @return The maximum number of ledgers stored in a shard
*/
virtual std::uint32_t
ledgersPerShard() const = 0;

/** @return The earliest shard index
*/
virtual std::uint32_t
earliestShardIndex() const = 0;

/** Calculates the shard index for a given ledger sequence
@param seq ledger sequence
@return The shard index of the ledger sequence
*/
virtual std::uint32_t
seqToShardIndex(std::uint32_t seq) const = 0;

/** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index
*/
virtual std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const = 0;

/** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index
*/
virtual std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const = 0;

/** Returns the root database directory
*/
virtual boost::filesystem::path const&
getRootDir() const = 0;

/** The number of ledgers in a shard */
static constexpr std::uint32_t ledgersPerShardDefault{16384u};
};

constexpr std::uint32_t
seqToShardIndex(
std::uint32_t ledgerSeq,
std::uint32_t ledgersPerShard = DatabaseShard::ledgersPerShardDefault)
{
return (ledgerSeq - 1) / ledgersPerShard;
}

extern [[nodiscard]] std::unique_ptr<DatabaseShard>
extern std::unique_ptr<DatabaseShard>
make_ShardStore(
Application& app,
Stoppable& parent,
Expand Down
50 changes: 39 additions & 11 deletions src/ripple/nodestore/ShardInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,61 @@ namespace NodeStore {
*/
class ShardInfo
{
public:
struct Incomplete
private:
class Incomplete
{
public:
Incomplete() = delete;
Incomplete(State state_, float progress_)
: state(state_), progress(progress_)
Incomplete(ShardState state_, std::uint32_t percentProgress_)
: state(state_), percentProgress(percentProgress_)
{
}

State const state;
float const progress;
ShardState const state;
std::uint32_t const percentProgress;
};

public:
[[nodiscard]] std::string
finalToString() const;
finalizedToString() const;

[[nodiscard]] bool
setFinalizedFromString(std::string const& str)
{
return from_string(finalized_, str);
}

[[nodiscard]] RangeSet<std::uint32_t> const&
finalized() const
{
return finalized_;
}

[[nodiscard]] std::string
incompleteToString() const;

[[nodiscard]] std::map<std::uint32_t, Incomplete> const&
incomplete() const
{
return incomplete_;
}

// Returns true if successful or false because of a duplicate index
bool
update(
std::uint32_t shardIndex,
ShardState state,
std::uint32_t percentProgress);

[[nodiscard]] protocol::TMPeerShardInfoV2
makeMessage(Application& app) const;

// Complete and verified immutable shards
RangeSet<std::uint32_t> final;
private:
// Finalized immutable shards
RangeSet<std::uint32_t> finalized_;

// Incomplete shards being acquired or verified
std::map<std::uint32_t, Incomplete> incomplete;
// Incomplete shards being acquired or finalized
std::map<std::uint32_t, Incomplete> incomplete_;
};

} // namespace NodeStore
Expand Down
19 changes: 7 additions & 12 deletions src/ripple/nodestore/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,17 @@ enum Status {
/** A batch of NodeObjects to write at once. */
using Batch = std::vector<std::shared_ptr<NodeObject>>;

} // namespace NodeStore

/** Shard states. */
enum class State {
acquire, // Being acquired
complete, // Backend contains all ledgers, requires finalizing
finalizing, // Being finalized
final, // Database verified, shard is immutable
enum class ShardState {
acquire, // Acquiring ledgers
complete, // Backend is ledger complete, database is unverified
finalizing, // Verifying database
finalized, // Database verified, shard is immutable
queued // Queued to be finalized
};

static constexpr State acquire = State::acquire;
static constexpr State complete = State::complete;
static constexpr State finalizing = State::finalizing;
static constexpr State final = State::final;
static constexpr State queued = State::queued;

} // namespace NodeStore
} // namespace ripple

#endif
17 changes: 17 additions & 0 deletions src/ripple/nodestore/impl/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,17 @@ Database::Database(
: Stoppable(name, parent.getRoot())
, j_(journal)
, scheduler_(scheduler)
, ledgersPerShard_(get<std::uint32_t>(
config,
"ledgers_per_shard",
DEFAULT_LEDGERS_PER_SHARD))
, earliestLedgerSeq_(
get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
, earliestShardIndex_(seqToShardIndex(earliestLedgerSeq_))
{
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
Throw<std::runtime_error>("Invalid ledgers_per_shard");

if (earliestLedgerSeq_ < 1)
Throw<std::runtime_error>("Invalid earliest_seq");

Expand Down Expand Up @@ -74,6 +82,15 @@ Database::onChildrenStopped()
stopped();
}

std::uint32_t
Database::maxLedgers(std::uint32_t shardIndex) const noexcept
{
if (shardIndex == earliestShardIndex_)
return lastLedgerSeq(shardIndex) - firstLedgerSeq(shardIndex) + 1;

return ledgersPerShard_;
}

void
Database::stopReadThreads()
{
Expand Down
Loading

0 comments on commit 2a0b097

Please sign in to comment.