Skip to content

Commit

Permalink
bench and compare.
Browse files Browse the repository at this point in the history
  • Loading branch information
D-Stacks committed Sep 9, 2024
1 parent 6930ef9 commit 25eb1e2
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 125 deletions.
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ arc-swap.workspace = true
async-channel.workspace = true
bincode.workspace = true
crossbeam-channel.workspace = true
criterion.workspace = true
faster-hex.workspace = true
futures-util.workspace = true
indexmap.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub mod perf {

const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000;
const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2_000;
const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000;

#[derive(Clone, Debug)]
Expand Down
1 change: 1 addition & 0 deletions consensus/src/model/stores/ghostdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl MemSizeEstimator for GhostdagData {
impl MemSizeEstimator for CompactGhostdagData {}

impl From<&GhostdagData> for CompactGhostdagData {
#[inline(always)]
fn from(value: &GhostdagData) -> Self {
Self { blue_score: value.blue_score, blue_work: value.blue_work, selected_parent: value.selected_parent }
}
Expand Down
13 changes: 12 additions & 1 deletion consensus/src/pipeline/body_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
deps_manager::{BlockProcessingMessage, BlockTaskDependencyManager, TaskId, VirtualStateProcessingMessage},
ProcessingCounters,
},
processes::{coinbase::CoinbaseManager, transaction_validator::TransactionValidator},
processes::{coinbase::CoinbaseManager, transaction_validator::TransactionValidator, window::WindowManager},
};
use crossbeam_channel::{Receiver, Sender};
use kaspa_consensus_core::{
Expand Down Expand Up @@ -82,6 +82,8 @@ pub struct BlockBodyProcessor {

/// Storage mass hardfork DAA score
pub(crate) storage_mass_activation_daa_score: u64,

time: Arc<RwLock<std::time::Instant>>,
}

impl BlockBodyProcessor {
Expand Down Expand Up @@ -132,6 +134,7 @@ impl BlockBodyProcessor {
notification_root,
counters,
storage_mass_activation_daa_score,
time: Arc::new(RwLock::new(std::time::Instant::now())),
}
}

Expand Down Expand Up @@ -233,6 +236,14 @@ impl BlockBodyProcessor {
}

fn commit_body(self: &Arc<BlockBodyProcessor>, hash: Hash, parents: &[Hash], transactions: Arc<Vec<Transaction>>) {
let mut time_guard = self.time.write();

if time_guard.elapsed().as_secs() > 10 {
self.window_manager.maybe_log();
*time_guard = std::time::Instant::now();
}
drop(time_guard);

let mut batch = WriteBatch::default();

// This is an append only store so it requires no lock.
Expand Down
22 changes: 20 additions & 2 deletions consensus/src/pipeline/header_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use crate::{
},
params::Params,
pipeline::deps_manager::{BlockProcessingMessage, BlockTask, BlockTaskDependencyManager, TaskId},
processes::{ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions},
processes::{
ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations::RelationsStoreExtensions,
window::WindowManager,
},
};
use crossbeam_channel::{Receiver, Sender};
use itertools::Itertools;
Expand All @@ -43,7 +46,10 @@ use kaspa_utils::vec::VecExtensions;
use parking_lot::RwLock;
use rayon::ThreadPool;
use rocksdb::WriteBatch;
use std::sync::{atomic::Ordering, Arc};
use std::{
sync::{atomic::Ordering, Arc},
time::Instant,
};

use super::super::ProcessingCounters;

Expand Down Expand Up @@ -154,6 +160,8 @@ pub struct HeaderProcessor {

// Counters
counters: Arc<ProcessingCounters>,

time: Arc<RwLock<Instant>>,
}

impl HeaderProcessor {
Expand Down Expand Up @@ -206,6 +214,8 @@ impl HeaderProcessor {
mergeset_size_limit: params.mergeset_size_limit,
skip_proof_of_work: params.skip_proof_of_work,
max_block_level: params.max_block_level,

time: Arc::new(RwLock::new(Instant::now())),
}
}

Expand Down Expand Up @@ -362,6 +372,14 @@ impl HeaderProcessor {
}

fn commit_header(&self, ctx: HeaderProcessingContext, header: &Header) {
let mut time_guard = self.time.write();

if time_guard.elapsed().as_secs() > 10 {
self.window_manager.maybe_log();
*time_guard = Instant::now();
}
drop(time_guard);

let ghostdag_data = ctx.ghostdag_data.as_ref().unwrap();
let pp = ctx.pruning_point();

Expand Down
51 changes: 29 additions & 22 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use crate::{
stores::{
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
block_window_cache::{BlockWindowCacheStore, BlockWindowHeap},
block_window_cache::BlockWindowCacheStore,
daa::DbDaaStore,
depth::{DbDepthStore, DepthStoreReader},
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStoreReader},
headers::{DbHeadersStore, HeaderStoreReader},
past_pruning_points::DbPastPruningPointsStore,
pruning::{DbPruningStore, PruningStoreReader},
Expand Down Expand Up @@ -229,6 +229,7 @@ impl VirtualStateProcessor {
notification_root,
counters,
storage_mass_activation_daa_score: params.storage_mass_activation_daa_score,
// bool: todo!(),
}
}

Expand Down Expand Up @@ -311,11 +312,18 @@ impl VirtualStateProcessor {
.expect("all possible rule errors are unexpected here");

// Update the pruning processor about the virtual state change
let sink_ghostdag_data = self.ghostdag_primary_store.get_compact_data(new_sink).unwrap();
let sink_ghostdag_data = self.ghostdag_primary_store.get_data(new_sink).unwrap();

// update window caches - for ibd performance. see method comment for more details.
if prev_sink != new_sink {
self.maybe_commit_windows(new_sink, &sink_ghostdag_data);
};

let compact_sink_ghostdag_data = CompactGhostdagData::from(sink_ghostdag_data.as_ref());
// Empty the channel before sending the new message. If pruning processor is busy, this step makes sure
// the internal channel does not grow with no need (since we only care about the most recent message)
let _consume = self.pruning_receiver.try_iter().count();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data: compact_sink_ghostdag_data }).unwrap();

// Emit notifications
let accumulated_diff = Arc::new(accumulated_diff);
Expand All @@ -327,7 +335,7 @@ impl VirtualStateProcessor {
.notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score)))
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(compact_sink_ghostdag_data.blue_score)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
Expand Down Expand Up @@ -491,9 +499,7 @@ impl VirtualStateProcessor {
// Calc virtual DAA score, difficulty bits and past median time
let virtual_daa_window = self.window_manager.block_daa_window(&virtual_ghostdag_data)?;
let virtual_bits = self.window_manager.calculate_difficulty_bits(&virtual_ghostdag_data, &virtual_daa_window);
let (virtual_past_median_time, past_median_time_window) = self.window_manager.calc_past_median_time(&virtual_ghostdag_data)?;

self.maybe_commit_windows(ctx.selected_parent(), virtual_daa_window.window, past_median_time_window);
let (virtual_past_median_time, _) = self.window_manager.calc_past_median_time(&virtual_ghostdag_data)?;

// Calc virtual UTXO state relative to selected parent
self.calculate_utxo_state(&mut ctx, &selected_parent_utxo_view, virtual_daa_window.daa_score);
Expand Down Expand Up @@ -544,21 +550,22 @@ impl VirtualStateProcessor {
drop(selected_chain_write);
}

fn maybe_commit_windows(
&self,
selected_parent: Hash,
daa_window: Arc<BlockWindowHeap>,
past_median_time_window: Arc<BlockWindowHeap>,
) {
// TODO: this only important for ibd performance, as we incur cache misses otherwise.
// We could optimize this by only committing the windows if virtual processor where to have explict knowledge of being in ibd.
if !self.block_window_cache_for_difficulty.contains_key(&selected_parent) {
self.block_window_cache_for_difficulty.insert(selected_parent, daa_window);
}
fn maybe_commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) {
// this is only important for ibd performance, as we incur expensive cache misses otherwise.
// this occurs because we cannot rely on header processing to pre-cache in this scenario.

if !self.block_window_cache_for_past_median_time.contains_key(&selected_parent) {
self.block_window_cache_for_past_median_time.insert(selected_parent, past_median_time_window);
}
// TODO: We could optimize this by only committing the windows if virtual processor where to have explicit knowledge of being in ibd.
// above may be possible with access to the `is_ibd_running` AtomicBool, or `is_nearly_synced()` method.

if !self.block_window_cache_for_difficulty.contains_key(&new_sink) {
self.block_window_cache_for_difficulty
.insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data).unwrap().window);
};

if !self.block_window_cache_for_past_median_time.contains_key(&new_sink) {
self.block_window_cache_for_past_median_time
.insert(new_sink, self.window_manager.calc_past_median_time(sink_ghostdag_data).unwrap().1);
};
}

/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/processes/ghostdag/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::model::{

use super::protocol::GhostdagManager;

#[derive(Eq, Clone, Serialize, Deserialize)]
#[derive(Eq, Clone, Serialize, Deserialize, Debug)]
pub struct SortableBlock {
pub hash: Hash,
pub blue_work: BlueWorkType,
Expand Down
Loading

0 comments on commit 25eb1e2

Please sign in to comment.