Skip to content

Commit

Permalink
Cleanup the 'PeerSet' hierarchy:
Browse files Browse the repository at this point in the history
This commit introduces no functional changes but cleans up the
code and shrinks the surface area by removing dead and unused
code, leveraging std:: alternatives to hand-rolled code and
improving comments and documentation.
  • Loading branch information
thejohnfreeman authored and nbougalis committed May 5, 2020
1 parent d025f3f commit 5b5226d
Show file tree
Hide file tree
Showing 25 changed files with 357 additions and 570 deletions.
8 changes: 4 additions & 4 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: true
BinPackArguments: false
BinPackParameters: false
BraceWrapping:
BraceWrapping:
AfterClass: true
AfterControlStatement: true
AfterEnum: false
Expand All @@ -43,8 +43,8 @@ Cpp11BracedListStyle: true
DerivePointerAlignment: false
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
IncludeCategories:
ForEachMacros: [ Q_FOREACH, BOOST_FOREACH ]
IncludeCategories:
- Regex: '^<(BeastConfig)'
Priority: 0
- Regex: '^<(ripple)/'
Expand Down Expand Up @@ -84,4 +84,4 @@ SpacesInParentheses: false
SpacesInSquareBrackets: false
Standard: Cpp11
TabWidth: 8
UseTab: Never
UseTab: Never
4 changes: 2 additions & 2 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
app_.overlay().foreach (send_always(
app_.overlay().foreach(send_always(
std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
}
else
Expand Down Expand Up @@ -709,7 +709,7 @@ RCLConsensus::Adaptor::notify(
}
s.set_firstseq(uMin);
s.set_lastseq(uMax);
app_.overlay().foreach (
app_.overlay().foreach(
send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
JLOG(j_.trace()) << "send status change to peer";
}
Expand Down
64 changes: 48 additions & 16 deletions src/ripple/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
namespace ripple {

// A ledger we are trying to acquire
class InboundLedger : public PeerSet,
public std::enable_shared_from_this<InboundLedger>,
public CountedObject<InboundLedger>
class InboundLedger final : public PeerSet,
public std::enable_shared_from_this<InboundLedger>,
public CountedObject<InboundLedger>
{
public:
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;

static char const*
getCountedObjectName()
{
Expand All @@ -62,14 +64,24 @@ class InboundLedger : public PeerSet,

~InboundLedger();

// Called when the PeerSet timer expires
void
execute() override;

// Called when another attempt is made to fetch this same ledger
void
update(std::uint32_t seq);

/** Returns true if we got all the data. */
bool
isComplete() const
{
return mComplete;
}

/** Returns false if we failed to get the data. */
bool
isFailed() const
{
return mFailed;
}

std::shared_ptr<Ledger const>
getLedger() const
{
Expand All @@ -82,12 +94,6 @@ class InboundLedger : public PeerSet,
return mSeq;
}

Reason
getReason() const
{
return mReason;
}

bool
checkLocal();
void
Expand All @@ -108,8 +114,17 @@ class InboundLedger : public PeerSet,
void
runData();

static LedgerInfo
deserializeHeader(Slice data, bool hasPrefix);
void
touch()
{
mLastAction = m_clock.now();
}

clock_type::time_point
getLastAction() const
{
return mLastAction;
}

private:
enum class TriggerReason { added, reply, timeout };
Expand Down Expand Up @@ -137,14 +152,20 @@ class InboundLedger : public PeerSet,
onTimer(bool progress, ScopedLockType& peerSetLock) override;

void
newPeer(std::shared_ptr<Peer> const& peer) override
queueJob() override;

void
onPeerAdded(std::shared_ptr<Peer> const& peer) override
{
// For historical nodes, do not trigger too soon
// since a fetch pack is probably coming
if (mReason != Reason::HISTORY)
trigger(peer, TriggerReason::added);
}

std::size_t
getPeerCount() const;

std::weak_ptr<PeerSet>
pmDowncast() override;

Expand Down Expand Up @@ -179,6 +200,9 @@ class InboundLedger : public PeerSet,
std::vector<uint256>
neededStateHashes(int max, SHAMapSyncFilter* filter) const;

clock_type& m_clock;
clock_type::time_point mLastAction;

std::shared_ptr<Ledger> mLedger;
bool mHaveHeader;
bool mHaveState;
Expand All @@ -198,6 +222,14 @@ class InboundLedger : public PeerSet,
bool mReceiveDispatched;
};

/** Deserialize a ledger header from a byte array. */
LedgerInfo
deserializeHeader(Slice data);

/** Deserialize a ledger header (prefixed with 4 bytes) from a byte array. */
LedgerInfo
deserializePrefixedHeader(Slice data);

} // namespace ripple

#endif
6 changes: 0 additions & 6 deletions src/ripple/app/ledger/InboundLedgers.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,9 @@ class InboundLedgers
std::shared_ptr<Peer>,
std::shared_ptr<protocol::TMLedgerData>) = 0;

virtual void
doLedgerData(LedgerHash hash) = 0;

virtual void
gotStaleData(std::shared_ptr<protocol::TMLedgerData> packet) = 0;

virtual int
getFetchCount(int& timeoutCount) = 0;

virtual void
logFailure(uint256 const& h, std::uint32_t seq) = 0;

Expand Down
34 changes: 21 additions & 13 deletions src/ripple/app/ledger/InboundTransactions.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,35 @@ class InboundTransactions

virtual ~InboundTransactions() = 0;

/** Retrieves a transaction set by hash
/** Find and return a transaction set, or nullptr if it is missing.
*
* @param setHash The transaction set ID (digest of the SHAMap root node).
* @param acquire Whether to fetch the transaction set from the network if
* it is missing.
* @return The transaction set with ID setHash, or nullptr if it is
* missing.
*/
virtual std::shared_ptr<SHAMap>
getSet(uint256 const& setHash, bool acquire) = 0;

/** Gives data to an inbound transaction set
/** Add a transaction set from a LedgerData message.
*
* @param setHash The transaction set ID (digest of the SHAMap root node).
* @param peer The peer that sent the message.
* @param message The LedgerData message.
*/
virtual void
gotData(
uint256 const& setHash,
std::shared_ptr<Peer>,
std::shared_ptr<protocol::TMLedgerData>) = 0;

/** Gives set to the container
std::shared_ptr<Peer> peer,
std::shared_ptr<protocol::TMLedgerData> message) = 0;

/** Add a transaction set.
*
* @param setHash The transaction set ID (should match set.getHash()).
* @param set The transaction set.
* @param acquired Whether this transaction set was acquired from a peer,
* or constructed by ourself during consensus.
*/
virtual void
giveSet(
Expand All @@ -70,18 +85,11 @@ class InboundTransactions
*/
virtual void
newRound(std::uint32_t seq) = 0;

virtual Json::Value
getInfo() = 0;

virtual void
onStop() = 0;
};

std::unique_ptr<InboundTransactions>
make_InboundTransactions(
Application& app,
InboundTransactions::clock_type& clock,
Stoppable& parent,
beast::insight::Collector::ptr const& collector,
std::function<void(std::shared_ptr<SHAMap> const&, bool)> gotSet);
Expand Down
81 changes: 81 additions & 0 deletions src/ripple/app/ledger/InboundTransactions.uml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
@startuml
box "Server 1"
participant timer1 as "std::timer"
participant jq1 as "JobQueue"
participant ib1 as "InboundTransactions"
participant ta1 as "TransactionAcquire"
participant pi1 as "PeerImp"
end box

box "Server 2"
participant pi2 as "PeerImp"
participant jq2 as "JobQueue"
participant ib2 as "InboundTransactions"
end box

autoactivate on
[-> ib1 : getSet(rootHash)
ib1 -> ta1 : TransactionAcquire(rootHash)
return
ib1 -> ta1 : init(numPeers=2)
ta1 -> ta1 : addPeers(limit=2)
ta1 -> ta1 : onPeerAdded(peer)
ta1 -> ta1 : trigger(peer)
ta1 -> pi1 : send(TMGetLedger)
return
deactivate ta1
deactivate ta1
deactivate ta1
ta1 -> ta1 : setTimer()
ta1 -> timer1 : wait(() -> TransactionAcquire.queueJob())
return
deactivate ta1
return
return empty SHAMap

...
pi1 -> pi2 : onMessage(TMGetLedger)
pi2 -> jq2 : addJob(() -> PeerImp.getLedger(message))
return
deactivate pi2

...
jq2 -> pi2 : getLedger(message)
pi2 -> ib2 : getSet(rootHash)
|||
return SHAMap
pi2 -> pi2 : send(TMLedgerData)
deactivate pi2
deactivate pi2

...
pi2 -> pi1 : onMessage(TMLedgerData)
pi1 -> jq1 : addJob(() -> InboundTransactions.gotData(hash, message))
return
deactivate pi1

...
jq1 -> ib1 : gotData(hash, message)
ib1 -> ta1 : takeNodes(nodeIDs, blobs)
return useful | invalid
deactivate ib1

...
timer1 -> ta1 : queueJob()
ta1 -> jq1 : addJob(() -> TransactionAcquire.invokeOnTimer())
return
deactivate ta1

...
jq1 -> ta1 : invokeOnTimer()
ta1 -> ta1 : onTimer()
ta1 -> ta1 : addPeers(limit=1)
ta1 -> ta1 : onPeerAdded(peer)
ta1 -> ta1 : trigger(peer)
note right: mComplete = true;
deactivate ta1
deactivate ta1
deactivate ta1
deactivate ta1
deactivate ta1
@enduml
Loading

0 comments on commit 5b5226d

Please sign in to comment.