Skip to content

Commit

Permalink
Merge pull request #2069 from jamescowens/cull_beacon_db_memory
Browse files Browse the repository at this point in the history
beacon: Cull beacon db memory (passivation)
  • Loading branch information
jamescowens authored Mar 27, 2021
2 parents a8f6fbd + d036b82 commit f98753c
Show file tree
Hide file tree
Showing 6 changed files with 860 additions and 1,224 deletions.
175 changes: 144 additions & 31 deletions src/gridcoin/beacon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ bool BeaconRegistry::TryRenewal(Beacon_ptr& current_beacon_ptr, int& height, con
// Place a smart shared pointer to the renewed beacon in the active beacons map. Note that the
// subscript form of the insert with the same key replaces the current beacon entry with the
//renewal.
m_beacons[payload.m_cpid] = std::make_shared<Beacon>(renewal_iter->second);
m_beacons[payload.m_cpid] = renewal_iter->second;

return true;
}
Expand Down Expand Up @@ -446,7 +446,7 @@ void BeaconRegistry::Add(const ContractContext& ctx)
historical.GetAddress().ToString(),
historical.m_hash.GetHex());
}
m_beacons[payload.m_cpid] = std::make_shared<Beacon>(m_beacon_db[ctx.m_tx.GetHash()]);
m_beacons[payload.m_cpid] = m_beacon_db.find(ctx.m_tx.GetHash())->second;
return;
}

Expand Down Expand Up @@ -480,7 +480,7 @@ void BeaconRegistry::Add(const ContractContext& ctx)
}

// Insert a pointer to the entry in the m_pending map.
m_pending[pending.GetId()] = std::make_shared<Beacon>(m_beacon_db[ctx.m_tx.GetHash()]);
m_pending[pending.GetId()] = m_beacon_db.find(ctx.m_tx.GetHash())->second;
}

void BeaconRegistry::Delete(const ContractContext& ctx)
Expand Down Expand Up @@ -652,11 +652,11 @@ void BeaconRegistry::Revert(const ContractContext& ctx)
if (resurrect_iter != m_beacon_db.end())
{
// This is an element to the desired entry in the historical table.
Beacon& resurrected_beacon = resurrect_iter->second;
Beacon_ptr& resurrected_beacon = resurrect_iter->second;

// Resurrect the prior beacon.
// ------------- cpid -------------- smart shared pointer to element in historical map.
m_beacons[cpid] = std::make_shared<Beacon>(resurrected_beacon);
m_beacons[cpid] = resurrected_beacon;
}

// Erase the renewal record in the db that was reverted. No reason to keep it.
Expand Down Expand Up @@ -688,12 +688,12 @@ void BeaconRegistry::Revert(const ContractContext& ctx)

