Skip to content

Commit

Permalink
optimize window caches for ibd.
Browse files Browse the repository at this point in the history
  • Loading branch information
D-Stacks committed Sep 4, 2024
1 parent 6ed8c71 commit baa761b
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 87 deletions.
1 change: 1 addition & 0 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ impl Consensus {
&services,
pruning_lock.clone(),
counters.clone(),

));

let body_processor = Arc::new(BlockBodyProcessor::new(
Expand Down
40 changes: 15 additions & 25 deletions consensus/src/pipeline/body_processor/body_validation_in_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use crate::{
model::stores::{ghostdag::GhostdagStoreReader, statuses::StatusesStoreReader},
processes::window::WindowManager,
};
use kaspa_consensus_core::{block::Block, constants::LOCK_TIME_THRESHOLD};
use kaspa_consensus_core::{block::Block};
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
use once_cell::unsync::Lazy;
use std::sync::Arc;

impl BlockBodyProcessor {
Expand All @@ -25,34 +26,23 @@ impl BlockBodyProcessor {
}

fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
// calculating the past median time for the block may be expensive here, so we try and avoid it if possible
let pmt = 0u64; // we consider 0 to be an uninitialized default value
for tx in block.transactions.iter().filter(|tx| tx.lock_time > 0) {
if tx.lock_time < LOCK_TIME_THRESHOLD {
// will only check daa score
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(
tx,
block.header.daa_score,
pmt, // we don't need the past median time for this case
) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
} else if let Err(e) = self.transaction_validator.utxo_free_tx_validation(
tx,
block.header.daa_score,
// we most intialize the pmt value to the past median time of the block
if pmt > 0 {
pmt
} else {
self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?.0
},
) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
}

let pmt = Lazy::new(|| {
let (pmt, _) = self.window_manager.calc_past_median_time(
&self.ghostdag_store.get_data(block.hash()).unwrap()
).expect("expected header processor to have checked error case");
pmt
}
}
);

for tx in block.transactions.iter() {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, &pmt) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
}
}
Ok(())
}


