Skip to content

Commit

Permalink
Maintain history back to the earliest persisted ledger:
Browse files Browse the repository at this point in the history
This makes behavior consistent with configurations both with and
without online delete.
  • Loading branch information
mtrippled authored and manojsdoshi committed Apr 7, 2020
1 parent a4e9878 commit e257a22
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 48 deletions.
7 changes: 1 addition & 6 deletions src/ripple/app/ledger/LedgerMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/protocol/STValidation.h>
#include <boost/optional.hpp>

#include <mutex>

Expand Down Expand Up @@ -280,11 +281,6 @@ class LedgerMaster
// Try to publish ledgers, acquire missing ledgers. Always called with
// m_mutex locked. The passed lock is a reminder to callers.
void doAdvance(std::unique_lock<std::recursive_mutex>&);
bool shouldAcquire(
std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory,
std::uint32_t const ledgerHistoryIndex,
std::uint32_t const candidateLedger) const;

std::vector<std::shared_ptr<Ledger const>>
findNewLedgersToPublish(std::unique_lock<std::recursive_mutex>&);
Expand All @@ -295,7 +291,6 @@ class LedgerMaster
// The passed lock is a reminder to callers.
bool newPFWork(const char *name, std::unique_lock<std::recursive_mutex>&);

private:
Application& app_;
beast::Journal m_journal;

Expand Down
82 changes: 56 additions & 26 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <ripple/basics/Log.h>
#include <ripple/basics/TaggedCache.h>
#include <ripple/basics/UptimeClock.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/TimeKeeper.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Overlay.h>
Expand All @@ -47,6 +48,7 @@
#include <ripple/resource/Fees.h>
#include <algorithm>
#include <cassert>
#include <limits>
#include <memory>
#include <vector>

Expand Down Expand Up @@ -140,6 +142,57 @@ static constexpr std::chrono::minutes MAX_LEDGER_AGE_ACQUIRE {1};
// Don't acquire history if write load is too high
static constexpr int MAX_WRITE_LOAD_ACQUIRE {8192};

// Helper function for LedgerMaster::doAdvance()
// Returns the minimum ledger sequence in SQL database, if any.
static boost::optional<LedgerIndex>
minSqlSeq(Application& app)
{
boost::optional<LedgerIndex> seq;
auto db = app.getLedgerDB().checkoutDb();
*db << "SELECT MIN(LedgerSeq) FROM Ledgers", soci::into(seq);
return seq;
}

// Helper function for LedgerMaster::doAdvance()
// Return true if candidateLedger should be fetched from the network.
static bool
shouldAcquire (
std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory,
boost::optional<LedgerIndex> minSeq,
std::uint32_t const lastRotated,
std::uint32_t const candidateLedger,
beast::Journal j)
{
bool ret = [&]()
{
// Fetch ledger if it may be the current ledger
if (candidateLedger >= currentLedger)
return true;

// Or if it is within our configured history range:
if (currentLedger - candidateLedger <= ledgerHistory)
return true;

// Or it's greater than or equal to both:
// - the minimum persisted ledger or the maximum possible
// sequence value, if no persisted ledger, and
// - minimum ledger that will be persisted as of the next online
// deletion interval, or 1 if online deletion is disabled.
return
candidateLedger >= std::max(
minSeq.value_or(std::numeric_limits<LedgerIndex>::max()),
lastRotated + 1);
}();

JLOG (j.trace())
<< "Missing ledger "
<< candidateLedger
<< (ret ? " should" : " should NOT")
<< " be acquired";
return ret;
}

LedgerMaster::LedgerMaster (Application& app, Stopwatch& stopwatch,
Stoppable& parent,
beast::insight::Collector::ptr const& collector, beast::Journal journal)
Expand Down Expand Up @@ -1697,31 +1750,6 @@ LedgerMaster::releaseReplay ()
return std::move (replayData);
}

bool
LedgerMaster::shouldAcquire (
std::uint32_t const currentLedger,
std::uint32_t const ledgerHistory,
std::uint32_t const ledgerHistoryIndex,
std::uint32_t const candidateLedger) const
{

// Fetch ledger if it might be the current ledger,
// is requested by the advisory delete setting, or
// is within our configured history range

bool ret (candidateLedger >= currentLedger ||
((ledgerHistoryIndex > 0) &&
(candidateLedger > ledgerHistoryIndex)) ||
(currentLedger - candidateLedger) <= ledgerHistory);

JLOG (m_journal.trace())
<< "Missing ledger "
<< candidateLedger
<< (ret ? " should" : " should NOT")
<< " be acquired";
return ret;
}