if (deleted_beacon_record != m_beacon_db.end())
{
auto record_to_restore = m_beacon_db.find(deleted_beacon_record->second.m_prev_beacon_hash);
auto record_to_restore = m_beacon_db.find(deleted_beacon_record->second->m_prev_beacon_hash);

if (record_to_restore != m_beacon_db.end())
{
// Get a smart shared pointer to the beacon to restore
Beacon_ptr beacon_to_restore_ptr = std::make_shared<Beacon>(record_to_restore->second);
Beacon_ptr beacon_to_restore_ptr = record_to_restore->second;

// Check the beacon's status. If it was ACTIVE or RENEWAL, put it back in the m_beacons map
// under the cpid.
Expand Down Expand Up @@ -866,8 +866,7 @@ void BeaconRegistry::ActivatePending(

// This is the subscript form of insert. Important here because an activated beacon should
// overwrite any existing entry in the m_beacons map.
m_beacons[activated_beacon.m_cpid] =
std::make_shared<Beacon>(m_beacon_db[activated_beacon.m_hash]);
m_beacons[activated_beacon.m_cpid] = m_beacon_db.find(activated_beacon.m_hash)->second;

// Remove the pending beacon entry from the pending map. (Note this entry still exists in the historical
// table and the db.
Expand Down Expand Up @@ -940,8 +939,8 @@ void BeaconRegistry::Deactivate(const uint256 superblock_hash)
}

// Resurrect the pending record prior to the activation. This points to the pending record still in the db.
m_pending[static_cast<PendingBeacon>(pending_beacon_entry->second).GetId()] =
std::make_shared<Beacon>(pending_beacon_entry->second);
m_pending[static_cast<PendingBeacon>(*pending_beacon_entry->second).GetId()] =
pending_beacon_entry->second;

// Erase the entry from the active beacons map. This also increments the iterator.
iter = m_beacons.erase(iter);
Expand All @@ -964,22 +963,22 @@ void BeaconRegistry::Deactivate(const uint256 superblock_hash)
while (iter != m_beacon_db.end())
{
// The cpid in the historical beacon record to be matched.
Cpid cpid = iter->second.m_cpid;
Cpid cpid = iter->second->m_cpid;

uint256 match_hash = Hash(superblock_hash.begin(),
superblock_hash.end(),
iter->second.m_prev_beacon_hash.begin(),
iter->second.m_prev_beacon_hash.end());
iter->second->m_prev_beacon_hash.begin(),
iter->second->m_prev_beacon_hash.end());

// If the calculated match_hash matches the key (hash) of the historical beacon record, then
// restore the previous record pointed to by the historical beacon record to the pending map.
if (match_hash == iter->first)
{
uint256 resurrect_pending_hash = iter->second.m_prev_beacon_hash;
uint256 resurrect_pending_hash = iter->second->m_prev_beacon_hash;

if (!resurrect_pending_hash.IsNull())
{
Beacon_ptr resurrected_pending = std::make_shared<Beacon>(m_beacon_db.find(resurrect_pending_hash)->second);
Beacon_ptr resurrected_pending = m_beacon_db.find(resurrect_pending_hash)->second;

// Check that the status of the beacon to resurrect is PENDING. If it is not log an error but continue
// anyway.
Expand Down Expand Up @@ -1137,6 +1136,7 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
}

uint64_t recnum_high_watermark = 0;
uint64_t number_passivated = 0;

// Replay the storage map. The iterator is ordered by record number, which ensures that the correct
// elements end up in m_beacons and m_pending. Storage entries that are "mark deletions" are also inserted
Expand All @@ -1162,8 +1162,15 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&

// Insert the entry into the historical map. This includes ctx's where the beacon is marked deleted.
// --------------- hash ---------- does NOT include the Cpid.
m_historical[iter.second.m_hash] = beacon;
Beacon& historical_beacon = m_historical[iter.second.m_hash];
m_historical[iter.second.m_hash] = std::make_shared<Beacon>(beacon);
Beacon_ptr& historical_beacon_ptr = m_historical[iter.second.m_hash];

BeaconRegistry::HistoricalBeaconMap::iterator prev_historical_iter = m_historical.end();

if (!historical_beacon_ptr->m_prev_beacon_hash.IsNull())
{
prev_historical_iter = m_historical.find(historical_beacon_ptr->m_prev_beacon_hash);
}

if (beacon.m_status == BeaconStatusForStorage::PENDING)
{
Expand All @@ -1180,7 +1187,7 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
);

// Insert the pending beacon in the pending map.
m_pending[beacon.GetId()] = std::make_shared<Beacon>(historical_beacon);
m_pending[beacon.GetId()] = historical_beacon_ptr;
}

if (beacon.m_status == BeaconStatusForStorage::ACTIVE || beacon.m_status == BeaconStatusForStorage::RENEWAL)
Expand All @@ -1198,7 +1205,7 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
);

// Insert or replace the existing map entry for the cpid with the latest active or renewed for that CPID.
m_beacons[beacon.m_cpid] = std::make_shared<Beacon>(historical_beacon);
m_beacons[beacon.m_cpid] = historical_beacon_ptr;

// Delete any entry in the pending map with THE SAME public key.
auto pending_to_delete = m_pending.find(beacon.GetId());
Expand Down Expand Up @@ -1256,11 +1263,28 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
m_beacons.erase(beacon.m_cpid);
m_pending.erase(beacon.m_public_key.GetID());
}

