O(k log n) mempool transaction sampler + Fee estimation API (kaspanet#513)

* initial fee estimation logic + a python notebook detailing a challenge

* initial usage of BTreeSet for ready transactions

* initial frontier + sampling logic

* mempool sampling benchmark (wip)

* Use Arc tx rather than tx id in order to save the indirect map access and reduce frontier sizes + filter the whole top bucket, not only selected ones

* Modify mempool bmk and simnet settings

* Temp: rpc message initial

* Move sample to rand utils

* Fix top bucket sampling to match analysis

* Add outliers to the bmk

* sample comments and doc

* use a B+ tree with argument customization in order to implement highly efficient O(k log n) one-shot mempool sampling

* todo

* keep a computed weight field

* Test feerate weight queries + an implied fix (change <= to <)

* temp remove warns

* 1. use an inner BPlusTree in order to allow access to the iterator as a double-ended iterator
2. reduce collisions by removing the top element from the range if it was hit

* rename

* test btree rev iter

* clamp the query to the bounds (logically)

* use a larger tree for tests, add checks for clamped search bounds

* Add benchmarks for frontier insertions and removals

* add item removal to the queries test

* Important numeric stability improvement: use the visitor API to implement a prefix weight counter used for search narrowing

* test highly irregular sampling

* Implement initial selectors + order the code a bit

* Enhance and use the new selectors

* rename

* minor refactor

* minor optimizations etc

* increase default devnet prealloc amount to 100 tkas

* cleanup

* cleanup

* initial build_feerate_estimator

* todos

* minor

* Remove obsolete constant

* Restructure search tree methods into an encapsulated struct

* Rename module

* documentation and comments

* optimization: cmp with cached weight rather than compute feerate

* minor

* Finalize build fee estimator and add tests

* updated notebook

* fee estimator todos

* expose get_realtime_feerate_estimations from the mining manager

* min feerate from config

* sample_inplace doc

* test_total_mass_tracking

* test prefix weights

* test sequence selector

* fix rpc feerate structs + comment

* utils: expiring cache

* rpc core fee estimate call

* fee estimate verbose

* grpc fee estimate calls

* Benchmark worst-case collision cases + an optimization addressing these cases

* Expose SearchTree

* cli support (with @coderofstuff)

* addressing a few minor review comments

* feerate estimator - handle various edge cases (with @tiram88)

* one more test (with @tiram88)

* build_feerate_estimator - fix edge case of not trying the estimator without all frontier txs (+loop logic is more streamlined now)

* monitor feerate estimations (debug print every 10 secs)

* follow rpc naming conventions

* proto leave blank index range

* insert in correct abc location (keeping rest of the array as is for easier omega merge)

* fix comment to reflect the most updated final algo

* document feerate

* update notebook

* add an additional point to normal feerate buckets (between normal and low)

* enum order

* with 1 sec there are rare cases where mempool size does not change and we exit early

* final stuff
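The headline O(k log n) sampler keeps subtree weight sums inside the search tree, so a uniformly random point in [0, total_weight) resolves to a transaction in O(log n), and sampling k transactions costs O(k log n). Below is a minimal self-contained sketch of that idea using a Fenwick (binary indexed) tree of weights — hypothetical and simplified; the actual implementation uses the sweep-bptree crate with a customized node argument (FeerateWeight) rather than this structure:

```rust
/// Sketch: weighted sampling over a prefix-weight structure.
/// A Fenwick tree stands in for the B+ tree's cached subtree weights;
/// the descend-by-prefix-weight search is the O(log n) core in both.
struct WeightedIndex {
    tree: Vec<f64>, // 1-based Fenwick tree of per-item weights
}

impl WeightedIndex {
    fn new(n: usize) -> Self {
        Self { tree: vec![0.0; n + 1] }
    }

    /// Add `w` (possibly negative, for removal) to item `i` (0-based).
    fn add(&mut self, mut i: usize, w: f64) {
        i += 1;
        while i < self.tree.len() {
            self.tree[i] += w;
            i += i & i.wrapping_neg();
        }
    }

    /// Total weight (prefix sum over all items).
    fn total(&self) -> f64 {
        let mut i = self.tree.len() - 1;
        let mut s = 0.0;
        while i > 0 {
            s += self.tree[i];
            i -= i & i.wrapping_neg();
        }
        s
    }

    /// Resolve a target point in [0, total) to the 0-based index of the
    /// item whose cumulative-weight range contains it. The comparison is
    /// strict in the right place (cf. the "change <= to <" fix above).
    fn search(&self, mut target: f64) -> usize {
        let n = self.tree.len() - 1;
        let mut pos = 0usize;
        let mut step = n.next_power_of_two();
        while step > 0 {
            let next = pos + step;
            if next <= n && self.tree[next] <= target {
                target -= self.tree[next];
                pos = next;
            }
            step >>= 1;
        }
        pos
    }
}

fn main() {
    let weights = [1.0, 2.0, 3.0, 4.0];
    let mut idx = WeightedIndex::new(weights.len());
    for (i, &w) in weights.iter().enumerate() {
        idx.add(i, w);
    }
    // Deterministic targets stand in for rng.gen_range(0.0..idx.total()).
    assert_eq!(idx.search(0.5), 0); // falls in item 0's range [0, 1)
    assert_eq!(idx.search(5.9), 2); // falls in item 2's range [3, 6)
    // Remove the hit item's weight before the next draw.
    idx.add(2, -3.0);
    assert_eq!(idx.search(5.9), 3); // target now lands in item 3's range
}
```

Zeroing a picked element's weight before the next draw mirrors the collision-reduction step noted above ("removing the top element from the range if it was hit"): subsequent draws can no longer land on an already-selected transaction.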
michaelsutton authored Aug 16, 2024
1 parent 6bf1c75 commit 958bc64
Showing 55 changed files with 2,826 additions and 143 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions cli/src/modules/rpc.rs
@@ -229,6 +229,15 @@ impl Rpc {
}
}
}
RpcApiOps::GetFeeEstimate => {
let result = rpc.get_fee_estimate_call(GetFeeEstimateRequest {}).await?;
self.println(&ctx, result);
}
RpcApiOps::GetFeeEstimateExperimental => {
let verbose = if argv.is_empty() { false } else { argv.remove(0).parse().unwrap_or(false) };
let result = rpc.get_fee_estimate_experimental_call(GetFeeEstimateExperimentalRequest { verbose }).await?;
self.println(&ctx, result);
}
_ => {
tprintln!(ctx, "rpc method exists but is not supported by the cli: '{op_str}'\r\n");
return Ok(());
3 changes: 2 additions & 1 deletion consensus/core/src/config/params.rs
@@ -501,7 +501,8 @@ pub const SIMNET_PARAMS: Params = Params {
target_time_per_block: Testnet11Bps::target_time_per_block(),
past_median_time_sample_rate: Testnet11Bps::past_median_time_sample_rate(),
difficulty_sample_rate: Testnet11Bps::difficulty_adjustment_sample_rate(),
max_block_parents: Testnet11Bps::max_block_parents(),
// For simnet, we deviate from TN11 configuration and allow at least 64 parents in order to support mempool benchmarks out of the box
max_block_parents: if Testnet11Bps::max_block_parents() > 64 { Testnet11Bps::max_block_parents() } else { 64 },
mergeset_size_limit: Testnet11Bps::mergeset_size_limit(),
merge_depth: Testnet11Bps::merge_depth_bound(),
finality_depth: Testnet11Bps::finality_depth(),
2 changes: 1 addition & 1 deletion kaspad/src/args.rs
@@ -134,7 +134,7 @@ impl Default for Args {
#[cfg(feature = "devnet-prealloc")]
prealloc_address: None,
#[cfg(feature = "devnet-prealloc")]
prealloc_amount: 1_000_000,
prealloc_amount: 10_000_000_000,

disable_upnp: false,
disable_dns_seeding: false,
5 changes: 3 additions & 2 deletions kaspad/src/daemon.rs
@@ -419,15 +419,16 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm

let (address_manager, port_mapping_extender_svc) = AddressManager::new(config.clone(), meta_db, tick_service.clone());

let mining_monitor = Arc::new(MiningMonitor::new(mining_counters.clone(), tx_script_cache_counters.clone(), tick_service.clone()));
let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new_with_extended_config(
config.target_time_per_block,
false,
config.max_block_mass,
config.ram_scale,
config.block_template_cache_lifetime,
mining_counters,
mining_counters.clone(),
)));
let mining_monitor =
Arc::new(MiningMonitor::new(mining_manager.clone(), mining_counters, tx_script_cache_counters.clone(), tick_service.clone()));

let flow_context = Arc::new(FlowContext::new(
consensus_manager.clone(),
3 changes: 2 additions & 1 deletion mining/Cargo.toml
@@ -27,8 +27,9 @@ parking_lot.workspace = true
rand.workspace = true
serde.workspace = true
smallvec.workspace = true
sweep-bptree = "0.4.1"
thiserror.workspace = true
tokio = { workspace = true, features = [ "rt-multi-thread", "macros", "signal" ] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }

[dev-dependencies]
kaspa-txscript.workspace = true
222 changes: 219 additions & 3 deletions mining/benches/bench.rs
@@ -1,6 +1,16 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use kaspa_mining::model::topological_index::TopologicalIndex;
use std::collections::{hash_set::Iter, HashMap, HashSet};
use itertools::Itertools;
use kaspa_consensus_core::{
subnets::SUBNETWORK_ID_NATIVE,
tx::{Transaction, TransactionInput, TransactionOutpoint},
};
use kaspa_hashes::{HasherBase, TransactionID};
use kaspa_mining::{model::topological_index::TopologicalIndex, FeerateTransactionKey, Frontier, Policy};
use rand::{thread_rng, Rng};
use std::{
collections::{hash_set::Iter, HashMap, HashSet},
sync::Arc,
};

#[derive(Default)]
pub struct Dag<T>
@@ -68,5 +78,211 @@ pub fn bench_compare_topological_index_fns(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, bench_compare_topological_index_fns);
fn generate_unique_tx(i: u64) -> Arc<Transaction> {
let mut hasher = TransactionID::new();
let prev = hasher.update(i.to_le_bytes()).clone().finalize();
let input = TransactionInput::new(TransactionOutpoint::new(prev, 0), vec![], 0, 0);
Arc::new(Transaction::new(0, vec![input], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]))
}

fn build_feerate_key(fee: u64, mass: u64, id: u64) -> FeerateTransactionKey {
FeerateTransactionKey::new(fee, mass, generate_unique_tx(id))
}

pub fn bench_mempool_sampling(c: &mut Criterion) {
let mut rng = thread_rng();
let mut group = c.benchmark_group("mempool sampling");
let cap = 1_000_000;
let mut map = HashMap::with_capacity(cap);
for i in 0..cap as u64 {
let fee: u64 = if i % (cap as u64 / 100000) == 0 { 1000000 } else { rng.gen_range(1..10000) };
let mass: u64 = 1650;
let key = build_feerate_key(fee, mass, i);
map.insert(key.tx.id(), key);
}

let len = cap;
let mut frontier = Frontier::default();
for item in map.values().take(len).cloned() {
frontier.insert(item).then_some(()).unwrap();
}
group.bench_function("mempool one-shot sample", |b| {
b.iter(|| {
black_box({
let selected = frontier.sample_inplace(&mut rng, &Policy::new(500_000), &mut 0);
selected.iter().map(|k| k.mass).sum::<u64>()
})
})
});

// Benchmark frontier insertions and removals (see comparisons below)
let remove = map.values().take(map.len() / 10).cloned().collect_vec();
group.bench_function("frontier remove/add", |b| {
b.iter(|| {
black_box({
for r in remove.iter() {
frontier.remove(r).then_some(()).unwrap();
}
for r in remove.iter().cloned() {
frontier.insert(r).then_some(()).unwrap();
}
0
})
})
});

// Benchmark hashmap insertions and removals for comparison
let remove = map.iter().take(map.len() / 10).map(|(&k, v)| (k, v.clone())).collect_vec();
group.bench_function("map remove/add", |b| {
b.iter(|| {
black_box({
for r in remove.iter() {
map.remove(&r.0).unwrap();
}
for r in remove.iter().cloned() {
map.insert(r.0, r.1.clone());
}
0
})
})
});

// Benchmark std btree set insertions and removals for comparison
// Results show that frontier (sweep bptree) and std btree set are roughly the same.
// The slightly higher cost for sweep bptree should be attributed to subtree weight
// maintenance (see FeerateWeight)
#[allow(clippy::mutable_key_type)]
let mut std_btree = std::collections::BTreeSet::from_iter(map.values().cloned());
let remove = map.iter().take(map.len() / 10).map(|(&k, v)| (k, v.clone())).collect_vec();
group.bench_function("std btree remove/add", |b| {
b.iter(|| {
black_box({
for (_, key) in remove.iter() {
std_btree.remove(key).then_some(()).unwrap();
}
for (_, key) in remove.iter() {
std_btree.insert(key.clone());
}
0
})
})
});
group.finish();
}

pub fn bench_mempool_selectors(c: &mut Criterion) {
let mut rng = thread_rng();
let mut group = c.benchmark_group("mempool selectors");
let cap = 1_000_000;
let mut map = HashMap::with_capacity(cap);
for i in 0..cap as u64 {
let fee: u64 = rng.gen_range(1..1000000);
let mass: u64 = 1650;
let key = build_feerate_key(fee, mass, i);
map.insert(key.tx.id(), key);
}

for len in [100, 300, 350, 500, 1000, 2000, 5000, 10_000, 100_000, 500_000, 1_000_000].into_iter().rev() {
let mut frontier = Frontier::default();
for item in map.values().take(len).cloned() {
frontier.insert(item).then_some(()).unwrap();
}

group.bench_function(format!("rebalancing selector ({})", len), |b| {
b.iter(|| {
black_box({
let mut selector = frontier.build_rebalancing_selector();
selector.select_transactions().iter().map(|k| k.gas).sum::<u64>()
})
})
});

let mut collisions = 0;
let mut n = 0;

group.bench_function(format!("sample inplace selector ({})", len), |b| {
b.iter(|| {
black_box({
let mut selector = frontier.build_selector_sample_inplace(&mut collisions);
n += 1;
selector.select_transactions().iter().map(|k| k.gas).sum::<u64>()
})
})
});

if n > 0 {
println!("---------------------- \n Avg collisions: {}", collisions / n);
}

if frontier.total_mass() <= 500_000 {
group.bench_function(format!("take all selector ({})", len), |b| {
b.iter(|| {
black_box({
let mut selector = frontier.build_selector_take_all();
selector.select_transactions().iter().map(|k| k.gas).sum::<u64>()
})
})
});
}

group.bench_function(format!("dynamic selector ({})", len), |b| {
b.iter(|| {
black_box({
let mut selector = frontier.build_selector(&Policy::new(500_000));
selector.select_transactions().iter().map(|k| k.gas).sum::<u64>()
})
})
});
}

group.finish();
}

pub fn bench_inplace_sampling_worst_case(c: &mut Criterion) {
let mut group = c.benchmark_group("mempool inplace sampling");
let max_fee = u64::MAX;
let fee_steps = (0..10).map(|i| max_fee / 100u64.pow(i)).collect_vec();
for subgroup_size in [300, 200, 100, 80, 50, 30] {
let cap = 1_000_000;
let mut map = HashMap::with_capacity(cap);
for i in 0..cap as u64 {
let fee: u64 = if i < 300 { fee_steps[i as usize / subgroup_size] } else { 1 };
let mass: u64 = 1650;
let key = build_feerate_key(fee, mass, i);
map.insert(key.tx.id(), key);
}

let mut frontier = Frontier::default();
for item in map.values().cloned() {
frontier.insert(item).then_some(()).unwrap();
}

let mut collisions = 0;
let mut n = 0;

group.bench_function(format!("inplace sampling worst case (subgroup size: {})", subgroup_size), |b| {
b.iter(|| {
black_box({
let mut selector = frontier.build_selector_sample_inplace(&mut collisions);
n += 1;
selector.select_transactions().iter().map(|k| k.gas).sum::<u64>()
})
})
});

if n > 0 {
println!("---------------------- \n Avg collisions: {}", collisions / n);
}
}

group.finish();
}

criterion_group!(
benches,
bench_mempool_sampling,
bench_mempool_selectors,
bench_inplace_sampling_worst_case,
bench_compare_topological_index_fns
);
criterion_main!(benches);
23 changes: 7 additions & 16 deletions mining/src/block_template/builder.rs
@@ -1,25 +1,18 @@
use super::{errors::BuilderResult, policy::Policy};
use crate::{block_template::selector::TransactionsSelector, model::candidate_tx::CandidateTransaction};
use super::errors::BuilderResult;
use kaspa_consensus_core::{
api::ConsensusApi,
block::{BlockTemplate, TemplateBuildMode},
block::{BlockTemplate, TemplateBuildMode, TemplateTransactionSelector},
coinbase::MinerData,
merkle::calc_hash_merkle_root,
tx::COINBASE_TRANSACTION_INDEX,
};
use kaspa_core::{
debug,
time::{unix_now, Stopwatch},
};
use kaspa_core::time::{unix_now, Stopwatch};

pub(crate) struct BlockTemplateBuilder {
policy: Policy,
}
pub(crate) struct BlockTemplateBuilder {}

impl BlockTemplateBuilder {
pub(crate) fn new(max_block_mass: u64) -> Self {
let policy = Policy::new(max_block_mass);
Self { policy }
pub(crate) fn new() -> Self {
Self {}
}

/// BuildBlockTemplate creates a block template for a miner to consume
@@ -89,12 +82,10 @@ impl BlockTemplateBuilder {
&self,
consensus: &dyn ConsensusApi,
miner_data: &MinerData,
transactions: Vec<CandidateTransaction>,
selector: Box<dyn TemplateTransactionSelector>,
build_mode: TemplateBuildMode,
) -> BuilderResult<BlockTemplate> {
let _sw = Stopwatch::<20>::with_threshold("build_block_template op");
debug!("Considering {} transactions for a new block template", transactions.len());
let selector = Box::new(TransactionsSelector::new(self.policy.clone(), transactions));
Ok(consensus.build_block_template(miner_data.clone(), selector, build_mode)?)
}

6 changes: 3 additions & 3 deletions mining/src/block_template/policy.rs
@@ -1,14 +1,14 @@
/// Policy houses the policy (configuration parameters) which is used to control
/// the generation of block templates. See the documentation for
/// NewBlockTemplate for more details on each of these parameters are used.
/// NewBlockTemplate for more details on how each of these parameters are used.
#[derive(Clone)]
pub(crate) struct Policy {
pub struct Policy {
/// max_block_mass is the maximum block mass to be used when generating a block template.
pub(crate) max_block_mass: u64,
}

impl Policy {
pub(crate) fn new(max_block_mass: u64) -> Self {
pub fn new(max_block_mass: u64) -> Self {
Self { max_block_mass }
}
}
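The now-public `Policy` carries a single knob, `max_block_mass`, which the selectors use as a mass budget when assembling a block template. A minimal sketch of mass-capped greedy selection — hypothetical and simplified; the repo's selectors (rebalancing, sample-inplace, take-all, dynamic) layer weighted sampling and retry logic on top of this idea:

```rust
pub struct Policy {
    /// Maximum total mass allowed in a generated block template.
    pub max_block_mass: u64,
}

/// Greedily pick transactions (given here as (fee, mass) pairs, assumed
/// pre-sorted by descending feerate) while the mass budget allows.
/// Returns the indices of the selected transactions.
fn select_by_mass(policy: &Policy, txs: &[(u64, u64)]) -> Vec<usize> {
    let mut total_mass = 0u64;
    let mut selected = Vec::new();
    for (i, &(_fee, mass)) in txs.iter().enumerate() {
        if total_mass + mass <= policy.max_block_mass {
            total_mass += mass;
            selected.push(i);
        }
    }
    selected
}

fn main() {
    // Four txs of mass 1650 (the mass used in the benchmarks above)
    // under a 5_000 budget: only three fit (3 * 1650 = 4950).
    let txs = [(10, 1650), (9, 1650), (8, 1650), (7, 1650)];
    let picked = select_by_mass(&Policy { max_block_mass: 5_000 }, &txs);
    assert_eq!(picked, vec![0, 1, 2]);
}
```

This is also why the benchmarks construct selectors with `Policy::new(500_000)`: the budget bounds how many of the 1650-mass transactions any one template can take, independent of mempool size.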
