diff --git a/Cargo.lock b/Cargo.lock
index a951993e9..436481a30 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2439,6 +2439,7 @@ dependencies = [
  "secp256k1",
  "serde",
  "serde_json",
+ "serde_with",
  "smallvec",
  "thiserror",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 7141101f9..094e24984 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -234,6 +234,7 @@ separator = "0.4.1"
 seqlock = "0.2.0"
 serde = { version = "1.0.190", features = ["derive", "rc"] }
 serde_bytes = "0.11.12"
+serde_with = "3.11.0"
 serde_json = "1.0.107"
 serde_repr = "0.1.18"
 serde-value = "0.7.0"
diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml
index 443e591c8..74376fb77 100644
--- a/consensus/Cargo.toml
+++ b/consensus/Cargo.toml
@@ -39,6 +39,7 @@ rayon.workspace = true
 rocksdb.workspace = true
 secp256k1.workspace = true
 serde.workspace = true
+serde_with.workspace = true
 smallvec.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
diff --git a/consensus/src/model/stores/block_transactions.rs b/consensus/src/model/stores/block_transactions.rs
index 504268288..53aa487e0 100644
--- a/consensus/src/model/stores/block_transactions.rs
+++ b/consensus/src/model/stores/block_transactions.rs
@@ -1,5 +1,7 @@
+use kaspa_consensus_core::tx::Transaction;
+use kaspa_consensus_core::tx::TransactionIndexType;
 use kaspa_consensus_core::tx::{TransactionInput, TransactionOutput};
-use kaspa_consensus_core::{tx::Transaction, BlockHasher};
+use kaspa_database::prelude::Cache;
 use kaspa_database::prelude::CachePolicy;
 use kaspa_database::prelude::StoreError;
 use kaspa_database::prelude::DB;
@@ -9,10 +11,49 @@ use kaspa_hashes::Hash;
 use kaspa_utils::mem_size::MemSizeEstimator;
 use rocksdb::WriteBatch;
 use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, Bytes};
+use std::fmt::Display;
 use std::sync::Arc;
 
+#[serde_as]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
+struct BlockTransactionFullAccessKey(#[serde_as(as = "Bytes")] [u8; 36]);
+
+impl Display for BlockTransactionFullAccessKey {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl BlockTransactionFullAccessKey {
+    pub fn new(block_hash: &Hash, index: TransactionIndexType) -> Self {
+        let block_hash_bytes = block_hash.as_bytes();
+        let index_bytes = index.to_be_bytes();
+        let mut key = std::mem::MaybeUninit::uninit();
+        let dest = key.as_mut_ptr() as *mut u8;
+        Self(
+            // unsafe, but avoids initializing array with zeros
+            unsafe {
+                std::ptr::copy_nonoverlapping(block_hash_bytes.as_ptr(), dest, block_hash_bytes.len());
+                std::ptr::copy_nonoverlapping(index_bytes.as_ptr(), dest.add(block_hash_bytes.len()), index_bytes.len());
+                key.assume_init()
+            },
+        )
+    }
+}
+
+impl AsRef<[u8]> for BlockTransactionFullAccessKey {
+    fn as_ref(&self) -> &[u8] {
+        &self.0
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+struct BlockBody(Arc<Vec<Transaction>>);
+
 pub trait BlockTransactionsStoreReader {
-    fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError>;
+    fn get(&self, block_hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError>;
+    fn get_at_index(&self, block_hash: Hash, index: TransactionIndexType) -> Result<Transaction, StoreError>;
 }
 
 pub trait BlockTransactionsStore: BlockTransactionsStoreReader {
@@ -21,9 +62,6 @@ pub trait BlockTransactionsStore: BlockTransactionsStoreReader {
     fn delete(&self, hash: Hash) -> Result<(), StoreError>;
 }
 
-#[derive(Clone, Serialize, Deserialize)]
-struct BlockBody(Arc<Vec<Transaction>>);
-
 impl MemSizeEstimator for BlockBody {
     fn estimate_mem_bytes(&self) -> usize {
         const NORMAL_SIG_SIZE: usize = 66;
@@ -45,12 +83,17 @@ impl MemSizeEstimator for BlockBody {
 #[derive(Clone)]
 pub struct DbBlockTransactionsStore {
     db: Arc<DB>,
-    access: CachedDbAccess<Hash, BlockBody, BlockHasher>,
+    access: CachedDbAccess<BlockTransactionFullAccessKey, Transaction>,
+    cache: Cache<Hash, BlockBody>,
 }
 
 impl DbBlockTransactionsStore {
     pub fn new(db: Arc<DB>, cache_policy: CachePolicy) -> Self {
-        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::BlockTransactions.into()) }
+        Self {
+            db: Arc::clone(&db),
+            access: CachedDbAccess::new(Arc::clone(&db), CachePolicy::Empty, DatabaseStorePrefixes::BlockTransactions.into()),
+            cache: Cache::new(cache_policy),
+        }
     }
 
     pub fn clone_with_new_cache(&self, cache_policy: CachePolicy) -> Self {
@@ -58,38 +101,70 @@
     }
 
     pub fn has(&self, hash: Hash) -> Result<bool, StoreError> {
-        self.access.has(hash)
+        Ok(self.cache.contains_key(&hash) || self.access.has_bucket(hash.as_bytes().as_ref())?)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
-        if self.access.has(hash)? {
+        if self.cache.contains_key(&hash) || self.access.has_bucket(hash.as_bytes().as_ref())? {
             return Err(StoreError::HashAlreadyExists(hash));
         }
-        self.access.write(BatchDbWriter::new(batch), hash, BlockBody(transactions))?;
+        let writer = BatchDbWriter::new(batch);
+        self.cache.insert(hash, BlockBody(transactions.clone()));
+        self.access.write_many_without_cache(
+            writer,
+            &mut transactions
+                .iter()
+                .enumerate()
+                .map(|(index, tx)| (BlockTransactionFullAccessKey::new(&hash, index as TransactionIndexType), tx.clone())),
+        )?;
         Ok(())
     }
 
     pub fn delete_batch(&self, batch: &mut WriteBatch, hash: Hash) -> Result<(), StoreError> {
-        self.access.delete(BatchDbWriter::new(batch), hash)
+        self.cache.remove(&hash);
+        self.access.delete_bucket(BatchDbWriter::new(batch), hash.as_bytes().as_ref())
     }
 }
 
 impl BlockTransactionsStoreReader for DbBlockTransactionsStore {
     fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError> {
-        Ok(self.access.read(hash)?.0)
+        self.cache
+            .get(&hash)
+            .map(|block_transactions| block_transactions.0.clone())
+            .ok_or_else(|| StoreError::BucketNotFound(hash.to_string()));
+        if self.cache.contains_key(&hash) {
+            Ok(self.cache.get(&hash).unwrap().0.clone())
+        } else {
+            Ok(Arc::new(self.access.read_bucket(hash.as_bytes().as_ref())?))
+        }
+    }
+
+    fn get_at_index(&self, block_hash: Hash, index: TransactionIndexType) -> Result<Transaction, StoreError> {
+        if let Some(block_transactions) = self.cache.get(&block_hash) {
+            return Ok(block_transactions.0[index as usize].clone());
+        } else {
+            self.access.read(BlockTransactionFullAccessKey::new(&block_hash, index))
+        }
     }
 }
 
 impl BlockTransactionsStore for DbBlockTransactionsStore {
     fn insert(&self, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
-        if self.access.has(hash)? {
+        if self.access.has_bucket(hash.as_bytes().as_ref())? {
             return Err(StoreError::HashAlreadyExists(hash));
         }
-        self.access.write(DirectDbWriter::new(&self.db), hash, BlockBody(transactions))?;
-        Ok(())
+        self.cache.insert(hash, BlockBody(transactions.clone()));
+        self.access.write_many_without_cache(
+            DirectDbWriter::new(&self.db),
+            &mut transactions
+                .iter()
+                .enumerate()
+                .map(|(index, tx)| (BlockTransactionFullAccessKey::new(&hash, index as TransactionIndexType), tx.clone())),
+        )
    }
 
     fn delete(&self, hash: Hash) -> Result<(), StoreError> {
-        self.access.delete(DirectDbWriter::new(&self.db), hash)
+        self.cache.remove(&hash);
+        self.access.delete_bucket(DirectDbWriter::new(&self.db), hash.as_bytes().as_ref())
     }
 }
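For reference, the 36-byte `BlockTransactionFullAccessKey` above is just the 32-byte block hash followed by the transaction index as 4 big-endian bytes, so every transaction of a block shares the block-hash prefix (the "bucket") and sorts by index inside it. Below is a standalone safe-Rust sketch of the same layout; the names (`BlockHashBytes`, `full_access_key`) are illustrative and not part of the patch, and the `MaybeUninit` version in the diff only differs by skipping zero-initialization.

```rust
// Illustrative stand-ins: the patch uses kaspa's 32-byte Hash and TransactionIndexType (u32).
type BlockHashBytes = [u8; 32];
type TransactionIndexType = u32;

/// Build the 36-byte key: block hash ++ big-endian transaction index.
/// Big-endian encoding makes keys within one block's bucket sort by index.
fn full_access_key(block_hash: &BlockHashBytes, index: TransactionIndexType) -> [u8; 36] {
    let mut key = [0u8; 36];
    key[..32].copy_from_slice(block_hash);
    key[32..].copy_from_slice(&index.to_be_bytes());
    key
}

fn main() {
    let hash = [0xab; 32];
    // Lexicographic order of the raw key bytes matches numeric index order.
    assert!(full_access_key(&hash, 1) < full_access_key(&hash, 2));
    assert!(full_access_key(&hash, 255) < full_access_key(&hash, 256));
}
```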
diff --git a/database/src/access.rs b/database/src/access.rs
index ad82197db..381974c62 100644
--- a/database/src/access.rs
+++ b/database/src/access.rs
@@ -46,6 +46,14 @@ where
         Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(&self.prefix, key))?.is_some())
     }
 
+    pub fn has_bucket(&self, bucket: &[u8]) -> Result<bool, StoreError> {
+        let db_key = DbKey::prefix_only(&self.prefix).with_bucket(bucket);
+        let mut read_opts = ReadOptions::default();
+        read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));
+
+        Ok(self.db.iterator_opt(IteratorMode::Start, read_opts).next().is_some())
+    }
+
     pub fn read(&self, key: TKey) -> Result<TData, StoreError>
     where
         TKey: Clone + AsRef<[u8]> + ToString,
@@ -165,6 +173,37 @@
         Ok(())
     }
 
+    /// Deletes a prefix bucket from the db.
+    /// Note: This does not clear from the cache.
+    pub fn delete_bucket(&self, mut writer: impl DbWriter, bucket: &[u8]) -> Result<(), StoreError>
+where {
+        let db_key = DbKey::new_with_bucket(&self.prefix, &bucket, []);
+        let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds();
+        writer.delete_range(from.unwrap(), to.unwrap())?;
+        Ok(())
+    }
+
+    pub fn read_bucket(&self, bucket: &[u8]) -> Result<Vec<TData>, StoreError>
+    where
+        TData: DeserializeOwned,
+    {
+        let db_key = DbKey::prefix_only(&self.prefix).with_bucket(bucket);
+        let mut read_opts = ReadOptions::default();
+        read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));
+
+        self.db
+            .iterator_opt(IteratorMode::Start, read_opts)
+            .into_iter()
+            .map(|item| match item {
+                Ok((_, value_bytes)) => match bincode::deserialize::<TData>(value_bytes.as_ref()) {
+                    Ok(value) => Ok(value),
+                    Err(err) => Err(err.into()),
+                },
+                Err(err) => Err(err.into()),
+            })
+            .collect()
+    }
+
     /// A dynamic iterator that can iterate through a specific prefix / bucket, or from a certain start point.
     //TODO: loop and chain iterators for multi-prefix / bucket iterator.
     pub fn seek_iterator(
diff --git a/database/src/errors.rs b/database/src/errors.rs
index 8467c5e65..e24e91486 100644
--- a/database/src/errors.rs
+++ b/database/src/errors.rs
@@ -7,6 +7,9 @@ pub enum StoreError {
     #[error("key {0} not found in store")]
     KeyNotFound(DbKey),
 
+    #[error("bucket {0} not found in store")]
+    BucketNotFound(String),
+
     #[error("key {0} already exists in store")]
     KeyAlreadyExists(String),
 
diff --git a/database/src/key.rs b/database/src/key.rs
index 83fa8ebb2..5baf1649f 100644
--- a/database/src/key.rs
+++ b/database/src/key.rs
@@ -40,6 +40,15 @@ impl DbKey {
         self.prefix_len += bucket.as_ref().len();
     }
 
+    pub fn with_bucket<TBucket>(mut self, bucket: TBucket) -> Self
+    where
+        TBucket: Copy + AsRef<[u8]>,
+    {
+        self.path.extend(bucket.as_ref().iter().copied());
+        self.prefix_len += bucket.as_ref().len();
+        self
+    }
+
     pub fn add_key<TKey>(&mut self, key: TKey)
     where
         TKey: Clone + AsRef<[u8]>,
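The new bucket helpers in `CachedDbAccess` (`has_bucket`, `read_bucket`, `delete_bucket`) all lean on the same idea: the store prefix plus the block hash forms a key prefix, and RocksDB is asked for the half-open range of keys that start with it. A rough, self-contained sketch of how a prefix expands into `[from, to)` bounds follows; it is in the spirit of the `rocksdb::PrefixRange(..).into_bounds()` call used above, not the crate's actual implementation, and `prefix_bounds` is a hypothetical helper name.

```rust
/// Given a key prefix, compute the half-open range [from, to) covering exactly
/// the keys that start with it. `to` is the prefix with its last non-0xff byte
/// incremented; an empty or all-0xff prefix has no finite upper bound.
fn prefix_bounds(prefix: &[u8]) -> (Vec<u8>, Option<Vec<u8>>) {
    let from = prefix.to_vec();
    let mut to = prefix.to_vec();
    // Strip trailing 0xff bytes, then increment the last remaining byte.
    while let Some(&last) = to.last() {
        if last == 0xff {
            to.pop();
        } else {
            *to.last_mut().unwrap() += 1;
            return (from, Some(to));
        }
    }
    (from, None)
}

fn main() {
    // A 32-byte block hash acts as the bucket prefix; the 4-byte index completes each key.
    let bucket = [0x11u8; 32];
    let (from, to) = prefix_bounds(&bucket);
    assert_eq!(from, bucket.to_vec());
    assert_eq!(to.unwrap()[31], 0x12); // upper bound: last byte incremented
}
```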