if (prev_historical_iter != m_historical.end())
{
// Note that passivation is not expected to be successful for every call. See the comments
// in the passivate() function.
std::pair<BeaconRegistry::HistoricalBeaconMap::iterator, bool> passivation_result
= passivate(prev_historical_iter);

number_passivated += passivation_result.second;
}
}

LogPrint(LogFlags::BEACON, "INFO: %s: number of historical records passivated: %" PRId64 ".",
__func__,
number_passivated);

// Set the in-memory record number stored variable to the highest recnum encountered during the replay above.
m_recnum_stored = recnum_high_watermark;

// Set the needs passivation flag to true, because the one-by-one passivation done above may not catch everything.
m_needs_passivation = true;

return height;
}

Expand All @@ -1271,6 +1295,26 @@ void BeaconRegistry::ResetInMemoryOnly()
m_beacon_db.clear_in_memory_only();
}

uint64_t BeaconRegistry::PassivateDB()
{
return m_beacon_db.passivate_db();
}

// This is static and called by the scheduler.
void BeaconRegistry::RunBeaconDBPassivation()
{
TRY_LOCK(cs_main, locked_main);

if (!locked_main)
{
return;
}

BeaconRegistry& beacons = GetBeaconRegistry();

beacons.PassivateDB();
}

// Required to make the linker happy.
constexpr uint32_t BeaconRegistry::BeaconDB::CURRENT_VERSION;

Expand All @@ -1280,6 +1324,7 @@ void BeaconRegistry::BeaconDB::clear_in_memory_only()
m_database_init = false;
m_height_stored = 0;
m_recnum_stored = 0;
m_needs_passivation = false;
}

bool BeaconRegistry::BeaconDB::clear_leveldb()
Expand All @@ -1305,11 +1350,41 @@ bool BeaconRegistry::BeaconDB::clear_leveldb()
m_height_stored = 0;
m_recnum_stored = 0;
m_database_init = false;
m_needs_passivation = false;
m_needs_IsContract_correction = false;

return status;
}

uint64_t BeaconRegistry::BeaconDB::passivate_db()
{
uint64_t number_passivated = 0;

// Don't bother to go through the historical beacon map unless the needs passivation flag is set. This makes
// this function extremely light for most calls from the periodic schedule.
if (m_needs_passivation)
{
for (auto iter = m_historical.begin(); iter != m_historical.end(); /*no-op*/)
{
// The passivate function increments the iterator.
std::pair<BeaconRegistry::HistoricalBeaconMap::iterator, bool> result = passivate(iter);

iter = result.first;
number_passivated += result.second;

}
}

LogPrint(BCLog::LogFlags::BEACON, "INFO %s: Passivated %" PRId64 " elements from beacon db.",
__func__,
number_passivated);

// Set needs passivation flag to false after passivating the db.
m_needs_passivation = false;

return number_passivated;
}

bool BeaconRegistry::BeaconDB::clear()
{
clear_in_memory_only();
Expand Down Expand Up @@ -1403,17 +1478,21 @@ bool BeaconRegistry::BeaconDB::insert(const uint256 &hash, const int& height, co
beacon.m_status.Raw() // status
);

m_historical.insert(std::make_pair(hash, beacon));
m_historical.insert(std::make_pair(hash, std::make_shared<Beacon>(beacon)));

status = Store(hash, static_cast<StorageBeacon>(beacon));

if (height) status &= StoreDBHeight(height);

// Set needs passivation flag to true to allow the scheduled passivation to remove unnecessary records from
// memory.
m_needs_passivation = true;

return status;
}
}

bool BeaconRegistry::BeaconDB::erase(uint256 hash)
bool BeaconRegistry::BeaconDB::erase(const uint256& hash)
{
auto iter = m_historical.find(hash);

Expand All @@ -1425,6 +1504,42 @@ bool BeaconRegistry::BeaconDB::erase(uint256 hash)
return Delete(hash);
}

