Commit

show code
D-Stacks committed Sep 3, 2024
1 parent d3ae942 commit 3172767
Showing 7 changed files with 278 additions and 12 deletions.
12 changes: 10 additions & 2 deletions consensus/src/pipeline/body_processor/processor.rs
@@ -17,7 +17,7 @@ use crate::{
deps_manager::{BlockProcessingMessage, BlockTaskDependencyManager, TaskId, VirtualStateProcessingMessage},
ProcessingCounters,
},
- processes::{coinbase::CoinbaseManager, transaction_validator::TransactionValidator},
+ processes::{coinbase::CoinbaseManager, transaction_validator::TransactionValidator, window::WindowManager},
};
use crossbeam_channel::{Receiver, Sender};
use kaspa_consensus_core::{
@@ -37,7 +37,7 @@ use kaspa_notify::notifier::Notify;
use parking_lot::RwLock;
use rayon::ThreadPool;
use rocksdb::WriteBatch;
- use std::sync::{atomic::Ordering, Arc};
+ use std::{sync::{atomic::Ordering, Arc}, time::{Duration, Instant}};

pub struct BlockBodyProcessor {
// Channels
@@ -82,6 +82,8 @@ pub struct BlockBodyProcessor {

/// Storage mass hardfork DAA score
pub(crate) storage_mass_activation_daa_score: u64,

last_log_instance: Arc<RwLock<Instant>>,
}

impl BlockBodyProcessor {
@@ -132,6 +134,7 @@ impl BlockBodyProcessor {
notification_root,
counters,
storage_mass_activation_daa_score,
last_log_instance: Arc::new(RwLock::new(Instant::now())),
}
}

@@ -233,6 +236,11 @@ impl BlockBodyProcessor {
}

fn commit_body(self: &Arc<BlockBodyProcessor>, hash: Hash, parents: &[Hash], transactions: Arc<Vec<Transaction>>) {
if self.last_log_instance.read().elapsed() > Duration::from_secs(10) {
self.window_manager.maybe_log_perf();
*self.last_log_instance.write() = Instant::now();
}

let mut batch = WriteBatch::default();

// This is an append only store so it requires no lock.
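The 10-second throttle above delegates the actual reporting to `WindowManager::maybe_log_perf`, which lives in one of the changed files not expanded in this view. A minimal sketch of what such a throttled perf logger could look like — the struct name, fields, and log format below are assumptions, not the committed code:

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical counters; the committed WindowManager fields are not shown in this diff.
#[derive(Default)]
pub struct WindowPerfStats {
    pub windows_built: AtomicU64,
    pub cache_hits: AtomicU64,
}

impl WindowPerfStats {
    /// Logs the totals accumulated since the last call, then resets them.
    pub fn maybe_log_perf(&self) {
        let built = self.windows_built.swap(0, Ordering::Relaxed);
        let hits = self.cache_hits.swap(0, Ordering::Relaxed);
        log::info!("window perf: {} windows built, {} cache hits since last report", built, hits);
    }
}

Note that the read-then-write sequence on `last_log_instance` is not atomic, so two concurrent `commit_body` calls can occasionally both pass the 10-second check and log twice; for diagnostic output this is harmless.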
16 changes: 15 additions & 1 deletion consensus/src/pipeline/header_processor/post_pow_validation.rs
@@ -5,6 +5,7 @@ use crate::processes::window::WindowManager;
use kaspa_consensus_core::header::Header;
use kaspa_hashes::Hash;
use std::collections::HashSet;
use std::sync::atomic::Ordering;

impl HeaderProcessor {
pub fn post_pow_validation(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
@@ -18,41 +19,49 @@ impl HeaderProcessor {
}

pub fn check_median_timestamp(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let (past_median_time, window) = self.window_manager.calc_past_median_time(ctx.ghostdag_data())?;
ctx.block_window_for_past_median_time = Some(window);

if header.timestamp <= past_median_time {
return Err(RuleError::TimeTooOld(header.timestamp, past_median_time));
}

self.benching.check_median_timestamp_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

pub fn check_merge_size_limit(&self, ctx: &mut HeaderProcessingContext) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let mergeset_size = ctx.ghostdag_data().mergeset_size() as u64;
if mergeset_size > self.mergeset_size_limit {
return Err(RuleError::MergeSetTooBig(mergeset_size, self.mergeset_size_limit));
}
self.benching.check_merge_size_limit_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_blue_score(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let gd_blue_score = ctx.ghostdag_data().blue_score;
if gd_blue_score != header.blue_score {
return Err(RuleError::UnexpectedHeaderBlueScore(gd_blue_score, header.blue_score));
}
self.benching.check_blue_score_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_blue_work(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let gd_blue_work = ctx.ghostdag_data().blue_work;
if gd_blue_work != header.blue_work {
return Err(RuleError::UnexpectedHeaderBlueWork(gd_blue_work, header.blue_work));
}
self.benching.check_blue_work_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

pub fn check_indirect_parents(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let expected_block_parents = self.parents_manager.calc_block_parents(ctx.pruning_point(), header.direct_parents());
if header.parents_by_level.len() != expected_block_parents.len()
|| !expected_block_parents.iter().enumerate().all(|(block_level, expected_level_parents)| {
@@ -70,18 +79,22 @@ impl HeaderProcessor {
TwoDimVecDisplay(header.parents_by_level.clone()),
));
};
self.benching.check_indirect_parents_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

pub fn check_pruning_point(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let expected = self.pruning_point_manager.expected_header_pruning_point(ctx.ghostdag_data().to_compact(), ctx.pruning_info);
if expected != header.pruning_point {
return Err(RuleError::WrongHeaderPruningPoint(expected, header.pruning_point));
}
self.benching.check_pruning_point_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

pub fn check_bounded_merge_depth(&self, ctx: &mut HeaderProcessingContext) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let ghostdag_data = ctx.ghostdag_data();
let merge_depth_root = self.depth_manager.calc_merge_depth_root(ghostdag_data, ctx.pruning_point());
let finality_point = self.depth_manager.calc_finality_point(ghostdag_data, ctx.pruning_point());
@@ -102,6 +115,7 @@

ctx.merge_depth_root = Some(merge_depth_root);
ctx.finality_point = Some(finality_point);
self.benching.check_bounded_merge_depth_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}
}
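Each check above measures its own wall-clock time and accumulates the elapsed milliseconds into a per-check atomic counter on `self.benching`. The struct holding these counters is declared in one of the changed files not expanded in this view; a plausible shape, assuming plain `AtomicU64` fields (the field names mirror those referenced in the diff, but the struct name and exact contents are assumptions):

use std::sync::atomic::AtomicU64;

// Hypothetical definition; the committed struct lives in a file not expanded here.
#[derive(Default)]
pub struct Benching {
    pub check_median_timestamp_bench: AtomicU64,
    pub check_merge_size_limit_bench: AtomicU64,
    pub check_blue_score_bench: AtomicU64,
    pub check_blue_work_bench: AtomicU64,
    pub check_indirect_parents_bench: AtomicU64,
    pub check_pruning_point_bench: AtomicU64,
    pub check_bounded_merge_depth_bench: AtomicU64,
    // ...plus one AtomicU64 per check instrumented in the pre-GHOSTDAG and pre-PoW files below.
}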
25 changes: 19 additions & 6 deletions consensus/src/pipeline/header_processor/pre_ghostdag_validation.rs
@@ -10,6 +10,7 @@ use kaspa_consensus_core::BlockLevel;
use kaspa_core::time::unix_now;
use kaspa_database::prelude::StoreResultExtensions;
use std::cmp::max;
use std::sync::atomic::Ordering;

impl HeaderProcessor {
/// Validates the header in isolation including pow check against header declared bits.
@@ -18,7 +19,7 @@ impl HeaderProcessor {
self.check_header_version(header)?;
self.check_block_timestamp_in_isolation(header)?;
self.check_parents_limit(header)?;
- Self::check_parents_not_origin(header)?;
+ self.check_parents_not_origin(header)?;
self.check_pow_and_calc_block_level(header)
}

@@ -29,42 +30,49 @@ }
}

fn check_header_version(&self, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
if header.version != constants::BLOCK_VERSION {
return Err(RuleError::WrongBlockVersion(header.version));
}
self.benching.check_header_version_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_block_timestamp_in_isolation(&self, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
// Timestamp deviation tolerance is in seconds so we multiply by 1000 to get milliseconds (without BPS dependency)
let max_block_time = unix_now() + self.timestamp_deviation_tolerance * 1000;
if header.timestamp > max_block_time {
return Err(RuleError::TimeTooFarIntoTheFuture(header.timestamp, max_block_time));
}
self.benching.check_block_timestamp_in_isolation_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_parents_limit(&self, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
if header.direct_parents().is_empty() {
return Err(RuleError::NoParents);
}

if header.direct_parents().len() > self.max_block_parents as usize {
return Err(RuleError::TooManyParents(header.direct_parents().len(), self.max_block_parents as usize));
}

self.benching.check_parents_limit_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

- fn check_parents_not_origin(header: &Header) -> BlockProcessResult<()> {
+ fn check_parents_not_origin(&self, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
if header.direct_parents().iter().any(|&parent| parent.is_origin()) {
return Err(RuleError::OriginParent);
}

self.benching.check_parents_not_origin_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_parents_exist(&self, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let mut missing_parents = Vec::new();
for parent in header.direct_parents() {
match self.statuses_store.read().get(*parent).unwrap_option() {
@@ -78,10 +86,12 @@
if !missing_parents.is_empty() {
return Err(RuleError::MissingParents(missing_parents));
}
self.benching.check_parents_exist_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_parents_incest(&self, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let parents = header.direct_parents();
for parent_a in parents.iter() {
for parent_b in parents.iter() {
@@ -94,16 +104,19 @@
}
}
}

self.benching.check_parents_incest_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_pow_and_calc_block_level(&self, header: &Header) -> BlockProcessResult<BlockLevel> {
let time = std::time::Instant::now();
let state = kaspa_pow::State::new(header);
let (passed, pow) = state.check_pow(header.nonce);
if passed || self.skip_proof_of_work {
let signed_block_level = self.max_block_level as i64 - pow.bits() as i64;
- Ok(max(signed_block_level, 0) as BlockLevel)
+ let res = max(signed_block_level, 0) as BlockLevel;
+ self.benching.check_pow_and_calc_block_level_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
+ Ok(res)
} else {
Err(RuleError::InvalidPoW)
}
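The same measure-and-accumulate pattern repeats in every check in this file: capture an `Instant`, run the validation, convert the elapsed milliseconds to `u64`, and `fetch_add` it into the matching counter. A small helper that factors the pattern out, purely as an illustration — the commit itself inlines it in each function and uses `try_into().unwrap()` rather than a plain cast:

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Runs `f`, adds its elapsed milliseconds to `counter`, and returns `f`'s result.
/// Illustrative only; not part of the commit.
fn timed<T>(counter: &AtomicU64, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let res = f();
    counter.fetch_add(start.elapsed().as_millis() as u64, Ordering::SeqCst);
    res
}

One consequence of the inlined form is that any check which bails out early with `return Err(...)` or `?` never reaches its `fetch_add`, so only successful validations contribute to the counters; a wrapper like the one above would count failing runs as well.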
6 changes: 6 additions & 0 deletions consensus/src/pipeline/header_processor/pre_pow_validation.rs
@@ -1,3 +1,5 @@
use std::sync::atomic::Ordering;

use super::*;
use crate::errors::{BlockProcessResult, RuleError};
use crate::model::services::reachability::ReachabilityService;
@@ -12,6 +14,7 @@ impl HeaderProcessor {
}

fn check_pruning_violation(&self, ctx: &HeaderProcessingContext) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let known_parents = ctx.direct_known_parents();

// We check that the new block is in the future of the pruning point by verifying that at least
@@ -20,10 +23,12 @@
if !self.reachability_service.is_dag_ancestor_of_any(ctx.pruning_point(), &mut known_parents.iter().copied()) {
return Err(RuleError::PruningViolation(ctx.pruning_point()));
}
self.benching.check_pruning_violation_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}

fn check_difficulty_and_daa_score(&self, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> {
let time = std::time::Instant::now();
let ghostdag_data = ctx.ghostdag_data();
let daa_window = self.window_manager.block_daa_window(ghostdag_data)?;

@@ -39,6 +44,7 @@
}

ctx.block_window_for_difficulty = Some(daa_window.window);
self.benching.check_difficulty_and_daa_score_bench.fetch_add(time.elapsed().as_millis().try_into().unwrap(), Ordering::SeqCst);
Ok(())
}
}
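The remaining three of the seven changed files are not expanded in this view; presumably they declare the `benching` counters and hook the accumulated totals into the perf logging triggered from `commit_body`. A rough sketch of how such a report might read the counters — entirely an assumption, since the committed reporting code is not visible here:

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical reporting helper; names and log format are assumptions.
fn report_check(name: &str, counter: &AtomicU64) {
    // Read and reset the milliseconds accumulated for one validation check.
    let total_ms = counter.swap(0, Ordering::SeqCst);
    log::info!("{}: {} ms since last report", name, total_ms);
}

// Example usage with the counters referenced in the diff:
// report_check("check_median_timestamp", &benching.check_median_timestamp_bench);
// report_check("check_pruning_violation", &benching.check_pruning_violation_bench);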
