Skip to content

Commit

Permalink
Semaphore tracing feature (for tracing prune readers vs writers time) (
Browse files Browse the repository at this point in the history
…kaspanet#526)

* semaphore trace + feature

* comments

* unrelated: avoid mass fee mult due to possible edge cases

* style: refactor code, move tracing atomics to TraceInner structure (#5)

* style: refactor code, move tracing atomics to TraceInner structure

* style: fmt

* final refactor

---------

Co-authored-by: Maxim <[email protected]>
  • Loading branch information
michaelsutton and biryukovmaxim authored Aug 22, 2024
1 parent 866f62f commit 261a750
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 22 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion kaspad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,25 @@ kaspa-utxoindex.workspace = true
kaspa-wrpc-server.workspace = true

async-channel.workspace = true
cfg-if.workspace = true
clap.workspace = true
dhat = { workspace = true, optional = true }
serde.workspace = true
dirs.workspace = true
futures-util.workspace = true
log.workspace = true
num_cpus.workspace = true
rand.workspace = true
rayon.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }
workflow-log.workspace = true

toml = "0.8.10"
serde_with = "3.7.0"

[features]
heap = ["dhat", "kaspa-alloc/heap"]
devnet-prealloc = ["kaspa-consensus/devnet-prealloc"]
semaphore-trace = ["kaspa-utils/semaphore-trace"]
8 changes: 7 additions & 1 deletion kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ impl Runtime {
let log_dir = get_log_dir(args);

// Initialize the logger
kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level);
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
kaspa_core::log::init_logger(log_dir.as_deref(), &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path()));
} else {
kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level);
}
};

// Configure the panic behavior
// As we log the panic, we want to set it up after the logger
Expand Down
25 changes: 12 additions & 13 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use kaspa_consensusmanager::{spawn_blocking, ConsensusProxy};
use kaspa_core::{debug, error, info, time::Stopwatch, warn};
use kaspa_mining_errors::{manager::MiningManagerError, mempool::RuleError};
use parking_lot::RwLock;
use std::{ops::Mul, sync::Arc};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;

pub struct MiningManager {
Expand Down Expand Up @@ -1044,29 +1044,28 @@ fn feerate_stats(transactions: Vec<Transaction>, calculated_fees: Vec<u64>) -> O
return None;
}
if transactions.len() != calculated_fees.len() + 1 {
error!("Transactions length must be one more than `calculated_fees` length");
error!(
"[feerate_stats] block template transactions length ({}) is expected to be one more than `calculated_fees` length ({})",
transactions.len(),
calculated_fees.len()
);
return None;
}
debug_assert!(transactions[0].is_coinbase());
let mut fees_and_masses = calculated_fees
let mut feerates = calculated_fees
.into_iter()
.zip(transactions
.iter()
// skip coinbase tx
.skip(1)
.map(Transaction::mass))
.map(|(fee, mass)| fee as f64 / mass as f64)
.collect_vec();
feerates.sort_unstable_by(f64::total_cmp);

// Sort by fee rate without performing division for each comparison.
// Using multiplication instead of division is faster and avoids the need
// to convert all values to floats. Division is only performed later when
// calculating the min, max, and median fee rates.
fees_and_masses.sort_unstable_by(|(lhs_fee, lhs_mass), (rhs_fee, rhs_mass)| lhs_fee.mul(rhs_mass).cmp(&rhs_fee.mul(lhs_mass)));

let div_as_f64 = |(fee, mass)| fee as f64 / mass as f64;
let max = div_as_f64(fees_and_masses[fees_and_masses.len() - 1]);
let min = div_as_f64(fees_and_masses[0]);
let median = div_as_f64(fees_and_masses[fees_and_masses.len() / 2]);
let max = feerates[feerates.len() - 1];
let min = feerates[0];
let median = feerates[feerates.len() / 2];

Some(Stats { max, median, min })
}
Expand Down
2 changes: 2 additions & 0 deletions simpa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ kaspa-perf-monitor.workspace = true
kaspa-utils.workspace = true

async-channel.workspace = true
cfg-if.workspace = true
clap.workspace = true
dhat = { workspace = true, optional = true }
futures-util.workspace = true
Expand All @@ -38,3 +39,4 @@ tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }

[features]
heap = ["dhat", "kaspa-alloc/heap"]
semaphore-trace = ["kaspa-utils/semaphore-trace"]
15 changes: 13 additions & 2 deletions simpa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ use kaspa_consensus_core::{
BlockHashSet, BlockLevel, HashMapCustomHasher,
};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_core::{info, task::service::AsyncService, task::tick::TickService, time::unix_now, trace, warn};
use kaspa_core::{
info,
task::{service::AsyncService, tick::TickService},
time::unix_now,
trace, warn,
};
use kaspa_database::prelude::ConnBuilder;
use kaspa_database::{create_temp_db, load_existing_db};
use kaspa_hashes::Hash;
Expand Down Expand Up @@ -133,7 +138,13 @@ fn main() {
let args = Args::parse();

// Initialize the logger
kaspa_core::log::init_logger(None, &args.log_level);
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
kaspa_core::log::init_logger(None, &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path()));
} else {
kaspa_core::log::init_logger(None, &args.log_level);
}
};

