Skip to content

Commit

Permalink
Extend peer shard info
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelportilla committed Mar 3, 2021
1 parent c0a0b79 commit ea7d7be
Show file tree
Hide file tree
Showing 32 changed files with 1,522 additions and 926 deletions.
1 change: 1 addition & 0 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/ShardInfo.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
main sources:
Expand Down
18 changes: 8 additions & 10 deletions src/ripple/basics/RangeSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,15 @@ template <class T>
std::string
to_string(RangeSet<T> const& rs)
{
using ripple::to_string;

if (rs.empty())
return "empty";
std::string res = "";

std::string s;
for (auto const& interval : rs)
{
if (!res.empty())
res += ",";
res += to_string(interval);
}
return res;
s += ripple::to_string(interval) + ",";
s.pop_back();

return s;
}

/** Convert the given styled string to a RangeSet.
Expand All @@ -122,13 +119,14 @@ to_string(RangeSet<T> const& rs)
@return True on successfully converting styled string
*/
template <class T>
bool
[[nodiscard]] bool
from_string(RangeSet<T>& rs, std::string const& s)
{
std::vector<std::string> intervals;
std::vector<std::string> tokens;
bool result{true};

rs.clear();
boost::split(tokens, s, boost::algorithm::is_any_of(","));
for (auto const& t : tokens)
{
Expand Down
92 changes: 86 additions & 6 deletions src/ripple/nodestore/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,79 @@ class Database : public Stoppable
void
onChildrenStopped() override;

/** @return The maximum number of ledgers stored in a shard
*/
[[nodiscard]] std::uint32_t
ledgersPerShard() const noexcept
{
return ledgersPerShard_;
}

/** @return The earliest ledger sequence allowed
*/
std::uint32_t
earliestLedgerSeq() const
[[nodiscard]] std::uint32_t
earliestLedgerSeq() const noexcept
{
return earliestLedgerSeq_;
}

/** @return The earliest shard index
*/
[[nodiscard]] std::uint32_t
earliestShardIndex() const noexcept
{
return earliestShardIndex_;
}

/** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index
*/
[[nodiscard]] std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const noexcept
{
assert(shardIndex >= earliestShardIndex_);
if (shardIndex <= earliestShardIndex_)
return earliestLedgerSeq_;
return 1 + (shardIndex * ledgersPerShard_);
}

/** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index
*/
[[nodiscard]] std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const noexcept
{
assert(shardIndex >= earliestShardIndex_);
return (shardIndex + 1) * ledgersPerShard_;
}

/** Calculates the shard index for a given ledger sequence
@param ledgerSeq ledger sequence
@return The shard index of the ledger sequence
*/
[[nodiscard]] std::uint32_t
seqToShardIndex(std::uint32_t ledgerSeq) const noexcept
{
assert(ledgerSeq >= earliestLedgerSeq_);
return (ledgerSeq - 1) / ledgersPerShard_;
}

/** Calculates the maximum ledgers for a given shard index
@param shardIndex The shard index considered
@return The maximum ledgers pertaining to the shard index
@note The earliest shard may store less if the earliest ledger
sequence truncates its beginning
*/
[[nodiscard]] std::uint32_t
maxLedgers(std::uint32_t shardIndex) const noexcept;

protected:
beast::Journal const j_;
Scheduler& scheduler_;
Expand All @@ -242,6 +307,25 @@ class Database : public Stoppable
std::atomic<std::uint32_t> fetchHitCount_{0};
std::atomic<std::uint32_t> fetchSz_{0};

// The default is DEFAULT_LEDGERS_PER_SHARD (16384) to match the XRP ledger
// network. Can be set through the configuration file using the
// 'ledgers_per_shard' field under the 'node_db' and 'shard_db' stanzas.
// If specified, the value must be a multiple of 256 and equally assigned
// in both stanzas. Only unit tests or alternate networks should change
// this value.
std::uint32_t const ledgersPerShard_;

// The default is XRP_LEDGER_EARLIEST_SEQ (32570) to match the XRP ledger
// network's earliest allowed ledger sequence. Can be set through the
// configuration file using the 'earliest_seq' field under the 'node_db'
// and 'shard_db' stanzas. If specified, the value must be greater than zero
// and equally assigned in both stanzas. Only unit tests or alternate
// networks should change this value.
std::uint32_t const earliestLedgerSeq_;

// The earliest shard index
std::uint32_t const earliestShardIndex_;

void
stopReadThreads();

Expand Down Expand Up @@ -293,10 +377,6 @@ class Database : public Stoppable
std::vector<std::thread> readThreads_;
bool readShut_{false};

// The default is 32570 to match the XRP ledger network's earliest
// allowed sequence. Alternate networks may set this value.
std::uint32_t const earliestLedgerSeq_;

virtual std::shared_ptr<NodeObject>
fetchNodeObject(
uint256 const& hash,
Expand Down
61 changes: 8 additions & 53 deletions src/ripple/nodestore/DatabaseShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#define RIPPLE_NODESTORE_DATABASESHARD_H_INCLUDED

#include <ripple/app/ledger/Ledger.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/nodestore/Database.h>
#include <ripple/nodestore/ShardInfo.h>
#include <ripple/nodestore/Types.h>

#include <boost/optional.hpp>
Expand Down Expand Up @@ -61,7 +61,7 @@ class DatabaseShard : public Database
@return `true` if the database initialized without error
*/
virtual bool
[[nodiscard]] virtual bool
init() = 0;

/** Prepare to store a new ledger in the shard being acquired
Expand All @@ -75,7 +75,7 @@ class DatabaseShard : public Database
between requests.
@implNote adds a new writable shard if necessary
*/
virtual boost::optional<std::uint32_t>
[[nodiscard]] virtual boost::optional<std::uint32_t>
prepareLedger(std::uint32_t validLedgerSeq) = 0;

/** Prepare one or more shard indexes to be imported into the database
Expand Down Expand Up @@ -118,7 +118,7 @@ class DatabaseShard : public Database
@param seq The sequence of the ledger
@return The ledger if found, nullptr otherwise
*/
virtual std::shared_ptr<Ledger>
[[nodiscard]] virtual std::shared_ptr<Ledger>
fetchLedger(uint256 const& hash, std::uint32_t seq) = 0;

/** Notifies the database that the given ledger has been
Expand All @@ -129,64 +129,19 @@ class DatabaseShard : public Database
virtual void
setStored(std::shared_ptr<Ledger const> const& ledger) = 0;

/** Query which complete shards are stored
@return the indexes of complete shards
*/
virtual std::string
getCompleteShards() = 0;

/** @return The maximum number of ledgers stored in a shard
*/
virtual std::uint32_t
ledgersPerShard() const = 0;

/** @return The earliest shard index
*/
virtual std::uint32_t
earliestShardIndex() const = 0;

/** Calculates the shard index for a given ledger sequence
/** Query information about shards held
@param seq ledger sequence
@return The shard index of the ledger sequence
@return Information about shards held by this node
*/
virtual std::uint32_t
seqToShardIndex(std::uint32_t seq) const = 0;

/** Calculates the first ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The first ledger sequence pertaining to the shard index
*/
virtual std::uint32_t
firstLedgerSeq(std::uint32_t shardIndex) const = 0;

/** Calculates the last ledger sequence for a given shard index
@param shardIndex The shard index considered
@return The last ledger sequence pertaining to the shard index
*/
virtual std::uint32_t
lastLedgerSeq(std::uint32_t shardIndex) const = 0;
[[nodiscard]] virtual std::unique_ptr<ShardInfo>
getShardInfo() const = 0;

/** Returns the root database directory
*/
virtual boost::filesystem::path const&
getRootDir() const = 0;

/** The number of ledgers in a shard */
static constexpr std::uint32_t ledgersPerShardDefault{16384u};
};

constexpr std::uint32_t
seqToShardIndex(
std::uint32_t ledgerSeq,
std::uint32_t ledgersPerShard = DatabaseShard::ledgersPerShardDefault)
{
return (ledgerSeq - 1) / ledgersPerShard;
}

extern std::unique_ptr<DatabaseShard>
make_ShardStore(
Application& app,
Expand Down
123 changes: 123 additions & 0 deletions src/ripple/nodestore/ShardInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED
#define RIPPLE_NODESTORE_SHARDINFO_H_INCLUDED

#include <ripple/basics/RangeSet.h>
#include <ripple/nodestore/Types.h>
#include <ripple/nodestore/impl/Shard.h>
#include <ripple/protocol/messages.h>

namespace ripple {
namespace NodeStore {

/* Contains information on the status of shards for a node
*/
class ShardInfo
{
private:
class Incomplete
{
public:
Incomplete() = delete;
Incomplete(ShardState state, std::uint32_t percentProgress)
: state_(state), percentProgress_(percentProgress)
{
}

[[nodiscard]] ShardState
state() const noexcept
{
return state_;
}

[[nodiscard]] std::uint32_t
percentProgress() const noexcept
{
return percentProgress_;
}

private:
ShardState state_;
std::uint32_t percentProgress_;
};

public:
[[nodiscard]] NetClock::time_point const&
msgTimestamp() const
{
return msgTimestamp_;
}

void
setMsgTimestamp(NetClock::time_point const& timestamp)
{
msgTimestamp_ = timestamp;
}

[[nodiscard]] std::string
finalizedToString() const;

[[nodiscard]] bool
setFinalizedFromString(std::string const& str)
{
return from_string(finalized_, str);
}

[[nodiscard]] RangeSet<std::uint32_t> const&
finalized() const
{
return finalized_;
}

[[nodiscard]] std::string
incompleteToString() const;

[[nodiscard]] std::map<std::uint32_t, Incomplete> const&
incomplete() const
{
return incomplete_;
}

// Returns true if successful or false because of a duplicate index
bool
update(
std::uint32_t shardIndex,
ShardState state,
std::uint32_t percentProgress);

[[nodiscard]] protocol::TMPeerShardInfoV2
makeMessage(Application& app);

private:
// Finalized immutable shards
RangeSet<std::uint32_t> finalized_;

// Incomplete shards being acquired or finalized
std::map<std::uint32_t, Incomplete> incomplete_;

// Message creation time
NetClock::time_point msgTimestamp_;
};

} // namespace NodeStore
} // namespace ripple

#endif
Loading

0 comments on commit ea7d7be

Please sign in to comment.