diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index bb59fcb8b2..90b5b95425 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -95,8 +95,8 @@ pub struct Consensus { pub ghostdag_store: Arc, // Services and managers - statuses_service: Arc>, - relations_service: Arc>, + statuses_service: MTStatusesService, + relations_service: MTRelationsService, reachability_service: MTReachabilityService, pub(super) difficulty_manager: DifficultyManager, pub(super) dag_traversal_manager: DagTraversalManager, @@ -154,8 +154,8 @@ impl Consensus { // Services and managers // - let statuses_service = Arc::new(MTStatusesService::new(statuses_store.clone())); - let relations_service = Arc::new(MTRelationsService::new(relations_store.clone())); + let statuses_service = MTStatusesService::new(statuses_store.clone()); + let relations_service = MTRelationsService::new(relations_store.clone()); let reachability_service = MTReachabilityService::new(reachability_store.clone()); let dag_traversal_manager = DagTraversalManager::new( params.genesis_hash, @@ -287,7 +287,7 @@ impl Consensus { past_median_time_manager.clone(), dag_traversal_manager.clone(), difficulty_manager.clone(), - depth_manager, + depth_manager.clone(), pruning_manager.clone(), parents_manager.clone(), counters.clone(), @@ -332,6 +332,7 @@ impl Consensus { virtual_state_store, ghostdag_manager.clone(), reachability_service.clone(), + relations_service.clone(), dag_traversal_manager.clone(), difficulty_manager.clone(), coinbase_manager.clone(), @@ -339,6 +340,7 @@ impl Consensus { past_median_time_manager.clone(), pruning_manager.clone(), parents_manager, + depth_manager, )); Self { diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs index 55999eff35..b29bdf30cc 100644 --- a/consensus/src/consensus/test_consensus.rs +++ b/consensus/src/consensus/test_consensus.rs @@ -1,5 +1,6 @@ use std::{ env, fs, + path::PathBuf, sync::{Arc, Weak}, thread::JoinHandle, }; @@ -195,6 +196,12 @@ impl TempDbLifetime { pub fn new(tempdir: tempfile::TempDir, weak_db_ref: Weak) -> Self { Self { tempdir: Some(tempdir), weak_db_ref } } + + /// Tracks the DB reference and makes sure all strong refs are cleaned up + /// but does not remove the DB from disk when dropped. + pub fn without_destroy(weak_db_ref: Weak) -> Self { + Self { tempdir: None, weak_db_ref } + } } impl Drop for TempDbLifetime { @@ -223,10 +230,25 @@ pub fn create_temp_db() -> (TempDbLifetime, Arc) { let global_tempdir = env::temp_dir(); let kaspa_tempdir = global_tempdir.join("kaspa-rust"); fs::create_dir_all(kaspa_tempdir.as_path()).unwrap(); - let db_tempdir = tempfile::tempdir_in(kaspa_tempdir.as_path()).unwrap(); let db_path = db_tempdir.path().to_owned(); - let db = Arc::new(DB::open_default(db_path.to_str().unwrap()).unwrap()); (TempDbLifetime::new(db_tempdir, Arc::downgrade(&db)), db) } + +/// Creates a DB within the provided directory path. +/// Callers must keep the `TempDbLifetime` guard for as long as they wish the DB instance to exist. +pub fn create_permanent_db(db_path: String) -> (TempDbLifetime, Arc) { + let db_dir = PathBuf::from(db_path); + fs::create_dir(db_dir.as_path()).unwrap(); + let db = Arc::new(DB::open_default(db_dir.to_str().unwrap()).unwrap()); + (TempDbLifetime::without_destroy(Arc::downgrade(&db)), db) +} + +/// Loads an existing DB from the provided directory path. +/// Callers must keep the `TempDbLifetime` guard for as long as they wish the DB instance to exist. +pub fn load_existing_db(db_path: String) -> (TempDbLifetime, Arc) { + let db_dir = PathBuf::from(db_path); + let db = Arc::new(DB::open_default(db_dir.to_str().unwrap()).unwrap()); + (TempDbLifetime::without_destroy(Arc::downgrade(&db)), db) +} diff --git a/consensus/src/model/services/statuses.rs b/consensus/src/model/services/statuses.rs index ecaa5f83b4..c8a79b4387 100644 --- a/consensus/src/model/services/statuses.rs +++ b/consensus/src/model/services/statuses.rs @@ -5,6 +5,7 @@ use parking_lot::RwLock; use std::sync::Arc; /// Multi-threaded block-statuses service imp +#[derive(Clone)] pub struct MTStatusesService { store: Arc>, } diff --git a/consensus/src/pipeline/header_processor/post_pow_validation.rs b/consensus/src/pipeline/header_processor/post_pow_validation.rs index bc56851dea..dd4060bdfe 100644 --- a/consensus/src/pipeline/header_processor/post_pow_validation.rs +++ b/consensus/src/pipeline/header_processor/post_pow_validation.rs @@ -103,15 +103,17 @@ impl HeaderProcessor { let gd_data = ctx.ghostdag_data.as_ref().unwrap(); let merge_depth_root = self.depth_manager.calc_merge_depth_root(gd_data, ctx.pruning_point()); let finality_point = self.depth_manager.calc_finality_point(gd_data, ctx.pruning_point()); - let non_bounded_merge_depth_violating_blues: Vec = - self.depth_manager.non_bounded_merge_depth_violating_blues(gd_data, merge_depth_root).collect(); + let mut kosherizing_blues: Option> = None; - for red in gd_data.mergeset_reds.iter().cloned() { + for red in gd_data.mergeset_reds.iter().copied() { if self.reachability_service.is_dag_ancestor_of(merge_depth_root, red) { continue; } - - if !non_bounded_merge_depth_violating_blues.iter().any(|blue| self.reachability_service.is_dag_ancestor_of(red, *blue)) { + // Lazy load the kosherizing blocks since this case is extremely rare + if kosherizing_blues.is_none() { + kosherizing_blues = Some(self.depth_manager.kosherizing_blues(gd_data, merge_depth_root).collect()); + } + if !self.reachability_service.is_dag_ancestor_of_any(red, &mut kosherizing_blues.as_ref().unwrap().iter().copied()) { return Err(RuleError::ViolatingBoundedMergeDepth); } } diff --git a/consensus/src/pipeline/header_processor/processor.rs b/consensus/src/pipeline/header_processor/processor.rs index 6125d5786a..3a39b5e1bf 100644 --- a/consensus/src/pipeline/header_processor/processor.rs +++ b/consensus/src/pipeline/header_processor/processor.rs @@ -175,7 +175,7 @@ impl HeaderProcessor { block_window_cache_for_difficulty: Arc, block_window_cache_for_past_median_time: Arc, reachability_service: MTReachabilityService, - relations_service: Arc>, + relations_service: MTRelationsService, past_median_time_manager: PastMedianTimeManager, dag_traversal_manager: DagTraversalManager, difficulty_manager: DifficultyManager, @@ -391,7 +391,7 @@ impl HeaderProcessor { header.timestamp = self.genesis_timestamp; let header = Arc::new(header); let mut ctx = HeaderProcessingContext::new(self.genesis_hash, &header, PruningPointInfo::from_genesis(self.genesis_hash)); - ctx.ghostdag_data = Some(self.ghostdag_manager.genesis_ghostdag_data()); + ctx.ghostdag_data = Some(Arc::new(self.ghostdag_manager.genesis_ghostdag_data())); ctx.block_window_for_difficulty = Some(Default::default()); ctx.block_window_for_past_median_time = Some(Default::default()); ctx.mergeset_non_daa = Some(Default::default()); diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index e80f29f2a7..e7d506f782 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -2,19 +2,23 @@ use crate::{ consensus::DbGhostdagManager, constants::BLOCK_VERSION, model::{ - services::reachability::{MTReachabilityService, ReachabilityService}, + services::{ + reachability::{MTReachabilityService, ReachabilityService}, + relations::MTRelationsService, + }, stores::{ acceptance_data::{AcceptanceData, DbAcceptanceDataStore}, block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore}, block_window_cache::BlockWindowCacheStore, daa::DbDaaStore, + depth::DbDepthStore, errors::StoreError, - ghostdag::{DbGhostdagStore, GhostdagStoreReader}, + ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader}, headers::{DbHeadersStore, HeaderStoreReader}, past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStore, PastPruningPointsStoreReader}, pruning::{DbPruningStore, PruningStore, PruningStoreReader}, reachability::DbReachabilityStore, - relations::DbRelationsStore, + relations::{DbRelationsStore, RelationsStoreReader}, statuses::{DbStatusesStore, StatusesStore, StatusesStoreBatchExtensions, StatusesStoreReader}, tips::{DbTipsStore, TipsStoreReader}, utxo_diffs::{DbUtxoDiffsStore, UtxoDiffsStoreReader}, @@ -27,9 +31,9 @@ use crate::{ params::Params, pipeline::{deps_manager::BlockTask, virtual_processor::utxo_validation::UtxoProcessingContext}, processes::{ - coinbase::CoinbaseManager, difficulty::DifficultyManager, parents_builder::ParentsManager, - past_median_time::PastMedianTimeManager, pruning::PruningManager, transaction_validator::TransactionValidator, - traversal_manager::DagTraversalManager, + block_depth::BlockDepthManager, coinbase::CoinbaseManager, difficulty::DifficultyManager, ghostdag::ordering::SortableBlock, + parents_builder::ParentsManager, past_median_time::PastMedianTimeManager, pruning::PruningManager, + transaction_validator::TransactionValidator, traversal_manager::DagTraversalManager, }, }; use consensus_core::{ @@ -40,6 +44,7 @@ use consensus_core::{ merkle::calc_hash_merkle_root, tx::Transaction, utxo::{utxo_diff::UtxoDiff, utxo_view::UtxoViewComposition}, + BlockHashSet, }; use hashes::Hash; use kaspa_core::{info, trace}; @@ -48,10 +53,15 @@ use muhash::MuHash; use crossbeam_channel::Receiver; use itertools::Itertools; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; -use rand::seq::SliceRandom; use rayon::ThreadPool; use rocksdb::WriteBatch; -use std::{ops::Deref, sync::Arc, time::SystemTime}; +use std::{ + cmp::{min, Reverse}, + collections::VecDeque, + ops::Deref, + sync::Arc, + time::SystemTime, +}; pub struct VirtualStateProcessor { // Channels @@ -92,6 +102,7 @@ pub struct VirtualStateProcessor { // Managers and services pub(super) ghostdag_manager: DbGhostdagManager, pub(super) reachability_service: MTReachabilityService, + pub(super) relations_service: MTRelationsService, pub(super) dag_traversal_manager: DagTraversalManager, pub(super) difficulty_manager: DifficultyManager, pub(super) coinbase_manager: CoinbaseManager, @@ -99,6 +110,7 @@ pub struct VirtualStateProcessor { pub(super) past_median_time_manager: PastMedianTimeManager, pub(super) pruning_manager: PruningManager, pub(super) parents_manager: ParentsManager, + pub(super) depth_manager: BlockDepthManager, } impl VirtualStateProcessor { @@ -127,6 +139,7 @@ impl VirtualStateProcessor { // Managers ghostdag_manager: DbGhostdagManager, reachability_service: MTReachabilityService, + relations_service: MTRelationsService, dag_traversal_manager: DagTraversalManager, difficulty_manager: DifficultyManager, coinbase_manager: CoinbaseManager, @@ -134,6 +147,7 @@ impl VirtualStateProcessor { past_median_time_manager: PastMedianTimeManager, pruning_manager: PruningManager, parents_manager: ParentsManager, + depth_manager: BlockDepthManager, ) -> Self { Self { receiver, @@ -162,6 +176,7 @@ impl VirtualStateProcessor { virtual_state_store, ghostdag_manager, reachability_service, + relations_service, dag_traversal_manager, difficulty_manager, coinbase_manager, @@ -169,6 +184,7 @@ impl VirtualStateProcessor { past_median_time_manager, pruning_manager, parents_manager, + depth_manager, } } @@ -198,18 +214,15 @@ impl VirtualStateProcessor { } fn resolve_virtual(self: &Arc) { - let prev_state = self.virtual_state_store.read().get().unwrap(); - let virtual_parents = self.pick_virtual_parents(); - // TODO: check finality violation // TODO: handle disqualified chain loop // TODO: acceptance data format // TODO: refactor this methods into multiple methods - let virtual_ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents); - + let prev_state = self.virtual_state_store.read().get().unwrap(); + let tips = self.body_tips_store.read().get().unwrap().iter().copied().collect_vec(); + let new_selected = self.ghostdag_manager.find_selected_parent(&mut tips.iter().copied()); let prev_selected = prev_state.ghostdag_data.selected_parent; - let new_selected = virtual_ghostdag_data.selected_parent; let mut split_point: Option = None; let mut accumulated_diff = prev_state.utxo_diff.clone().to_reversed(); @@ -269,8 +282,14 @@ impl VirtualStateProcessor { } } - match self.statuses_store.read().get(new_selected).unwrap() { + // NOTE: inlining this within the match captures the statuses store lock and should be avoided. + // TODO: wrap statuses store lock within a service + let new_selected_status = self.statuses_store.read().get(new_selected).unwrap(); + match new_selected_status { BlockStatus::StatusUTXOValid => { + let (virtual_parents, virtual_ghostdag_data) = self.pick_virtual_parents(new_selected, tips); + assert_eq!(virtual_ghostdag_data.selected_parent, new_selected); + // Calc the new virtual UTXO diff let selected_parent_multiset_hash = self.utxo_multisets_store.get(virtual_ghostdag_data.selected_parent).unwrap(); let selected_parent_utxo_view = self.virtual_utxo_store.as_ref().compose(&accumulated_diff); @@ -322,7 +341,7 @@ impl VirtualStateProcessor { } // TODO: Make a separate pruning processor and send to its channel here - self.maybe_update_pruning_point_and_candidate() + self.advance_pruning_point_and_candidate_if_possible() } fn commit_utxo_state(self: &Arc, current: Hash, mergeset_diff: UtxoDiff, multiset: MuHash, acceptance_data: AcceptanceData) { @@ -336,25 +355,129 @@ impl VirtualStateProcessor { drop(write_guard); } - fn pick_virtual_parents(self: &Arc) -> Vec { - // TODO: implement virtual parents selection rules - // 1. Max parents - // 2. Mergeset limit - // 3. Bounded merge depth - - let mut virtual_parents = self.body_tips_store.read().get().unwrap().iter().copied().collect_vec(); - if virtual_parents.len() > self.max_block_parents as usize { - // TEMP - let selected_parent = self.ghostdag_manager.find_selected_parent(&mut virtual_parents.iter().copied()); - let index = virtual_parents.iter().position(|&h| h == selected_parent).unwrap(); - virtual_parents.swap_remove(index); - let mut rng = rand::thread_rng(); - virtual_parents = std::iter::once(selected_parent) - .chain(virtual_parents.choose_multiple(&mut rng, self.max_block_parents as usize - 1).copied()) - .collect(); + /// Picks the virtual parents according to virtual parent selection pruning constrains. + /// Assumes `selected_parent` is a UTXO-valid block, and that `candidates` are an antichain + /// containing `selected_parent` s.t. it is the block with highest blue work amongst them. + fn pick_virtual_parents(&self, selected_parent: Hash, candidates: Vec) -> (Vec, GhostdagData) { + // TODO: tests + let max_block_parents = self.max_block_parents as usize; + + // Limit to max_block_parents*3 candidates, that way we don't go over thousands of tips when the network isn't healthy. + // There's no specific reason for a factor of 3, and its not a consensus rule, just an estimation saying we probably + // don't want to consider and calculate 3 times the amount of candidates for the set of parents. + let max_candidates = max_block_parents * 3; + let mut candidates = candidates + .into_iter() + .filter(|&h| h != selected_parent) // Filter the selected parent since we already know it must be included + .map(|block| Reverse(SortableBlock { hash: block, blue_work: self.ghostdag_store.get_blue_work(block).unwrap() })) + .k_smallest(max_candidates) // Takes the k largest blocks by blue work in descending order + .map(|s| s.0.hash) + .collect::>(); + // Prioritize half the blocks with highest blue work and half with lowest, so the network will merge splits faster. + if candidates.len() >= max_block_parents { + let max_additional_parents = max_block_parents - 1; // We already have the selected parent + let mut j = candidates.len() - 1; + for i in max_additional_parents / 2..max_additional_parents { + candidates.swap(i, j); + j -= 1; + } + } + + let mut virtual_parents = Vec::with_capacity(min(max_block_parents, candidates.len() + 1)); + virtual_parents.push(selected_parent); + let mut mergeset_size = 1; // Count the selected parent + + // Try adding parents as long as mergeset size and number of parents limits are not reached + while let Some(candidate) = candidates.pop_front() { + if mergeset_size >= self.mergeset_size_limit || virtual_parents.len() >= max_block_parents { + break; + } + match self.mergeset_increase(&virtual_parents, candidate, self.mergeset_size_limit - mergeset_size) { + MergesetIncreaseResult::Accepted { increase_size } => { + mergeset_size += increase_size; + virtual_parents.push(candidate); + } + MergesetIncreaseResult::Rejected { new_candidate } => { + // If we already have a candidate in the past of new candidate then skip. + if self.reachability_service.is_any_dag_ancestor(&mut candidates.iter().copied(), new_candidate) { + continue; // TODO: not sure this test is needed if candidates invariant as antichain is kept + } + // Remove all candidates which are in the future of the new candidate + candidates.retain(|&h| !self.reachability_service.is_dag_ancestor_of(new_candidate, h)); + candidates.push_back(new_candidate); + } + } + } + assert!(mergeset_size <= self.mergeset_size_limit); + assert!(virtual_parents.len() <= max_block_parents); + self.remove_bounded_merge_breaking_parents(virtual_parents) + } + + fn mergeset_increase(&self, selected_parents: &[Hash], candidate: Hash, budget: u64) -> MergesetIncreaseResult { + /* + Algo: + Traverse past(candidate) \setminus past(selected_parents) and make + sure the increase in mergeset size is within the available budget + */ + + let candidate_parents = self.relations_service.get_parents(candidate).unwrap(); + let mut queue: VecDeque<_> = candidate_parents.iter().copied().collect(); + let mut visited: BlockHashSet = queue.iter().copied().collect(); + let mut mergeset_increase = 1u64; // Starts with 1 to count for the candidate itself + + while let Some(current) = queue.pop_front() { + if self.reachability_service.is_dag_ancestor_of_any(current, &mut selected_parents.iter().copied()) { + continue; + } + mergeset_increase += 1; + if mergeset_increase > budget { + return MergesetIncreaseResult::Rejected { new_candidate: current }; + } + + let current_parents = self.relations_service.get_parents(current).unwrap(); + for &parent in current_parents.iter() { + if visited.insert(parent) { + queue.push_back(parent); + } + } + } + MergesetIncreaseResult::Accepted { increase_size: mergeset_increase } + } + + fn remove_bounded_merge_breaking_parents(&self, mut virtual_parents: Vec) -> (Vec, GhostdagData) { + let mut ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents); + let pruning_point = + self.pruning_manager.expected_header_pruning_point(ghostdag_data.to_compact(), self.pruning_store.read().get().unwrap()); + let merge_depth_root = self.depth_manager.calc_merge_depth_root(&ghostdag_data, pruning_point); + let mut kosherizing_blues: Option> = None; + let mut bad_reds = Vec::new(); + + // + // Note that the code below optimizes for the usual case where there are no merge bound violating blocks. + // + + // Find red blocks violating the merge bound and which are not kosherized by any blue + for red in ghostdag_data.mergeset_reds.iter().copied() { + if self.reachability_service.is_dag_ancestor_of(merge_depth_root, red) { + continue; + } + // Lazy load the kosherizing blocks since this case is extremely rare + if kosherizing_blues.is_none() { + kosherizing_blues = Some(self.depth_manager.kosherizing_blues(&ghostdag_data, merge_depth_root).collect()); + } + if !self.reachability_service.is_dag_ancestor_of_any(red, &mut kosherizing_blues.as_ref().unwrap().iter().copied()) { + bad_reds.push(red); + } } - virtual_parents + if !bad_reds.is_empty() { + // Remove all parents which lead to merging a bad red + virtual_parents.retain(|&h| !self.reachability_service.is_any_dag_ancestor(&mut bad_reds.iter().copied(), h)); + // Recompute ghostdag data since parents changed + ghostdag_data = self.ghostdag_manager.ghostdag(&virtual_parents); + } + + (virtual_parents, ghostdag_data) } pub fn build_block_template(self: &Arc, miner_data: MinerData, mut txs: Vec) -> BlockTemplate { @@ -377,7 +500,7 @@ impl VirtualStateProcessor { txs.insert(0, coinbase.tx); let version = BLOCK_VERSION; let parents_by_level = self.parents_manager.calc_block_parents(pruning_point, &virtual_state.parents); - let hash_merkle_root = calc_hash_merkle_root(&mut txs.iter()); + let hash_merkle_root = calc_hash_merkle_root(txs.iter()); let accepted_id_merkle_root = merkle::calc_merkle_root(virtual_state.accepted_tx_ids.iter().copied()); let utxo_commitment = virtual_state.multiset.clone().finalize(); // Past median time is the exclusive lower bound for valid block time, so we increase by 1 to get the valid min @@ -401,7 +524,7 @@ impl VirtualStateProcessor { BlockTemplate::new(MutableBlock::new(header, txs), miner_data, coinbase.has_red_reward, selected_parent_timestamp) } - fn maybe_update_pruning_point_and_candidate(self: &Arc) { + fn advance_pruning_point_and_candidate_if_possible(self: &Arc) { let virtual_sp = self.virtual_state_store.read().get().unwrap().ghostdag_data.selected_parent; if virtual_sp == self.genesis_hash { return; @@ -465,3 +588,8 @@ impl VirtualStateProcessor { } } } + +enum MergesetIncreaseResult { + Accepted { increase_size: u64 }, + Rejected { new_candidate: Hash }, +} diff --git a/consensus/src/processes/block_depth.rs b/consensus/src/processes/block_depth.rs index eb0c28ec09..27bba1ce28 100644 --- a/consensus/src/processes/block_depth.rs +++ b/consensus/src/processes/block_depth.rs @@ -80,7 +80,9 @@ impl Bl current } - pub fn non_bounded_merge_depth_violating_blues<'a>( + /// Returns the set of blues which are eligible for "kosherizing" merge bound violating blocks. + /// By prunality rules, these blocks must have `merge_depth_root` on their selected chain. + pub fn kosherizing_blues<'a>( &'a self, ghostdag_data: &'a GhostdagData, merge_depth_root: Hash, diff --git a/consensus/src/processes/ghostdag/protocol.rs b/consensus/src/processes/ghostdag/protocol.rs index c0f6b215dc..d64a65f3fe 100644 --- a/consensus/src/processes/ghostdag/protocol.rs +++ b/consensus/src/processes/ghostdag/protocol.rs @@ -26,7 +26,7 @@ pub struct GhostdagManager, - pub(super) relations_store: Arc, + pub(super) relations_store: S, pub(super) headers_store: Arc, pub(super) reachability_service: U, } @@ -36,22 +36,22 @@ impl, - relations_store: Arc, + relations_store: S, headers_store: Arc, reachability_service: U, ) -> Self { Self { genesis_hash, k, ghostdag_store, relations_store, reachability_service, headers_store } } - pub fn genesis_ghostdag_data(&self) -> Arc { - Arc::new(GhostdagData::new( + pub fn genesis_ghostdag_data(&self) -> GhostdagData { + GhostdagData::new( 0, Default::default(), // TODO: take blue score and work from actual genesis blockhash::ORIGIN, BlockHashes::new(Vec::new()), BlockHashes::new(Vec::new()), HashKTypeMap::new(BlockHashMap::new()), - )) + ) } pub fn find_selected_parent(&self, parents: &mut impl Iterator) -> Hash { @@ -177,7 +177,6 @@ impl = BlockHashMap::with_capacity(self.k as usize); - // Iterate over all blocks in the blue past of the new block that are not in the past // of blue_candidate, and check for each one of them if blue_candidate potentially // enlarges their blue anticone to be over K, or that they enlarge the blue anticone // of blue_candidate to be over K. let mut chain_block = ChainBlock { hash: None, data: new_block_data.into() }; - let mut candidate_blue_anticone_size: KType = 0; loop { diff --git a/consensus/src/processes/pruning.rs b/consensus/src/processes/pruning.rs index b3e59f9938..9d594c1701 100644 --- a/consensus/src/processes/pruning.rs +++ b/consensus/src/processes/pruning.rs @@ -1,9 +1,9 @@ use std::sync::Arc; +use super::reachability::ReachabilityResultExtensions; use crate::model::{ services::reachability::{MTReachabilityService, ReachabilityService}, stores::{ - errors::StoreError, ghostdag::{CompactGhostdagData, GhostdagStoreReader}, headers::HeaderStoreReader, past_pruning_points::PastPruningPointsStoreReader, @@ -13,8 +13,6 @@ use crate::model::{ }; use hashes::Hash; -use super::reachability::ReachabilityError; - #[derive(Clone)] pub struct PruningManager { pruning_depth: u64, @@ -121,23 +119,12 @@ impl { - if is_in_future_of_current_pruning_point { - Some(sp_header_pp) - } else { - None - } - } - Err(ReachabilityError::StoreError(e)) => { - if let StoreError::KeyNotFound(_) = e { - None - } else { - panic!("Unexpected store error: {:?}", e) - } - } - Err(err) => panic!("Unexpected reachability error: {:?}", err), - }; + // If the selected parent pruning point is in the future of current global pruning point, then provide it as a suggestion + let suggested_low_hash = self + .reachability_service + .is_dag_ancestor_of_result(current_pruning_point, sp_header_pp) + .unwrap_option() + .and_then(|b| if b { Some(sp_header_pp) } else { None }); let (new_pruning_points, _) = self.next_pruning_points_and_candidate_by_ghostdag_data( ghostdag_data, suggested_low_hash, diff --git a/consensus/src/processes/reachability/mod.rs b/consensus/src/processes/reachability/mod.rs index 2192be31bf..229138060e 100644 --- a/consensus/src/processes/reachability/mod.rs +++ b/consensus/src/processes/reachability/mod.rs @@ -32,6 +32,7 @@ impl ReachabilityError { pub type Result = std::result::Result; pub trait ReachabilityResultExtensions { + /// Unwraps the error into `None` if the internal error is `StoreError::KeyNotFound` or panics otherwise fn unwrap_option(self) -> Option; } diff --git a/simpa/src/main.rs b/simpa/src/main.rs index 21d905bf1b..14e674e2a9 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -1,6 +1,9 @@ use clap::Parser; use consensus::{ - consensus::{test_consensus::create_temp_db, Consensus}, + consensus::{ + test_consensus::{create_temp_db, load_existing_db}, + Consensus, + }, constants::perf::{PerfParams, PERF_PARAMS}, errors::{BlockProcessResult, RuleError}, model::stores::{ @@ -67,10 +70,18 @@ struct Args { /// -- You may also specify =,=,... to set the log level for individual subsystems #[arg(long = "loglevel", default_value = "info")] log_level: String, + + /// Output directory to save the simulation DB + #[arg(short, long)] + output_dir: Option, + + /// Input directory of a previous simulation DB (NOTE: simulation args must be compatible with the original run) + #[arg(short, long)] + input_dir: Option, } /// Calculates the k parameter of the GHOSTDAG protocol such that anticones lager than k will be created -/// with probability less than 'delta' (follows eq. 1 from section 4.2 of the PHANTOM paper) +/// with probability less than `delta` (follows eq. 1 from section 4.2 of the PHANTOM paper) /// `x` is expected to be 2Dλ where D is the maximal network delay and λ is the block mining rate. /// `delta` is an upper bound for the probability of anticones larger than k. /// Returns the minimal k such that the above conditions hold. @@ -103,10 +114,21 @@ fn main() { let mut perf_params = PERF_PARAMS; adjust_consensus_params(&args, &mut params); adjust_perf_params(&args, ¶ms, &mut perf_params); - let until = if args.target_blocks.is_none() { args.sim_time * 1000 } else { u64::MAX }; // milliseconds - let mut sim = KaspaNetworkSimulator::new(args.delay, args.bps, args.target_blocks, ¶ms, &perf_params); - let (consensus, handles, _lifetime) = sim.init(args.miners, args.tpb, !args.quiet).run(until); - consensus.shutdown(handles); + + // Load an existing consensus or run the simulation + let (consensus, _lifetime) = if let Some(input_dir) = args.input_dir { + let (lifetime, db) = load_existing_db(input_dir); + let consensus = Arc::new(Consensus::with_perf_params(db, ¶ms, &perf_params)); + (consensus, lifetime) + } else { + let until = if args.target_blocks.is_none() { args.sim_time * 1000 } else { u64::MAX }; // milliseconds + let mut sim = KaspaNetworkSimulator::new(args.delay, args.bps, args.target_blocks, ¶ms, &perf_params, args.output_dir); + let (consensus, handles, lifetime) = sim.init(args.miners, args.tpb, !args.quiet).run(until); + consensus.shutdown(handles); + (consensus, lifetime) + }; + + // Benchmark the DAG validation time let (_lifetime2, db2) = create_temp_db(); let consensus2 = Arc::new(Consensus::with_perf_params(db2, ¶ms, &perf_params)); let handles2 = consensus2.init(); diff --git a/simpa/src/simulator/network.rs b/simpa/src/simulator/network.rs index 59f2db8785..2b2ca06a47 100644 --- a/simpa/src/simulator/network.rs +++ b/simpa/src/simulator/network.rs @@ -4,7 +4,7 @@ use std::thread::JoinHandle; use super::infra::Simulation; use super::miner::Miner; -use consensus::consensus::test_consensus::{create_temp_db, TempDbLifetime}; +use consensus::consensus::test_consensus::{create_permanent_db, create_temp_db, TempDbLifetime}; use consensus::consensus::Consensus; use consensus::constants::perf::PerfParams; use consensus::params::Params; @@ -22,11 +22,19 @@ pub struct KaspaNetworkSimulator { params: Params, // Consensus params perf_params: PerfParams, // Performance params bps: f64, // Blocks per second - target_blocks: Option, // Target simulation blocs + target_blocks: Option, // Target simulation blocks + output_dir: Option, // Possible permanent output directory } impl KaspaNetworkSimulator { - pub fn new(delay: f64, bps: f64, target_blocks: Option, params: &Params, perf_params: &PerfParams) -> Self { + pub fn new( + delay: f64, + bps: f64, + target_blocks: Option, + params: &Params, + perf_params: &PerfParams, + output_dir: Option, + ) -> Self { Self { simulation: Simulation::new((delay * 1000.0) as u64), consensuses: Vec::new(), @@ -34,6 +42,7 @@ impl KaspaNetworkSimulator { params: params.clone(), perf_params: perf_params.clone(), target_blocks, + output_dir, } } @@ -41,7 +50,11 @@ impl KaspaNetworkSimulator { let secp = secp256k1::Secp256k1::new(); let mut rng = rand::thread_rng(); for i in 0..num_miners { - let (lifetime, db) = create_temp_db(); + let (lifetime, db) = if i == 0 && self.output_dir.is_some() { + create_permanent_db(self.output_dir.clone().unwrap()) + } else { + create_temp_db() + }; let consensus = Arc::new(Consensus::with_perf_params(db, &self.params, &self.perf_params)); let handles = consensus.init(); let (sk, pk) = secp.generate_keypair(&mut rng);