void
LedgerMaster::fetchForHistory(
std::uint32_t missing,
Expand Down Expand Up @@ -1875,7 +1903,9 @@ void LedgerMaster::doAdvance (std::unique_lock<std::recursive_mutex>& sl)
"tryAdvance discovered missing " << *missing;
if ((mFillInProgress == 0 || *missing > mFillInProgress) &&
shouldAcquire(mValidLedgerSeq, ledger_history_,
app_.getSHAMapStore().getCanDelete(), *missing))
minSqlSeq(app_),
app_.getSHAMapStore().getLastRotated(), *missing,
m_journal))
{
JLOG(m_journal.trace()) <<
"advanceThread should acquire";
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/app/misc/SHAMapStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class SHAMapStore
/** Whether advisory delete is enabled. */
virtual bool advisoryDelete() const = 0;

/** Last ledger which was copied during rotation of backends. */
/** Maximum ledger that has been deleted, or will be deleted if
* currently in the act of online deletion.
*/
virtual LedgerIndex getLastRotated() = 0;

/** Highest ledger that may be deleted. */
Expand Down
28 changes: 14 additions & 14 deletions src/ripple/app/misc/SHAMapStoreImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ void
SHAMapStoreImp::run()
{
beast::setCurrentThreadName ("SHAMapStore");
LedgerIndex lastRotated = state_db_.getState().lastRotated;
lastRotated_ = state_db_.getState().lastRotated;
netOPs_ = &app_.getOPs();
ledgerMaster_ = &app_.getLedgerMaster();
fullBelowCache_ = &app_.family().fullbelow();
Expand All @@ -333,7 +333,7 @@ SHAMapStoreImp::run()
if (advisoryDelete_)
canDelete_ = state_db_.getCanDelete ();

while (1)
while (true)
{
healthy_ = true;
std::shared_ptr<Ledger const> validatedLedger;
Expand All @@ -356,20 +356,20 @@ SHAMapStoreImp::run()
continue;
}

LedgerIndex validatedSeq = validatedLedger->info().seq;
if (!lastRotated)
LedgerIndex const validatedSeq = validatedLedger->info().seq;
if (!lastRotated_)
{
lastRotated = validatedSeq;
state_db_.setLastRotated (lastRotated);
lastRotated_ = validatedSeq;
state_db_.setLastRotated (lastRotated_);
}

// will delete up to (not including) lastRotated)
if (validatedSeq >= lastRotated + deleteInterval_
&& canDelete_ >= lastRotated - 1)
// will delete up to (not including) lastRotated_
if (validatedSeq >= lastRotated_ + deleteInterval_
&& canDelete_ >= lastRotated_ - 1)
{
JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq
<< " lastRotated " << lastRotated << " deleteInterval "
<< deleteInterval_ << " canDelete_ " << canDelete_;
<< " lastRotated_ " << lastRotated_ << " deleteInterval "
<< deleteInterval_ << " canDelete_ " << canDelete_;

switch (health())
{
Expand All @@ -383,7 +383,7 @@ SHAMapStoreImp::run()
;
}

clearPrior (lastRotated);
clearPrior (lastRotated_);
switch (health())
{
case Health::stopping:
Expand Down Expand Up @@ -448,13 +448,13 @@ SHAMapStoreImp::run()

std::string nextArchiveDir =
dbRotating_->getWritableBackend()->getName();
lastRotated = validatedSeq;
lastRotated_ = validatedSeq;
std::shared_ptr<NodeStore::Backend> oldBackend;
{
std::lock_guard lock (dbRotating_->peekMutex());

state_db_.setState (SavedState {newBackend->getName(),
nextArchiveDir, lastRotated});
nextArchiveDir, lastRotated_});
clearCaches (validatedSeq);
oldBackend = dbRotating_->rotateBackends(
std::move(newBackend),
Expand Down
10 changes: 9 additions & 1 deletion src/ripple/app/misc/SHAMapStoreImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <ripple/core/DatabaseCon.h>
#include <ripple/nodestore/DatabaseRotating.h>

#include <atomic>
#include <condition_variable>
#include <thread>

Expand Down Expand Up @@ -84,6 +85,13 @@ class SHAMapStoreImp : public SHAMapStore
static std::uint32_t const minimumDeletionInterval_ = 256;
// minimum # of ledgers required for standalone mode.
static std::uint32_t const minimumDeletionIntervalSA_ = 8;
// Ledger sequence at which the last deletion interval was triggered,
// or the current validated sequence as of first use
// if there have been no prior deletions. Deletion occurs up to (but
// not including) this value. All ledgers past this value are accumulated
// until the next online deletion. This value is persisted to SQLite
// nearly immediately after modification.
std::atomic<LedgerIndex> lastRotated_ {};

NodeStore::Scheduler& scheduler_;
beast::Journal const journal_;
Expand Down Expand Up @@ -159,7 +167,7 @@ class SHAMapStoreImp : public SHAMapStore
LedgerIndex
getLastRotated() override
{
return state_db_.getState().lastRotated;
return lastRotated_;
}

// All ledgers before and including this are unprotected
Expand Down

0 comments on commit e257a22

Please sign in to comment.