// Configure the panic behavior
// As we log the panic, we want to set it up after the logger
Expand Down
8 changes: 6 additions & 2 deletions utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ repository.workspace = true

[dependencies]
arc-swap.workspace = true
parking_lot.workspace = true
async-channel.workspace = true
borsh.workspace = true
cfg-if.workspace = true
event-listener.workspace = true
faster-hex.workspace = true
ipnet.workspace = true
itertools.workspace = true
log.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
serde.workspace = true
smallvec.workspace = true
thiserror.workspace = true
triggered.workspace = true
uuid.workspace = true
log.workspace = true
wasm-bindgen.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand All @@ -42,3 +43,6 @@ rand.workspace = true
[[bench]]
name = "bench"
harness = false

[features]
semaphore-trace = []
5 changes: 5 additions & 0 deletions utils/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
pub mod rwlock;
pub(crate) mod semaphore;

#[cfg(feature = "semaphore-trace")]
pub fn semaphore_module_path() -> &'static str {
semaphore::get_module_path()
}
92 changes: 89 additions & 3 deletions utils/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,64 @@ use std::{
time::Duration,
};

#[cfg(feature = "semaphore-trace")]
mod trace {
use super::*;
use log::debug;
use once_cell::sync::Lazy;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;

static SYS_START: Lazy<SystemTime> = Lazy::new(SystemTime::now);

#[inline]
pub(super) fn sys_now() -> u64 {
SystemTime::now().duration_since(*SYS_START).unwrap_or_default().as_micros() as u64
}

#[derive(Debug, Default)]
pub struct TraceInner {
readers_start: AtomicU64,
readers_time: AtomicU64,
log_time: AtomicU64,
log_value: AtomicU64,
}

impl TraceInner {
pub(super) fn mark_readers_start(&self) {
self.readers_start.store(sys_now(), Ordering::Relaxed);
}

pub(super) fn mark_readers_end(&self) {
let start = self.readers_start.load(Ordering::Relaxed);
let now = sys_now();
if start < now {
let readers_time = self.readers_time.fetch_add(now - start, Ordering::Relaxed) + now - start;
let log_time = self.log_time.load(Ordering::Relaxed);
if log_time + (Duration::from_secs(10).as_micros() as u64) < now {
let log_value = self.log_value.load(Ordering::Relaxed);
debug!(
"Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.2}",
Duration::from_micros(now - log_time),
Duration::from_micros(readers_time - log_value),
(readers_time - log_value) as f64 / (now - log_time) as f64
);
self.log_value.store(readers_time, Ordering::Relaxed);
self.log_time.store(now, Ordering::Relaxed);
}
}
}
}
}

#[cfg(feature = "semaphore-trace")]
use trace::*;

#[cfg(feature = "semaphore-trace")]
pub(crate) fn get_module_path() -> &'static str {
module_path!()
}

/// A low-level non-fair semaphore. The semaphore is non-fair in the sense that clients acquiring
/// a lower number of permits might get their allocation before earlier clients which requested more
/// permits -- if the semaphore can provide the lower allocation but not the larger. This non-fairness
Expand All @@ -15,13 +73,28 @@ use std::{
pub(crate) struct Semaphore {
counter: AtomicUsize,
signal: Event,
#[cfg(feature = "semaphore-trace")]
trace_inner: TraceInner,
}

impl Semaphore {
pub const MAX_PERMITS: usize = usize::MAX;

pub const fn new(available_permits: usize) -> Semaphore {
Semaphore { counter: AtomicUsize::new(available_permits), signal: Event::new() }
pub fn new(available_permits: usize) -> Semaphore {
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
Semaphore {
counter: AtomicUsize::new(available_permits),
signal: Event::new(),
trace_inner: Default::default(),
}
} else {
Semaphore {
counter: AtomicUsize::new(available_permits),
signal: Event::new(),
}
}
}
}

/// Tries to acquire `permits` slots from the semaphore. Upon success, returns the acquired slot
Expand All @@ -33,7 +106,14 @@ impl Semaphore {
}

match self.counter.compare_exchange_weak(count, count - permits, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => return Some(count),
Ok(_) => {
#[cfg(feature = "semaphore-trace")]
if permits == 1 && count == Self::MAX_PERMITS {
// permits == 1 indicates a reader, count == Self::MAX_PERMITS indicates it is the first reader
self.trace_inner.mark_readers_start();
}
return Some(count);
}
Err(c) => count = c,
}
}
Expand Down Expand Up @@ -75,6 +155,12 @@ impl Semaphore {
/// Returns the released slot
pub fn release(&self, permits: usize) -> usize {
let slot = self.counter.fetch_add(permits, Ordering::AcqRel) + permits;

#[cfg(feature = "semaphore-trace")]
if permits == 1 && slot == Self::MAX_PERMITS {
// permits == 1 indicates a reader, slot == Self::MAX_PERMITS indicates it is the last reader
self.trace_inner.mark_readers_end();
}
self.signal.notify(permits);
slot
}
Expand Down

0 comments on commit 261a750

Please sign in to comment.