// Note that this function uses the shared pointer use_count() to determine whether an element in
// m_historical is referenced by either the m_beacons or m_pending map and if not, erases it, leaving the backing
// state in leveldb untouched. Note that the use of use_count() in multithreaded environments must be carefully
// considered because it is only approximate. In this case it is exact. Access to the entire BeaconRegistry class
// and everything in it is protected by the cs_main lock and is therefore single threaded. This method of passivating
// is MUCH faster than searching through m_beacons and m_pending for each element, because they are not keyed by hash.
//
// Note that this function acts very similarly to the map erase function with an iterator argument, but with a standard
// pair returned. The first part of the pair a boolean as to whether the element was passivated, and the
// second is is an iterator to the next element. This is designed to be traversed in a for loop just like map erase.
std::pair<BeaconRegistry::HistoricalBeaconMap::iterator, bool>
BeaconRegistry::BeaconDB::passivate(BeaconRegistry::HistoricalBeaconMap::iterator& iter)
{
// m-historical itself holds one reference, additional references can be held by m_beacons and m_pending.
// If there is only one reference then remove the shared_pointer from m_historical, which will implicitly destroy
// the shared_pointer object.
if (iter->second.use_count() == 1)
{
iter = m_historical.erase(iter);
return std::make_pair(iter, true);
}
else
{
LogPrint(BCLog::LogFlags::BEACON, "INFO: %s: Passivate called for historical beacon record with hash %s that "
"has existing reference count %li. This is expected under certain situations, "
"such as a forced (re)advertisement, where the new pending beacon is allowed "
"to expire, while the original is still active.",
__func__,
iter->second->m_hash.GetHex(),
iter->second.use_count());

++iter;
return std::make_pair(iter, false);
}
}

BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::begin()
{
return m_historical.begin();
Expand All @@ -1435,7 +1550,7 @@ BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::end()
return m_historical.end();
}

BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::find(uint256& hash)
BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::find(const uint256& hash)
{
// See if beacon from that ctx_hash is already in the historical map. If so, get iterator.
auto iter = m_historical.find(hash);
Expand All @@ -1448,7 +1563,10 @@ BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::find(uin
// If the load from leveldb is successful, insert into the historical map and return the iterator.
if (Load(hash, beacon))
{
iter = m_historical.insert(std::make_pair(hash, static_cast<Beacon>(beacon))).first;
iter = m_historical.insert(std::make_pair(hash, std::make_shared<Beacon>(static_cast<Beacon>(beacon)))).first;

// Set the needs passivation flag to true
m_needs_passivation = true;
}
}

Expand All @@ -1463,11 +1581,6 @@ BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::advance(
return ++iter;
}

Beacon& BeaconRegistry::BeaconDB::operator[](const uint256& hash)
{
return m_historical[hash];
}

bool BeaconRegistry::BeaconDB::Store(const uint256& hash, const StorageBeacon& beacon)
{
CTxDB txdb("rw");
Expand All @@ -1479,7 +1592,7 @@ bool BeaconRegistry::BeaconDB::Store(const uint256& hash, const StorageBeacon& b
return txdb.WriteGenericSerializable(key, std::make_pair(m_recnum_stored, beacon));
}

bool BeaconRegistry::BeaconDB::Load(uint256& hash, StorageBeacon& beacon)
bool BeaconRegistry::BeaconDB::Load(const uint256& hash, StorageBeacon& beacon)
{
CTxDB txdb("r");

Expand All @@ -1494,7 +1607,7 @@ bool BeaconRegistry::BeaconDB::Load(uint256& hash, StorageBeacon& beacon)
return status;
}

bool BeaconRegistry::BeaconDB::Delete(uint256& hash)
bool BeaconRegistry::BeaconDB::Delete(const uint256& hash)
{
CTxDB txdb("rw");

Expand Down
Loading

0 comments on commit f98753c

Please sign in to comment.