fn check_parent_bodies_exist(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
// TODO: Skip this check for blocks in PP anticone that comes as part of the pruning proof.
Expand Down
61 changes: 32 additions & 29 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,7 @@ use crate::{
relations::MTRelationsService,
},
stores::{
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
daa::DbDaaStore,
depth::{DbDepthStore, DepthStoreReader},
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
headers::{DbHeadersStore, HeaderStoreReader},
past_pruning_points::DbPastPruningPointsStore,
pruning::{DbPruningStore, PruningStoreReader},
pruning_utxoset::PruningUtxosetStores,
reachability::DbReachabilityStore,
relations::{DbRelationsStore, RelationsStoreReader},
selected_chain::{DbSelectedChainStore, SelectedChainStore},
statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader},
tips::{DbTipsStore, TipsStoreReader},
utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader},
utxo_multisets::{DbUtxoMultisetsStore, UtxoMultisetsStoreReader},
virtual_state::{LkgVirtualState, VirtualState, VirtualStateStoreReader, VirtualStores},
DB,
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore}, block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore}, block_window_cache::{BlockWindowCacheStore, BlockWindowHeap}, daa::DbDaaStore, depth::{DbDepthStore, DepthStoreReader}, ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader}, headers::{DbHeadersStore, HeaderStoreReader}, past_pruning_points::DbPastPruningPointsStore, pruning::{DbPruningStore, PruningStoreReader}, pruning_utxoset::PruningUtxosetStores, reachability::DbReachabilityStore, relations::{DbRelationsStore, RelationsStoreReader}, selected_chain::{DbSelectedChainStore, SelectedChainStore}, statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader}, tips::{DbTipsStore, TipsStoreReader}, utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader}, utxo_multisets::{DbUtxoMultisetsStore, UtxoMultisetsStoreReader}, virtual_state::{LkgVirtualState, VirtualState, VirtualStateStoreReader, VirtualStores}, DB
},
},
params::Params,
Expand All @@ -40,10 +23,7 @@ use crate::{
virtual_processor::utxo_validation::UtxoProcessingContext, ProcessingCounters,
},
processes::{
coinbase::CoinbaseManager,
ghostdag::ordering::SortableBlock,
transaction_validator::{errors::TxResult, transaction_validator_populated::TxValidationFlags, TransactionValidator},
window::WindowManager,
coinbase::CoinbaseManager, ghostdag::ordering::SortableBlock, past_median_time, transaction_validator::{errors::TxResult, transaction_validator_populated::TxValidationFlags, TransactionValidator}, window::{DaaWindow, WindowManager}
},
};
use kaspa_consensus_core::{
Expand Down Expand Up @@ -90,10 +70,7 @@ use rayon::{
};
use rocksdb::WriteBatch;
use std::{
cmp::min,
collections::{BinaryHeap, HashMap, VecDeque},
ops::Deref,
sync::{atomic::Ordering, Arc},
cmp::min, collections::{BinaryHeap, HashMap, VecDeque}, io::Write, ops::Deref, sync::{atomic::Ordering, Arc}
};

pub struct VirtualStateProcessor {
Expand Down Expand Up @@ -149,6 +126,10 @@ pub struct VirtualStateProcessor {
pub(super) parents_manager: DbParentsManager,
pub(super) depth_manager: DbBlockDepthManager,

// block window caches
pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Pruning lock
pruning_lock: SessionLock,

Expand Down Expand Up @@ -206,6 +187,9 @@ impl VirtualStateProcessor {
pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(),
lkg_virtual_state: storage.lkg_virtual_state.clone(),

block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

ghostdag_manager: services.ghostdag_primary_manager.clone(),
reachability_service: services.reachability_service.clone(),
relations_service: services.relations_service.clone(),
Expand Down Expand Up @@ -483,7 +467,9 @@ impl VirtualStateProcessor {
// Calc virtual DAA score, difficulty bits and past median time
let virtual_daa_window = self.window_manager.block_daa_window(&virtual_ghostdag_data)?;
let virtual_bits = self.window_manager.calculate_difficulty_bits(&virtual_ghostdag_data, &virtual_daa_window);
let virtual_past_median_time = self.window_manager.calc_past_median_time(&virtual_ghostdag_data)?.0;
let (virtual_past_median_time, past_median_time_window) = self.window_manager.calc_past_median_time(&virtual_ghostdag_data)?;

self.maybe_commit_windows(ctx.selected_parent(), virtual_daa_window.window, past_median_time_window);

// Calc virtual UTXO state relative to selected parent
self.calculate_utxo_state(&mut ctx, &selected_parent_utxo_view, virtual_daa_window.daa_score);
Expand Down Expand Up @@ -534,6 +520,23 @@ impl VirtualStateProcessor {
drop(selected_chain_write);
}

fn maybe_commit_windows(
&self,
selected_parent: Hash,
daa_window: Arc<BlockWindowHeap>,
past_median_time_window: Arc<BlockWindowHeap>,
) {
// TODO: this only important for ibd performance, as we incur cache misses otherwise.
// We could optimize this by only committing the windows if virtual processor where to have explict knowledge of being in ibd.
if !self.block_window_cache_for_difficulty.contains_key(&selected_parent) {
self.block_window_cache_for_difficulty.insert(selected_parent, daa_window);
}

if !self.block_window_cache_for_past_median_time.contains_key(&selected_parent) {
self.block_window_cache_for_past_median_time.insert(selected_parent, past_median_time_window);
}
}

/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
///
/// Guaranteed to be `>= self.max_block_parents`
Expand Down Expand Up @@ -761,7 +764,7 @@ impl VirtualStateProcessor {
args: &TransactionValidationArgs,
) -> TxResult<()> {
self.transaction_validator.validate_tx_in_isolation(&mutable_tx.tx)?;
self.transaction_validator.utxo_free_tx_validation(&mutable_tx.tx, virtual_daa_score, virtual_past_median_time)?;
self.transaction_validator.utxo_free_tx_validation(&mutable_tx.tx, virtual_daa_score, &virtual_past_median_time)?;
self.validate_mempool_transaction_in_utxo_context(mutable_tx, virtual_utxo_view, virtual_daa_score, args)?;
Ok(())
}
Expand Down Expand Up @@ -847,7 +850,7 @@ impl VirtualStateProcessor {
// No need to validate the transaction in isolation since we rely on the mining manager to submit transactions
// which were previously validated through `validate_mempool_transaction_and_populate`, hence we only perform
// in-context validations
self.transaction_validator.utxo_free_tx_validation(tx, virtual_state.daa_score, virtual_state.past_median_time)?;
self.transaction_validator.utxo_free_tx_validation(tx, virtual_state.daa_score, &virtual_state.past_median_time)?;
let ValidatedTransaction { calculated_fee, .. } =
self.validate_transaction_in_utxo_context(tx, utxo_view, virtual_state.daa_score, TxValidationFlags::Full)?;
Ok(calculated_fee)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use super::{
};

impl TransactionValidator {
pub fn utxo_free_tx_validation(&self, tx: &Transaction, ctx_daa_score: u64, ctx_block_time: u64) -> TxResult<()> {
pub fn utxo_free_tx_validation(&self, tx: &Transaction, ctx_daa_score: u64, ctx_block_time: &u64) -> TxResult<()> {
self.check_tx_is_finalized(tx, ctx_daa_score, ctx_block_time)
}

fn check_tx_is_finalized(&self, tx: &Transaction, ctx_daa_score: u64, ctx_block_time: u64) -> TxResult<()> {
fn check_tx_is_finalized(&self, tx: &Transaction, ctx_daa_score: u64, ctx_block_time: &u64) -> TxResult<()> {
// Lock time of zero means the transaction is finalized.
if tx.lock_time == 0 {
return Ok(());
Expand All @@ -22,7 +22,7 @@ impl TransactionValidator {
// which the transaction is finalized or a timestamp depending on if the
// value is before the LOCK_TIME_THRESHOLD. When it is under the
// threshold it is a DAA score.
let block_time_or_daa_score = if tx.lock_time < LOCK_TIME_THRESHOLD { ctx_daa_score } else { ctx_block_time };
let block_time_or_daa_score = if tx.lock_time < LOCK_TIME_THRESHOLD { ctx_daa_score } else { *ctx_block_time };
if tx.lock_time < block_time_or_daa_score {
return Ok(());
}
Expand Down
125 changes: 95 additions & 30 deletions consensus/src/processes/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use kaspa_consensus_core::{
};
use kaspa_hashes::Hash;
use kaspa_math::Uint256;
use kaspa_utils::refs::Refs;
use kaspa_utils::{arc::ArcExtensions, refs::Refs};
use once_cell::unsync::Lazy;
use std::{cmp::Reverse, iter::once, ops::Deref, sync::Arc};

Expand Down Expand Up @@ -332,33 +332,9 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
WindowType::FullDifficultyWindow | WindowType::VaryingWindow(_) => None,
};

if let Some(cache) = cache {
if let Some(selected_parent_binary_heap) = cache.get(&ghostdag_data.selected_parent) {
// Only use the cached window if it originates from here
if let WindowOrigin::Sampled = selected_parent_binary_heap.origin() {
let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap();

let mut heap =
Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_binary_heap).clone()));
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}

return if let Ok(heap) = Lazy::into_value(heap) {
Ok(Arc::new(heap.binary_heap))
} else {
Ok(selected_parent_binary_heap.clone())
};
}
}
}
if let Some(window) = self.try_init_from_cache(cache, sample_rate, window_size, ghostdag_data, &mut mergeset_non_daa_inserter) {
return Ok(window);
};

let mut window_heap = BoundedSizeBlockHeap::new(WindowOrigin::Sampled, window_size);
let parent_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap();
Expand All @@ -376,16 +352,17 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:

let mut current_ghostdag = parent_ghostdag;

// Walk down the chain until we cross the window boundaries
// Walk down the chain until we cross the window boundaries, or merge with an ancestor cache.
loop {

if current_ghostdag.selected_parent.is_origin() {
// Reaching origin means there's no more data, so we expect the window to already be full, otherwise we err.
// This error can happen only during an IBD from pruning proof when processing the first headers in the pruning point's
// future, and means that the syncer did not provide sufficient trusted information for proper validation
if window_heap.reached_size_bound() {
break;
} else {
return Err(RuleError::InsufficientDaaWindowSize(window_heap.binary_heap.len()));
return Err(RuleError::InsufficientDaaWindowSize(window_heap.len()));
}
}

Expand All @@ -394,6 +371,12 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
}

let parent_ghostdag = self.ghostdag_store.get_data(current_ghostdag.selected_parent).unwrap();

// Try to extend the window from the cache
if self.try_merge_from_cache(cache, sample_rate, window_size, &parent_ghostdag, &mut window_heap, &mut mergeset_non_daa_inserter) {
return Ok(Arc::new(window_heap.binary_heap));
};

let selected_parent_blue_work_too_low =
self.try_push_mergeset(&mut window_heap, sample_rate, &current_ghostdag, parent_ghostdag.blue_work);
// No need to further iterate since past of selected parent has even lower blue work
Expand All @@ -407,6 +390,84 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
Ok(Arc::new(window_heap.binary_heap))
}

fn build_heap_from_selected_parent_binary_heap<'a>(
&'a self,
selected_parent_binary_heap: &'a Arc<BlockWindowHeap>,
current_ghostdag: &'a GhostdagData,
sample_rate: u64,
window_size: usize,
mut mergeset_non_daa_inserter: impl FnMut(Hash),
) -> Option<Arc<BlockWindowHeap>>

{

if let WindowOrigin::Sampled = selected_parent_binary_heap.origin() {
let selected_parent_blue_work = self.ghostdag_store.get_blue_work(current_ghostdag.selected_parent).unwrap();

let mut heap =
Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, selected_parent_binary_heap.clone().unwrap_or_clone()));
for block in self.sampled_mergeset_iterator(sample_rate, &current_ghostdag, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
};

return if let Ok(heap) = Lazy::into_value(heap) {
Some(Arc::new(heap.binary_heap))
} else {
Some(selected_parent_binary_heap.clone())
};
} else {
None
}
}

fn get_from_cache(&self, cache: Option<&Arc<U>>, hash: Hash) -> Option<Arc<BlockWindowHeap>> {
cache.and_then(|cache| cache.get(&hash).and_then(|heap| Some(heap.clone())))
}

fn try_init_from_cache(
&self,
cache: Option<&Arc<U>>,
sample_rate: u64,
window_size: usize,
current_ghostdag: &GhostdagData,
mergeset_non_daa_inserter: &mut impl FnMut(Hash)
) -> Option<Arc<BlockWindowHeap>> {
self.get_from_cache(cache, current_ghostdag.selected_parent).and_then(
|ref selected_parent_binary_heap| {
self.build_heap_from_selected_parent_binary_heap(selected_parent_binary_heap, current_ghostdag, sample_rate, window_size, mergeset_non_daa_inserter)
}
)
}

/// Tries to extend the given heap with the sampled mergeset from the cache and returns the new heap.
/// If no current window is given, it will return the new heap.
fn try_merge_from_cache<'a>(
&'a self,
cache: Option<&Arc<U>>,
sample_rate: u64,
window_size: usize,
current_ghostdag: &'a GhostdagData,
current_window: &'a mut BoundedSizeBlockHeap, // use None if there is `no-pre-existing BoundedSizeBlockHeap`
mergeset_non_daa_inserter: &mut impl FnMut(Hash),
) -> bool {
if let Some(selected_parent_binary_heap) = cache.and_then(|cache| cache.get(&current_ghostdag.selected_parent)) {
if let Some(ancestor_heap) = self.build_heap_from_selected_parent_binary_heap(&selected_parent_binary_heap, current_ghostdag, sample_rate, window_size - current_window.len(), mergeset_non_daa_inserter) {
for block in ancestor_heap.iter() {
current_window.try_push(block.0.hash, block.0.blue_work);
}
return true;
}
}
false
}

fn try_push_mergeset(
&self,
heap: &mut BoundedSizeBlockHeap,
Expand Down Expand Up @@ -686,4 +747,8 @@ impl BoundedSizeBlockHeap {
self.binary_heap.push(r_sortable_block);
true
}

fn len(&self) -> usize {
self.binary_heap.len()
}
}

0 comments on commit baa761b

Please sign in to comment.