diff --git a/consensus/src/model/stores/acceptance_data.rs b/consensus/src/model/stores/acceptance_data.rs
index 8ee303f8b..557e134da 100644
--- a/consensus/src/model/stores/acceptance_data.rs
+++ b/consensus/src/model/stores/acceptance_data.rs
@@ -27,40 +27,40 @@ const STORE_PREFIX: &[u8] = b"acceptance-data";
 /// A DB + cache implementation of `DbAcceptanceDataStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbAcceptanceDataStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccess<Hash, AcceptanceData, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<AcceptanceData>, BlockHasher>,
 }
 
 impl DbAcceptanceDataStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
    }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, acceptance_data: Arc<AcceptanceData>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, &acceptance_data)?;
+        self.access.write(BatchDbWriter::new(batch), hash, acceptance_data)?;
         Ok(())
     }
 }
 
 impl AcceptanceDataStoreReader for DbAcceptanceDataStore {
     fn get(&self, hash: Hash) -> Result<Arc<AcceptanceData>, StoreError> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 }
 
 impl AcceptanceDataStore for DbAcceptanceDataStore {
     fn insert(&self, hash: Hash, acceptance_data: Arc<AcceptanceData>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &acceptance_data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, acceptance_data)?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/block_transactions.rs b/consensus/src/model/stores/block_transactions.rs
index 11dafdc05..3556081be 100644
--- a/consensus/src/model/stores/block_transactions.rs
+++ b/consensus/src/model/stores/block_transactions.rs
@@ -23,41 +23,40 @@ const STORE_PREFIX: &[u8] = b"block-transactions";
 /// A DB + cache implementation of `BlockTransactionsStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbBlockTransactionsStore {
-    raw_db: Arc<DB>,
-    // `CachedDbAccess` is shallow cloned so no need to wrap with Arc
-    cached_access: CachedDbAccess<Hash, Vec<Transaction>, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<Vec<Transaction>>, BlockHasher>,
 }
 
 impl DbBlockTransactionsStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, &transactions)?;
+        self.access.write(BatchDbWriter::new(batch), hash, transactions)?;
         Ok(())
     }
 }
 
 impl BlockTransactionsStoreReader for DbBlockTransactionsStore {
     fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 }
 
 impl BlockTransactionsStore for DbBlockTransactionsStore {
     fn insert(&self, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &transactions)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, transactions)?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/daa.rs b/consensus/src/model/stores/daa.rs
index 9122862c2..d9dbe9db9 100644
--- a/consensus/src/model/stores/daa.rs
+++ b/consensus/src/model/stores/daa.rs
@@ -18,49 +18,45 @@ pub trait DaaStore: DaaStoreReader {
     fn insert(&self, hash: Hash, added_blocks: BlockHashes) -> Result<(), StoreError>;
 }
 
-const ADDED_BLOCKS_STORE_PREFIX: &[u8] = b"daa-added-blocks";
+const STORE_PREFIX: &[u8] = b"daa-added-blocks";
 
 /// A DB + cache implementation of `DaaStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbDaaStore {
-    raw_db: Arc<DB>,
-    // `CachedDbAccess` is shallow cloned so no need to wrap with Arc
-    cached_daa_added_blocks_access: CachedDbAccess<Hash, Vec<Hash>, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, BlockHashes, BlockHasher>,
 }
 
 impl DbDaaStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self {
-            raw_db: Arc::clone(&db),
-            cached_daa_added_blocks_access: CachedDbAccess::new(db, cache_size, ADDED_BLOCKS_STORE_PREFIX),
-        }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, added_blocks: BlockHashes) -> Result<(), StoreError> {
-        if self.cached_daa_added_blocks_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_daa_added_blocks_access.write(BatchDbWriter::new(batch), hash, &added_blocks)?;
+        self.access.write(BatchDbWriter::new(batch), hash, added_blocks)?;
         Ok(())
     }
 }
 
 impl DaaStoreReader for DbDaaStore {
     fn get_daa_added_blocks(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-        self.cached_daa_added_blocks_access.read(hash)
+        self.access.read(hash)
     }
 }
 
 impl DaaStore for DbDaaStore {
     fn insert(&self, hash: Hash, added_blocks: BlockHashes) -> Result<(), StoreError> {
-        if self.cached_daa_added_blocks_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_daa_added_blocks_access.write(DirectDbWriter::new(&self.raw_db), hash, &added_blocks)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, added_blocks)?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/database/access.rs b/consensus/src/model/stores/database/access.rs
index 5a35b47e9..329d04951 100644
--- a/consensus/src/model/stores/database/access.rs
+++ b/consensus/src/model/stores/database/access.rs
@@ -3,7 +3,7 @@ use crate::model::stores::{errors::StoreError, DB};
 use serde::{de::DeserializeOwned, Serialize};
 use std::{collections::hash_map::RandomState, hash::BuildHasher, sync::Arc};
 
-/// A concurrent DB store with typed caching.
+/// A concurrent DB store access with typed caching.
 #[derive(Clone)]
 pub struct CachedDbAccess<TKey, TData, S = RandomState>
 where
@@ -13,7 +13,7 @@ where
     db: Arc<DB>,
 
     // Cache
-    cache: Cache<TKey, Arc<TData>, S>,
+    cache: Cache<TKey, TData, S>,
 
     // DB bucket/path
     prefix: &'static [u8],
@@ -29,7 +29,7 @@ where
         Self { db, cache: Cache::new(cache_size), prefix }
     }
 
-    pub fn read_from_cache(&self, key: TKey) -> Option<Arc<TData>>
+    pub fn read_from_cache(&self, key: TKey) -> Option<TData>
     where
         TKey: Copy + AsRef<[u8]>,
     {
@@ -43,7 +43,7 @@ where
         Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(self.prefix, key))?.is_some())
     }
 
-    pub fn read(&self, key: TKey) -> Result<Arc<TData>, StoreError>
+    pub fn read(&self, key: TKey) -> Result<TData, StoreError>
     where
         TKey: Copy + AsRef<[u8]> + ToString,
         TData: DeserializeOwned, // We need `DeserializeOwned` since the slice coming from `db.get_pinned` has short lifetime
@@ -53,8 +53,8 @@
         } else {
             let db_key = DbKey::new(self.prefix, key);
             if let Some(slice) = self.db.get_pinned(&db_key)? {
-                let data: Arc<TData> = Arc::new(bincode::deserialize(&slice)?);
-                self.cache.insert(key, Arc::clone(&data));
+                let data: TData = bincode::deserialize(&slice)?;
+                self.cache.insert(key, data.clone());
                 Ok(data)
             } else {
                 Err(StoreError::KeyNotFound(db_key))
@@ -62,13 +62,13 @@
         }
     }
 
-    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: &Arc<TData>) -> Result<(), StoreError>
+    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError>
     where
         TKey: Copy + AsRef<[u8]>,
         TData: Serialize,
     {
-        self.cache.insert(key, Arc::clone(data));
-        let bin_data = bincode::serialize(data.as_ref())?;
+        let bin_data = bincode::serialize(&data)?;
+        self.cache.insert(key, data);
         writer.put(DbKey::new(self.prefix, key), bin_data)?;
         Ok(())
     }
@@ -76,7 +76,7 @@ where
     pub fn write_many(
         &self,
         mut writer: impl DbWriter,
-        iter: &mut (impl Iterator<Item = (TKey, Arc<TData>)> + Clone),
+        iter: &mut (impl Iterator<Item = (TKey, TData)> + Clone),
     ) -> Result<(), StoreError>
     where
         TKey: Copy + AsRef<[u8]>,
@@ -85,7 +85,7 @@ where
         let iter_clone = iter.clone();
         self.cache.insert_many(iter);
         for (key, data) in iter_clone {
-            let bin_data = bincode::serialize(data.as_ref())?;
+            let bin_data = bincode::serialize(&data)?;
             writer.put(DbKey::new(self.prefix, key), bin_data)?;
         }
         Ok(())
@@ -112,68 +112,3 @@ where
         Ok(())
     }
 }
-
-/// A concurrent DB store with typed caching for `Copy` types.
-/// TODO: try and generalize under `CachedDbAccess`
-#[derive(Clone)]
-pub struct CachedDbAccessForCopy<TKey, TData, S = RandomState>
-where
-    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
-    TData: Clone + Copy + Send + Sync,
-{
-    db: Arc<DB>,
-
-    // Cache
-    cache: Cache<TKey, TData, S>,
-
-    // DB bucket/path
-    prefix: &'static [u8],
-}
-
-impl<TKey, TData, S> CachedDbAccessForCopy<TKey, TData, S>
-where
-    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
-    TData: Clone + Copy + Send + Sync,
-    S: BuildHasher + Default,
-{
-    pub fn new(db: Arc<DB>, cache_size: u64, prefix: &'static [u8]) -> Self {
-        Self { db, cache: Cache::new(cache_size), prefix }
-    }
-
-    pub fn has(&self, key: TKey) -> Result<bool, StoreError>
-    where
-        TKey: Copy + AsRef<[u8]>,
-    {
-        Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(self.prefix, key))?.is_some())
-    }
-
-    pub fn read(&self, key: TKey) -> Result<TData, StoreError>
-    where
-        TKey: Copy + AsRef<[u8]> + ToString,
-        TData: DeserializeOwned, // We need `DeserializeOwned` since the slice coming from `db.get_pinned` has short lifetime
-    {
-        if let Some(data) = self.cache.get(&key) {
-            Ok(data)
-        } else {
-            let db_key = DbKey::new(self.prefix, key);
-            if let Some(slice) = self.db.get_pinned(&db_key)? {
-                let data: TData = bincode::deserialize(&slice)?;
-                self.cache.insert(key, data);
-                Ok(data)
-            } else {
-                Err(StoreError::KeyNotFound(db_key))
-            }
-        }
-    }
-
-    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError>
-    where
-        TKey: Copy + AsRef<[u8]>,
-        TData: Serialize,
-    {
-        self.cache.insert(key, data);
-        let bin_data = bincode::serialize(&data)?;
-        writer.put(DbKey::new(self.prefix, key), bin_data)?;
-        Ok(())
-    }
-}
diff --git a/consensus/src/model/stores/database/mod.rs b/consensus/src/model/stores/database/mod.rs
index 94bb811e0..da79562ac 100644
--- a/consensus/src/model/stores/database/mod.rs
+++ b/consensus/src/model/stores/database/mod.rs
@@ -5,7 +5,7 @@ mod key;
 mod writer;
 
 pub mod prelude {
-    pub use super::access::{CachedDbAccess, CachedDbAccessForCopy};
+    pub use super::access::CachedDbAccess;
     pub use super::cache::Cache;
     pub use super::item::CachedDbItem;
     pub use super::key::DbKey;
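Since the removal of `CachedDbAccessForCopy` is the heart of this change, here is a minimal sketch, not taken from the patch, of why a single `TData: Clone` bound now covers both kinds of payload (the one-slot `Cache` below is a hypothetical stand-in for the real keyed cache): `Copy` data is cached by value, while shared data is cached as `Arc<T>`, whose `clone()` is only a refcount bump.

```rust
use std::sync::Arc;

// Hypothetical one-slot stand-in for the real keyed cache type.
struct Cache<T: Clone> {
    slot: Option<T>,
}

impl<T: Clone> Cache<T> {
    fn insert(&mut self, data: T) {
        self.slot = Some(data);
    }

    // Returning owned values requires only `T: Clone`, which both
    // `Copy` types and `Arc<T>` satisfy cheaply.
    fn get(&self) -> Option<T> {
        self.slot.clone()
    }
}

fn main() {
    // Copy payload, previously handled by `CachedDbAccessForCopy`:
    let mut compact: Cache<u64> = Cache { slot: None };
    compact.insert(42);
    assert_eq!(compact.get(), Some(42));

    // Shared payload, previously Arc-wrapped inside `CachedDbAccess` itself:
    let mut full: Cache<Arc<Vec<u64>>> = Cache { slot: None };
    full.insert(Arc::new(vec![1, 2, 3]));
    assert_eq!(full.get().unwrap().len(), 3);
}
```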
diff --git a/consensus/src/model/stores/depth.rs b/consensus/src/model/stores/depth.rs
index e86556f1c..3e635aae6 100644
--- a/consensus/src/model/stores/depth.rs
+++ b/consensus/src/model/stores/depth.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use super::{
-    database::prelude::{BatchDbWriter, CachedDbAccessForCopy, DirectDbWriter},
+    database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter},
     errors::StoreError,
     DB,
 };
@@ -31,17 +31,17 @@ struct BlockDepthInfo {
 /// A DB + cache implementation of `DepthStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbDepthStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccessForCopy<Hash, BlockDepthInfo, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, BlockDepthInfo, BlockHasher>,
 }
 
 impl DbDepthStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccessForCopy::new(db, cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(
@@ -51,30 +51,30 @@ impl DbDepthStore {
         merge_depth_root: Hash,
         finality_point: Hash,
     ) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
+        self.access.write(BatchDbWriter::new(batch), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
         Ok(())
     }
 }
 
 impl DepthStoreReader for DbDepthStore {
     fn merge_depth_root(&self, hash: Hash) -> Result<Hash, StoreError> {
-        Ok(self.cached_access.read(hash)?.merge_depth_root)
+        Ok(self.access.read(hash)?.merge_depth_root)
     }
 
     fn finality_point(&self, hash: Hash) -> Result<Hash, StoreError> {
-        Ok(self.cached_access.read(hash)?.finality_point)
+        Ok(self.access.read(hash)?.finality_point)
     }
 }
 
 impl DepthStore for DbDepthStore {
     fn insert(&self, hash: Hash, merge_depth_root: Hash, finality_point: Hash) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/ghostdag.rs b/consensus/src/model/stores/ghostdag.rs
index f5646177a..a58e67da4 100644
--- a/consensus/src/model/stores/ghostdag.rs
+++ b/consensus/src/model/stores/ghostdag.rs
@@ -1,4 +1,4 @@
-use super::database::prelude::{BatchDbWriter, CachedDbAccess, CachedDbAccessForCopy, DbKey, DirectDbWriter};
+use super::database::prelude::{BatchDbWriter, CachedDbAccess, DbKey, DirectDbWriter};
 use super::{errors::StoreError, DB};
 use crate::processes::ghostdag::ordering::SortableBlock;
 use consensus_core::{blockhash::BlockHashes, BlueWorkType};
@@ -215,31 +215,30 @@ const COMPACT_STORE_PREFIX: &[u8] = b"compact-block-ghostdag-data";
 /// A DB + cache implementation of `GhostdagStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbGhostdagStore {
-    raw_db: Arc<DB>,
-    // `CachedDbAccess` is shallow cloned so no need to wrap with Arc
-    cached_access: CachedDbAccess<Hash, GhostdagData, BlockHasher>,
-    compact_cached_access: CachedDbAccessForCopy<Hash, CompactGhostdagData, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<GhostdagData>, BlockHasher>,
+    compact_access: CachedDbAccess<Hash, CompactGhostdagData, BlockHasher>,
 }
 
 impl DbGhostdagStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
         Self {
-            raw_db: Arc::clone(&db),
-            cached_access: CachedDbAccess::new(db.clone(), cache_size, STORE_PREFIX),
-            compact_cached_access: CachedDbAccessForCopy::new(db, cache_size, COMPACT_STORE_PREFIX),
+            db: Arc::clone(&db),
+            access: CachedDbAccess::new(db.clone(), cache_size, STORE_PREFIX),
+            compact_access: CachedDbAccess::new(db, cache_size, COMPACT_STORE_PREFIX),
         }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, data: &Arc<GhostdagData>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, data)?;
-        self.compact_cached_access.write(
+        self.access.write(BatchDbWriter::new(batch), hash, data.clone())?;
+        self.compact_access.write(
             BatchDbWriter::new(batch),
             hash,
             CompactGhostdagData { blue_score: data.blue_score, blue_work: data.blue_work, selected_parent: data.selected_parent },
@@ -250,53 +249,53 @@ impl DbGhostdagStore {
 
 impl GhostdagStoreReader for DbGhostdagStore {
     fn get_blue_score(&self, hash: Hash) -> Result<u64, StoreError> {
-        Ok(self.cached_access.read(hash)?.blue_score)
+        Ok(self.access.read(hash)?.blue_score)
     }
 
     fn get_blue_work(&self, hash: Hash) -> Result<BlueWorkType, StoreError> {
-        Ok(self.cached_access.read(hash)?.blue_work)
+        Ok(self.access.read(hash)?.blue_work)
     }
 
     fn get_selected_parent(&self, hash: Hash) -> Result<Hash, StoreError> {
-        Ok(self.cached_access.read(hash)?.selected_parent)
+        Ok(self.access.read(hash)?.selected_parent)
     }
 
     fn get_mergeset_blues(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-        Ok(Arc::clone(&self.cached_access.read(hash)?.mergeset_blues))
+        Ok(Arc::clone(&self.access.read(hash)?.mergeset_blues))
     }
 
     fn get_mergeset_reds(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-        Ok(Arc::clone(&self.cached_access.read(hash)?.mergeset_reds))
+        Ok(Arc::clone(&self.access.read(hash)?.mergeset_reds))
     }
 
     fn get_blues_anticone_sizes(&self, hash: Hash) -> Result<HashKTypeMap, StoreError> {
-        Ok(Arc::clone(&self.cached_access.read(hash)?.blues_anticone_sizes))
+        Ok(Arc::clone(&self.access.read(hash)?.blues_anticone_sizes))
     }
 
     fn get_data(&self, hash: Hash) -> Result<Arc<GhostdagData>, StoreError> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 
     fn get_compact_data(&self, hash: Hash) -> Result<CompactGhostdagData, StoreError> {
-        self.compact_cached_access.read(hash)
+        self.compact_access.read(hash)
     }
 
     fn has(&self, hash: Hash) -> Result<bool, StoreError> {
-        self.cached_access.has(hash)
+        self.access.has(hash)
     }
 }
 
 impl GhostdagStore for DbGhostdagStore {
     fn insert(&self, hash: Hash, data: Arc<GhostdagData>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &data)?;
-        if self.compact_cached_access.has(hash)? {
+        self.access.write(DirectDbWriter::new(&self.db), hash, data.clone())?;
+        if self.compact_access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.compact_cached_access.write(
-            DirectDbWriter::new(&self.raw_db),
+        self.compact_access.write(
+            DirectDbWriter::new(&self.db),
             hash,
             CompactGhostdagData { blue_score: data.blue_score, blue_work: data.blue_work, selected_parent: data.selected_parent },
         )?;
diff --git a/consensus/src/model/stores/headers.rs b/consensus/src/model/stores/headers.rs
index b7f75cbe4..7376df87f 100644
--- a/consensus/src/model/stores/headers.rs
+++ b/consensus/src/model/stores/headers.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use super::{
-    database::prelude::{BatchDbWriter, CachedDbAccess, CachedDbAccessForCopy, DirectDbWriter},
+    database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter},
     errors::StoreError,
     DB,
 };
@@ -16,7 +16,7 @@ pub trait HeaderStoreReader {
     fn get_timestamp(&self, hash: Hash) -> Result<u64, StoreError>;
     fn get_bits(&self, hash: Hash) -> Result<u32, StoreError>;
     fn get_header(&self, hash: Hash) -> Result<Arc<Header>, StoreError>;
-    fn get_header_with_block_level(&self, hash: Hash) -> Result<Arc<HeaderWithBlockLevel>, StoreError>;
+    fn get_header_with_block_level(&self, hash: Hash) -> Result<HeaderWithBlockLevel, StoreError>;
     fn get_compact_header_data(&self, hash: Hash) -> Result<CompactHeaderData, StoreError>;
 }
 
@@ -45,34 +45,30 @@ pub struct CompactHeaderData {
 /// A DB + cache implementation of `HeaderStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbHeadersStore {
-    raw_db: Arc<DB>,
-    cached_compact_headers_access: CachedDbAccessForCopy<Hash, CompactHeaderData, BlockHasher>,
-    cached_headers_access: CachedDbAccess<Hash, HeaderWithBlockLevel, BlockHasher>,
+    db: Arc<DB>,
+    compact_headers_access: CachedDbAccess<Hash, CompactHeaderData, BlockHasher>,
+    headers_access: CachedDbAccess<Hash, HeaderWithBlockLevel, BlockHasher>,
 }
 
 impl DbHeadersStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
         Self {
-            raw_db: Arc::clone(&db),
-            cached_compact_headers_access: CachedDbAccessForCopy::new(Arc::clone(&db), cache_size, COMPACT_HEADER_DATA_STORE_PREFIX),
-            cached_headers_access: CachedDbAccess::new(db, cache_size, HEADERS_STORE_PREFIX),
+            db: Arc::clone(&db),
+            compact_headers_access: CachedDbAccess::new(Arc::clone(&db), cache_size, COMPACT_HEADER_DATA_STORE_PREFIX),
+            headers_access: CachedDbAccess::new(db, cache_size, HEADERS_STORE_PREFIX),
         }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, header: Arc<Header>, block_level: u8) -> Result<(), StoreError> {
-        if self.cached_headers_access.has(hash)? {
+        if self.headers_access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_headers_access.write(
-            BatchDbWriter::new(batch),
-            hash,
-            &Arc::new(HeaderWithBlockLevel { header: header.clone(), block_level }),
-        )?;
-        self.cached_compact_headers_access.write(
+        self.headers_access.write(BatchDbWriter::new(batch), hash, HeaderWithBlockLevel { header: header.clone(), block_level })?;
+        self.compact_headers_access.write(
             BatchDbWriter::new(batch),
             hash,
             CompactHeaderData {
@@ -88,43 +84,43 @@ impl DbHeadersStore {
 
 impl HeaderStoreReader for DbHeadersStore {
     fn get_daa_score(&self, hash: Hash) -> Result<u64, StoreError> {
-        if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) {
+        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
             return Ok(header_with_block_level.header.daa_score);
         }
-        Ok(self.cached_compact_headers_access.read(hash)?.daa_score)
+        Ok(self.compact_headers_access.read(hash)?.daa_score)
     }
 
     fn get_blue_score(&self, hash: Hash) -> Result<u64, StoreError> {
-        if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) {
+        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
             return Ok(header_with_block_level.header.blue_score);
         }
-        Ok(self.cached_compact_headers_access.read(hash)?.blue_score)
+        Ok(self.compact_headers_access.read(hash)?.blue_score)
     }
 
     fn get_timestamp(&self, hash: Hash) -> Result<u64, StoreError> {
-        if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) {
+        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
             return Ok(header_with_block_level.header.timestamp);
         }
-        Ok(self.cached_compact_headers_access.read(hash)?.timestamp)
+        Ok(self.compact_headers_access.read(hash)?.timestamp)
     }
 
     fn get_bits(&self, hash: Hash) -> Result<u32, StoreError> {
-        if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) {
+        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
             return Ok(header_with_block_level.header.bits);
         }
-        Ok(self.cached_compact_headers_access.read(hash)?.bits)
+        Ok(self.compact_headers_access.read(hash)?.bits)
     }
 
     fn get_header(&self, hash: Hash) -> Result<Arc<Header>, StoreError> {
-        Ok(self.cached_headers_access.read(hash)?.header.clone())
+        Ok(self.headers_access.read(hash)?.header)
     }
 
-    fn get_header_with_block_level(&self, hash: Hash) -> Result<Arc<HeaderWithBlockLevel>, StoreError> {
-        self.cached_headers_access.read(hash)
+    fn get_header_with_block_level(&self, hash: Hash) -> Result<HeaderWithBlockLevel, StoreError> {
+        self.headers_access.read(hash)
     }
 
     fn get_compact_header_data(&self, hash: Hash) -> Result<CompactHeaderData, StoreError> {
-        if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) {
+        if let Some(header_with_block_level) = self.headers_access.read_from_cache(hash) {
             return Ok(CompactHeaderData {
                 daa_score: header_with_block_level.header.daa_score,
                 timestamp: header_with_block_level.header.timestamp,
@@ -132,17 +128,17 @@ impl HeaderStoreReader for DbHeadersStore {
                 blue_score: header_with_block_level.header.blue_score,
             });
         }
-        self.cached_compact_headers_access.read(hash)
+        self.compact_headers_access.read(hash)
     }
 }
 
 impl HeaderStore for DbHeadersStore {
     fn insert(&self, hash: Hash, header: Arc<Header>, block_level: u8) -> Result<(), StoreError> {
-        if self.cached_headers_access.has(hash)? {
+        if self.headers_access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_compact_headers_access.write(
-            DirectDbWriter::new(&self.raw_db),
+        self.compact_headers_access.write(
+            DirectDbWriter::new(&self.db),
             hash,
             CompactHeaderData {
                 daa_score: header.daa_score,
@@ -151,11 +147,7 @@ impl HeaderStore for DbHeadersStore {
                 blue_score: header.blue_score,
             },
         )?;
-        self.cached_headers_access.write(
-            DirectDbWriter::new(&self.raw_db),
-            hash,
-            &Arc::new(HeaderWithBlockLevel { header, block_level }),
-        )?;
+        self.headers_access.write(DirectDbWriter::new(&self.db), hash, HeaderWithBlockLevel { header, block_level })?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/headers_selected_tip.rs b/consensus/src/model/stores/headers_selected_tip.rs
index a06f115c5..d9a42c5d3 100644
--- a/consensus/src/model/stores/headers_selected_tip.rs
+++ b/consensus/src/model/stores/headers_selected_tip.rs
@@ -21,32 +21,32 @@ pub const STORE_NAME: &[u8] = b"headers-selected-tip";
 /// A DB + cache implementation of `HeadersSelectedTipStore` trait
 #[derive(Clone)]
 pub struct DbHeadersSelectedTipStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbItem<SortableBlock>,
+    db: Arc<DB>,
+    access: CachedDbItem<SortableBlock>,
 }
 
 impl DbHeadersSelectedTipStore {
     pub fn new(db: Arc<DB>) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbItem::new(db.clone(), STORE_NAME) }
+        Self { db: Arc::clone(&db), access: CachedDbItem::new(db.clone(), STORE_NAME) }
     }
 
     pub fn clone_with_new_cache(&self) -> Self {
-        Self::new(Arc::clone(&self.raw_db))
+        Self::new(Arc::clone(&self.db))
     }
 
     pub fn set_batch(&mut self, batch: &mut WriteBatch, block: SortableBlock) -> StoreResult<()> {
-        self.cached_access.write(BatchDbWriter::new(batch), &block)
+        self.access.write(BatchDbWriter::new(batch), &block)
     }
 }
 
 impl HeadersSelectedTipStoreReader for DbHeadersSelectedTipStore {
     fn get(&self) -> StoreResult<SortableBlock> {
-        self.cached_access.read()
+        self.access.read()
     }
 }
 
 impl HeadersSelectedTipStore for DbHeadersSelectedTipStore {
     fn set(&mut self, block: SortableBlock) -> StoreResult<()> {
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), &block)
+        self.access.write(DirectDbWriter::new(&self.db), &block)
     }
 }
diff --git a/consensus/src/model/stores/past_pruning_points.rs b/consensus/src/model/stores/past_pruning_points.rs
index ded9058e6..c3e3515ca 100644
--- a/consensus/src/model/stores/past_pruning_points.rs
+++ b/consensus/src/model/stores/past_pruning_points.rs
@@ -1,7 +1,7 @@
 use std::{fmt::Display, sync::Arc};
 
 use super::{
-    database::prelude::{BatchDbWriter, CachedDbAccessForCopy, DirectDbWriter},
+    database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter},
     errors::{StoreError, StoreResult},
     DB,
 };
@@ -43,40 +43,40 @@ const STORE_PREFIX: &[u8] = b"past-pruning-points";
 /// A DB + cache implementation of `PastPruningPointsStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbPastPruningPointsStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccessForCopy<IndexKey, Hash>,
+    db: Arc<DB>,
+    access: CachedDbAccess<IndexKey, Hash>,
 }
 
 impl DbPastPruningPointsStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccessForCopy::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, index: u64, pruning_point: Hash) -> Result<(), StoreError> {
-        if self.cached_access.has(index.into())? {
+        if self.access.has(index.into())? {
             return Err(StoreError::KeyAlreadyExists(index.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), index.into(), pruning_point)?;
+        self.access.write(BatchDbWriter::new(batch), index.into(), pruning_point)?;
         Ok(())
     }
 }
 
 impl PastPruningPointsStoreReader for DbPastPruningPointsStore {
     fn get(&self, index: u64) -> StoreResult<Hash> {
-        self.cached_access.read(index.into())
+        self.access.read(index.into())
     }
 }
 
 impl PastPruningPointsStore for DbPastPruningPointsStore {
     fn insert(&self, index: u64, pruning_point: Hash) -> StoreResult<()> {
-        if self.cached_access.has(index.into())? {
+        if self.access.has(index.into())? {
             return Err(StoreError::KeyAlreadyExists(index.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), index.into(), pruning_point)?;
+        self.access.write(DirectDbWriter::new(&self.db), index.into(), pruning_point)?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/pruning.rs b/consensus/src/model/stores/pruning.rs
index 53a665c7f..c60a2a4de 100644
--- a/consensus/src/model/stores/pruning.rs
+++ b/consensus/src/model/stores/pruning.rs
@@ -45,46 +45,46 @@ pub trait PruningStore: PruningStoreReader {
 /// A DB + cache implementation of `PruningStore` trait, with concurrent readers support.
 #[derive(Clone)]
 pub struct DbPruningStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbItem<PruningPointInfo>,
+    db: Arc<DB>,
+    access: CachedDbItem<PruningPointInfo>,
 }
 
 const PRUNING_POINT_KEY: &[u8] = b"pruning-point";
 
 impl DbPruningStore {
     pub fn new(db: Arc<DB>) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbItem::new(db.clone(), PRUNING_POINT_KEY) }
+        Self { db: Arc::clone(&db), access: CachedDbItem::new(db.clone(), PRUNING_POINT_KEY) }
     }
 
     pub fn clone_with_new_cache(&self) -> Self {
-        Self::new(Arc::clone(&self.raw_db))
+        Self::new(Arc::clone(&self.db))
     }
 
     pub fn set_batch(&mut self, batch: &mut WriteBatch, pruning_point: Hash, candidate: Hash, index: u64) -> StoreResult<()> {
-        self.cached_access.write(BatchDbWriter::new(batch), &PruningPointInfo { pruning_point, candidate, index })
+        self.access.write(BatchDbWriter::new(batch), &PruningPointInfo { pruning_point, candidate, index })
     }
 }
 
 impl PruningStoreReader for DbPruningStore {
     fn pruning_point(&self) -> StoreResult<Hash> {
-        Ok(self.cached_access.read()?.pruning_point)
+        Ok(self.access.read()?.pruning_point)
     }
 
     fn pruning_point_candidate(&self) -> StoreResult<Hash> {
-        Ok(self.cached_access.read()?.candidate)
+        Ok(self.access.read()?.candidate)
     }
 
     fn pruning_point_index(&self) -> StoreResult<u64> {
-        Ok(self.cached_access.read()?.index)
+        Ok(self.access.read()?.index)
     }
 
     fn get(&self) -> StoreResult<PruningPointInfo> {
-        self.cached_access.read()
+        self.access.read()
     }
 }
 
 impl PruningStore for DbPruningStore {
     fn set(&mut self, pruning_point: Hash, candidate: Hash, index: u64) -> StoreResult<()> {
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), &PruningPointInfo::new(pruning_point, candidate, index))
+        self.access.write(DirectDbWriter::new(&self.db), &PruningPointInfo::new(pruning_point, candidate, index))
     }
 }
diff --git a/consensus/src/model/stores/reachability.rs b/consensus/src/model/stores/reachability.rs
index 6bcff6cb5..95d8c3a5e 100644
--- a/consensus/src/model/stores/reachability.rs
+++ b/consensus/src/model/stores/reachability.rs
@@ -59,77 +59,77 @@ const STORE_PREFIX: &[u8] = b"reachability-data";
 /// A DB + cache implementation of `ReachabilityStore` trait, with concurrent readers support.
 #[derive(Clone)]
 pub struct DbReachabilityStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccess<Hash, ReachabilityData, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<ReachabilityData>, BlockHasher>,
     reindex_root: CachedDbItem<Hash>,
 }
 
 impl DbReachabilityStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
         Self {
-            raw_db: Arc::clone(&db),
-            cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX),
+            db: Arc::clone(&db),
+            access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX),
             reindex_root: CachedDbItem::new(db, REINDEX_ROOT_KEY),
         }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 }
 
 impl ReachabilityStore for DbReachabilityStore {
     fn init(&mut self, origin: Hash, capacity: Interval) -> Result<(), StoreError> {
-        debug_assert!(!self.cached_access.has(origin)?);
+        debug_assert!(!self.access.has(origin)?);
 
         let data = Arc::new(ReachabilityData::new(blockhash::NONE, capacity, 0));
         let mut batch = WriteBatch::default();
-        self.cached_access.write(BatchDbWriter::new(&mut batch), origin, &data)?;
+        self.access.write(BatchDbWriter::new(&mut batch), origin, data)?;
         self.reindex_root.write(BatchDbWriter::new(&mut batch), &origin)?;
-        self.raw_db.write(batch)?;
+        self.db.write(batch)?;
 
         Ok(())
     }
 
     fn insert(&mut self, hash: Hash, parent: Hash, interval: Interval, height: u64) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
         let data = Arc::new(ReachabilityData::new(parent, interval, height));
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, data)?;
         Ok(())
     }
 
     fn set_interval(&mut self, hash: Hash, interval: Interval) -> Result<(), StoreError> {
-        let mut data = self.cached_access.read(hash)?;
+        let mut data = self.access.read(hash)?;
         Arc::make_mut(&mut data).interval = interval;
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, data)?;
         Ok(())
     }
 
     fn append_child(&mut self, hash: Hash, child: Hash) -> Result<u64, StoreError> {
-        let mut data = self.cached_access.read(hash)?;
+        let mut data = self.access.read(hash)?;
         let height = data.height;
         let mut_data = Arc::make_mut(&mut data);
         Arc::make_mut(&mut mut_data.children).push(child);
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, data)?;
         Ok(height)
     }
 
     fn insert_future_covering_item(&mut self, hash: Hash, fci: Hash, insertion_index: usize) -> Result<(), StoreError> {
-        let mut data = self.cached_access.read(hash)?;
+        let mut data = self.access.read(hash)?;
         let mut_data = Arc::make_mut(&mut data);
         Arc::make_mut(&mut mut_data.future_covering_set).insert(insertion_index, fci);
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, data)?;
         Ok(())
     }
 
     fn get_height(&self, hash: Hash) -> Result<u64, StoreError> {
-        Ok(self.cached_access.read(hash)?.height)
+        Ok(self.access.read(hash)?.height)
     }
 
     fn set_reindex_root(&mut self, root: Hash) -> Result<(), StoreError> {
-        self.reindex_root.write(DirectDbWriter::new(&self.raw_db), &root)
+        self.reindex_root.write(DirectDbWriter::new(&self.db), &root)
     }
 
     fn get_reindex_root(&self) -> Result<Hash, StoreError> {
@@ -139,23 +139,23 @@ impl ReachabilityStore for DbReachabilityStore {
 
 impl ReachabilityStoreReader for DbReachabilityStore {
     fn has(&self, hash: Hash) -> Result<bool, StoreError> {
-        self.cached_access.has(hash)
+        self.access.has(hash)
     }
 
     fn get_interval(&self, hash: Hash) -> Result<Interval, StoreError> {
-        Ok(self.cached_access.read(hash)?.interval)
+        Ok(self.access.read(hash)?.interval)
     }
 
     fn get_parent(&self, hash: Hash) -> Result<Hash, StoreError> {
-        Ok(self.cached_access.read(hash)?.parent)
+        Ok(self.access.read(hash)?.parent)
     }
 
     fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-        Ok(Arc::clone(&self.cached_access.read(hash)?.children))
+        Ok(Arc::clone(&self.access.read(hash)?.children))
     }
 
     fn get_future_covering_set(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-        Ok(Arc::clone(&self.cached_access.read(hash)?.future_covering_set))
+        Ok(Arc::clone(&self.access.read(hash)?.future_covering_set))
     }
 }
 
@@ -174,7 +174,7 @@ impl<'a> StagingReachabilityStore<'a> {
         let mut store_write = RwLockUpgradableReadGuard::upgrade(self.store_read);
         for (k, v) in self.staging_writes {
             let data = Arc::new(v);
-            store_write.cached_access.write(BatchDbWriter::new(batch), k, &data)?
+            store_write.access.write(BatchDbWriter::new(batch), k, data)?
         }
         if let Some(root) = self.staging_reindex_root {
             store_write.reindex_root.write(BatchDbWriter::new(batch), &root)?;
@@ -208,7 +208,7 @@ impl ReachabilityStore for StagingReachabilityStore<'_> {
             return Ok(());
         }
 
-        let mut data = (*self.store_read.cached_access.read(hash)?).clone();
+        let mut data = (*self.store_read.access.read(hash)?).clone();
         data.interval = interval;
         self.staging_writes.insert(hash, data);
 
@@ -221,7 +221,7 @@ impl ReachabilityStore for StagingReachabilityStore<'_> {
             return Ok(data.height);
         }
 
-        let mut data = (*self.store_read.cached_access.read(hash)?).clone();
+        let mut data = (*self.store_read.access.read(hash)?).clone();
         let height = data.height;
         Arc::make_mut(&mut data.children).push(child);
         self.staging_writes.insert(hash, data);
@@ -235,7 +235,7 @@ impl ReachabilityStore for StagingReachabilityStore<'_> {
             return Ok(());
         }
 
-        let mut data = (*self.store_read.cached_access.read(hash)?).clone();
+        let mut data = (*self.store_read.access.read(hash)?).clone();
         Arc::make_mut(&mut data.future_covering_set).insert(insertion_index, fci);
         self.staging_writes.insert(hash, data);
 
@@ -246,7 +246,7 @@ impl ReachabilityStore for StagingReachabilityStore<'_> {
         if let Some(data) = self.staging_writes.get(&hash) {
             Ok(data.height)
         } else {
-            Ok(self.store_read.cached_access.read(hash)?.height)
+            Ok(self.store_read.access.read(hash)?.height)
         }
     }
 
@@ -266,14 +266,14 @@ impl ReachabilityStore for StagingReachabilityStore<'_> {
 
 impl ReachabilityStoreReader for StagingReachabilityStore<'_> {
     fn has(&self, hash: Hash) -> Result<bool, StoreError> {
-        Ok(self.staging_writes.contains_key(&hash) || self.store_read.cached_access.has(hash)?)
+        Ok(self.staging_writes.contains_key(&hash) || self.store_read.access.has(hash)?)
     }
 
     fn get_interval(&self, hash: Hash) -> Result<Interval, StoreError> {
         if let Some(data) = self.staging_writes.get(&hash) {
             Ok(data.interval)
         } else {
-            Ok(self.store_read.cached_access.read(hash)?.interval)
+            Ok(self.store_read.access.read(hash)?.interval)
         }
     }
 
@@ -281,7 +281,7 @@ impl ReachabilityStoreReader for StagingReachabilityStore<'_> {
         if let Some(data) = self.staging_writes.get(&hash) {
             Ok(data.parent)
         } else {
-            Ok(self.store_read.cached_access.read(hash)?.parent)
+            Ok(self.store_read.access.read(hash)?.parent)
         }
     }
 
@@ -289,7 +289,7 @@ impl ReachabilityStoreReader for StagingReachabilityStore<'_> {
         if let Some(data) = self.staging_writes.get(&hash) {
             Ok(BlockHashes::clone(&data.children))
         } else {
-            Ok(BlockHashes::clone(&self.store_read.cached_access.read(hash)?.children))
+            Ok(BlockHashes::clone(&self.store_read.access.read(hash)?.children))
         }
     }
 
@@ -297,7 +297,7 @@ impl ReachabilityStoreReader for StagingReachabilityStore<'_> {
         if let Some(data) = self.staging_writes.get(&hash) {
             Ok(BlockHashes::clone(&data.future_covering_set))
         } else {
-            Ok(BlockHashes::clone(&self.store_read.cached_access.read(hash)?.future_covering_set))
+            Ok(BlockHashes::clone(&self.store_read.access.read(hash)?.future_covering_set))
         }
     }
 }
diff --git a/consensus/src/model/stores/relations.rs b/consensus/src/model/stores/relations.rs
index d3756eab2..d87073cff 100644
--- a/consensus/src/model/stores/relations.rs
+++ b/consensus/src/model/stores/relations.rs
@@ -30,22 +30,22 @@ const CHILDREN_PREFIX: &[u8] = b"block-children";
 /// A DB + cache implementation of `RelationsStore` trait, with concurrent readers support.
 #[derive(Clone)]
 pub struct DbRelationsStore {
-    raw_db: Arc<DB>,
-    parents_access: CachedDbAccess<Hash, Vec<Hash>, BlockHasher>,
-    children_access: CachedDbAccess<Hash, Vec<Hash>, BlockHasher>,
+    db: Arc<DB>,
+    parents_access: CachedDbAccess<Hash, Arc<Vec<Hash>>, BlockHasher>,
+    children_access: CachedDbAccess<Hash, Arc<Vec<Hash>>, BlockHasher>,
 }
 
 impl DbRelationsStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
         Self {
-            raw_db: Arc::clone(&db),
+            db: Arc::clone(&db),
             parents_access: CachedDbAccess::new(Arc::clone(&db), cache_size, PARENTS_PREFIX),
             children_access: CachedDbAccess::new(db, cache_size, CHILDREN_PREFIX),
         }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     // Should be kept private and used only through `RelationsStoreBatchExtensions.insert_batch`
@@ -55,16 +55,16 @@ impl DbRelationsStore {
         }
 
         // Insert a new entry for `hash`
-        self.parents_access.write(BatchDbWriter::new(batch), hash, &parents)?;
+        self.parents_access.write(BatchDbWriter::new(batch), hash, parents.clone())?;
 
         // The new hash has no children yet
-        self.children_access.write(BatchDbWriter::new(batch), hash, &BlockHashes::new(Vec::new()))?;
+        self.children_access.write(BatchDbWriter::new(batch), hash, BlockHashes::new(Vec::new()))?;
 
         // Update `children` for each parent
         for parent in parents.iter().cloned() {
             let mut children = (*self.get_children(parent)?).clone();
             children.push(hash);
-            self.children_access.write(BatchDbWriter::new(batch), parent, &BlockHashes::new(children))?;
+            self.children_access.write(BatchDbWriter::new(batch), parent, BlockHashes::new(children))?;
         }
 
         Ok(())
@@ -114,22 +114,23 @@ impl RelationsStoreReader for DbRelationsStore {
 
 impl RelationsStore for DbRelationsStore {
     /// See `insert_batch` as well
+    /// TODO: use one function with DbWriter for both this function and insert_batch
     fn insert(&mut self, hash: Hash, parents: BlockHashes) -> Result<(), StoreError> {
         if self.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
 
         // Insert a new entry for `hash`
-        self.parents_access.write(DirectDbWriter::new(&self.raw_db), hash, &parents)?;
+        self.parents_access.write(DirectDbWriter::new(&self.db), hash, parents.clone())?;
 
         // The new hash has no children yet
-        self.children_access.write(DirectDbWriter::new(&self.raw_db), hash, &BlockHashes::new(Vec::new()))?;
+        self.children_access.write(DirectDbWriter::new(&self.db), hash, BlockHashes::new(Vec::new()))?;
 
         // Update `children` for each parent
         for parent in parents.iter().cloned() {
             let mut children = (*self.get_children(parent)?).clone();
             children.push(hash);
-            self.children_access.write(DirectDbWriter::new(&self.raw_db), parent, &BlockHashes::new(children))?;
+            self.children_access.write(DirectDbWriter::new(&self.db), parent, BlockHashes::new(children))?;
         }
 
         Ok(())
diff --git a/consensus/src/model/stores/statuses.rs b/consensus/src/model/stores/statuses.rs
index 8553bdd6e..8a05ee205 100644
--- a/consensus/src/model/stores/statuses.rs
+++ b/consensus/src/model/stores/statuses.rs
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
 use super::{
-    database::prelude::{BatchDbWriter, CachedDbAccessForCopy, DirectDbWriter},
+    database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter},
     errors::{StoreError, StoreResult},
     DB,
 };
@@ -59,17 +59,17 @@ const STORE_PREFIX: &[u8] = b"block-statuses";
 /// A DB + cache implementation of `StatusesStore` trait, with concurrent readers support.
 #[derive(Clone)]
 pub struct DbStatusesStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccessForCopy<Hash, BlockStatus, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, BlockStatus, BlockHasher>,
 }
 
 impl DbStatusesStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccessForCopy::new(db, cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 }
 
@@ -90,23 +90,23 @@ impl StatusesStoreBatchExtensions for Arc<RwLock<DbStatusesStore>> {
         status: BlockStatus,
     ) -> Result<RwLockWriteGuard<DbStatusesStore>, StoreError> {
         let write_guard = self.write();
-        write_guard.cached_access.write(BatchDbWriter::new(batch), hash, status)?;
+        write_guard.access.write(BatchDbWriter::new(batch), hash, status)?;
         Ok(write_guard)
     }
 }
 
 impl StatusesStoreReader for DbStatusesStore {
     fn get(&self, hash: Hash) -> StoreResult<BlockStatus> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 
     fn has(&self, hash: Hash) -> StoreResult<bool> {
-        self.cached_access.has(hash)
+        self.access.has(hash)
     }
 }
 
 impl StatusesStore for DbStatusesStore {
     fn set(&mut self, hash: Hash, status: BlockStatus) -> StoreResult<()> {
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, status)
+        self.access.write(DirectDbWriter::new(&self.db), hash, status)
     }
 }
diff --git a/consensus/src/model/stores/tips.rs b/consensus/src/model/stores/tips.rs
index e983959c6..8a29721f7 100644
--- a/consensus/src/model/stores/tips.rs
+++ b/consensus/src/model/stores/tips.rs
@@ -23,21 +23,21 @@ pub const STORE_NAME: &[u8] = b"body-tips";
 /// A DB + cache implementation of `TipsStore` trait
 #[derive(Clone)]
 pub struct DbTipsStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbItem<Arc<BlockHashSet>>,
+    db: Arc<DB>,
+    access: CachedDbItem<Arc<BlockHashSet>>,
 }
 
 impl DbTipsStore {
     pub fn new(db: Arc<DB>) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbItem::new(db.clone(), STORE_NAME) }
+        Self { db: Arc::clone(&db), access: CachedDbItem::new(db.clone(), STORE_NAME) }
     }
 
     pub fn clone_with_new_cache(&self) -> Self {
-        Self::new(Arc::clone(&self.raw_db))
+        Self::new(Arc::clone(&self.db))
     }
 
     pub fn init_batch(&mut self, batch: &mut WriteBatch, initial_tips: &[Hash]) -> StoreResult<()> {
-        self.cached_access.write(BatchDbWriter::new(batch), &Arc::new(BlockHashSet::from_iter(initial_tips.iter().copied())))
+        self.access.write(BatchDbWriter::new(batch), &Arc::new(BlockHashSet::from_iter(initial_tips.iter().copied())))
     }
 
     pub fn add_tip_batch(
@@ -46,7 +46,7 @@ impl DbTipsStore {
         new_tip: Hash,
         new_tip_parents: &[Hash],
     ) -> StoreResult<Arc<BlockHashSet>> {
-        self.cached_access.update(BatchDbWriter::new(batch), |tips| update_tips(tips, new_tip_parents, new_tip))
+        self.access.update(BatchDbWriter::new(batch), |tips| update_tips(tips, new_tip_parents, new_tip))
     }
 }
 
@@ -62,13 +62,13 @@ fn update_tips(mut current_tips: Arc<BlockHashSet>, new_tip_parents: &[Hash], ne
 
 impl TipsStoreReader for DbTipsStore {
     fn get(&self) -> StoreResult<Arc<BlockHashSet>> {
-        self.cached_access.read()
+        self.access.read()
     }
 }
 
 impl TipsStore for DbTipsStore {
     fn add_tip(&mut self, new_tip: Hash, new_tip_parents: &[Hash]) -> StoreResult<Arc<BlockHashSet>> {
-        self.cached_access.update(DirectDbWriter::new(&self.raw_db), |tips| update_tips(tips, new_tip_parents, new_tip))
+        self.access.update(DirectDbWriter::new(&self.db), |tips| update_tips(tips, new_tip_parents, new_tip))
     }
 }
diff --git a/consensus/src/model/stores/utxo_diffs.rs b/consensus/src/model/stores/utxo_diffs.rs
index ddc1a7b03..890c4bf09 100644
--- a/consensus/src/model/stores/utxo_diffs.rs
+++ b/consensus/src/model/stores/utxo_diffs.rs
@@ -28,40 +28,40 @@ const STORE_PREFIX: &[u8] = b"utxo-diffs";
 /// A DB + cache implementation of `UtxoDifferencesStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbUtxoDiffsStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccess<Hash, UtxoDiff, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<UtxoDiff>, BlockHasher>,
 }
 
 impl DbUtxoDiffsStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, utxo_diff: Arc<UtxoDiff>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, &utxo_diff)?;
+        self.access.write(BatchDbWriter::new(batch), hash, utxo_diff)?;
         Ok(())
     }
 }
 
 impl UtxoDiffsStoreReader for DbUtxoDiffsStore {
     fn get(&self, hash: Hash) -> Result<Arc<UtxoDiff>, StoreError> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 }
 
 impl UtxoDiffsStore for DbUtxoDiffsStore {
     fn insert(&self, hash: Hash, utxo_diff: Arc<UtxoDiff>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &utxo_diff)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, utxo_diff)?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/utxo_multisets.rs b/consensus/src/model/stores/utxo_multisets.rs
index 190c60eea..90a415d36 100644
--- a/consensus/src/model/stores/utxo_multisets.rs
+++ b/consensus/src/model/stores/utxo_multisets.rs
@@ -1,5 +1,5 @@
 use super::{
-    database::prelude::{BatchDbWriter, CachedDbAccessForCopy, DirectDbWriter},
+    database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter},
     errors::StoreError,
     DB,
 };
@@ -23,48 +23,40 @@ const STORE_PREFIX: &[u8] = b"utxo-multisets";
 /// A DB + cache implementation of `DbUtxoMultisetsStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbUtxoMultisetsStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccessForCopy<Hash, Uint3072, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Uint3072, BlockHasher>,
 }
 
 impl DbUtxoMultisetsStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccessForCopy::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }
 
     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, multiset: MuHash) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(
-            BatchDbWriter::new(batch),
-            hash,
-            multiset.try_into().expect("multiset is expected to be finalized"),
-        )?;
+        self.access.write(BatchDbWriter::new(batch), hash, multiset.try_into().expect("multiset is expected to be finalized"))?;
         Ok(())
     }
 }
 
 impl UtxoMultisetsStoreReader for DbUtxoMultisetsStore {
     fn get(&self, hash: Hash) -> Result<MuHash, StoreError> {
-        Ok(self.cached_access.read(hash)?.into())
+        Ok(self.access.read(hash)?.into())
     }
 }
 
 impl UtxoMultisetsStore for DbUtxoMultisetsStore {
     fn insert(&self, hash: Hash, multiset: MuHash) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(
-            DirectDbWriter::new(&self.raw_db),
-            hash,
-            multiset.try_into().expect("multiset is expected to be finalized"),
-        )?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, multiset.try_into().expect("multiset is expected to be finalized"))?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/utxo_set.rs b/consensus/src/model/stores/utxo_set.rs
index 4352cce60..4a491176b 100644
--- a/consensus/src/model/stores/utxo_set.rs
+++ b/consensus/src/model/stores/utxo_set.rs
@@ -62,24 +62,24 @@ impl From<UtxoKey> for TransactionOutpoint {
 
 #[derive(Clone)]
 pub struct DbUtxoSetStore {
-    raw_db: Arc<DB>,
+    db: Arc<DB>,
     prefix: &'static [u8],
-    cached_access: CachedDbAccess<UtxoKey, UtxoEntry>,
+    access: CachedDbAccess<UtxoKey, Arc<UtxoEntry>>,
 }
 
 impl DbUtxoSetStore {
     pub fn new(db: Arc<DB>, cache_size: u64, prefix: &'static [u8]) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, prefix), prefix }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, prefix), prefix }
     }
 
     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size, self.prefix)
+        Self::new(Arc::clone(&self.db), cache_size, self.prefix)
     }
 
     pub fn write_diff_batch(&self, batch: &mut WriteBatch, utxo_diff: &impl ImmutableUtxoDiff) -> Result<(), StoreError> {
         let mut writer = BatchDbWriter::new(batch);
-        self.cached_access.delete_many(&mut writer, &mut utxo_diff.removed().keys().map(|o| (*o).into()))?;
-        self.cached_access.write_many(&mut writer, &mut utxo_diff.added().iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?;
+        self.access.delete_many(&mut writer, &mut utxo_diff.removed().keys().map(|o| (*o).into()))?;
+        self.access.write_many(&mut writer, &mut utxo_diff.added().iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?;
         Ok(())
     }
 }
@@ -92,15 +92,15 @@ impl UtxoView for DbUtxoSetStore {
 
 impl UtxoSetStoreReader for DbUtxoSetStore {
     fn get(&self, outpoint: &TransactionOutpoint) -> Result<Arc<UtxoEntry>, StoreError> {
-        self.cached_access.read((*outpoint).into())
+        self.access.read((*outpoint).into())
     }
 }
 
 impl UtxoSetStore for DbUtxoSetStore {
     fn write_diff(&self, utxo_diff: &UtxoDiff) -> Result<(), StoreError> {
-        let mut writer = DirectDbWriter::new(&self.raw_db);
-        self.cached_access.delete_many(&mut writer, &mut utxo_diff.removed().keys().map(|o| (*o).into()))?;
-        self.cached_access.write_many(&mut writer, &mut utxo_diff.added().iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?;
+        let mut writer = DirectDbWriter::new(&self.db);
+        self.access.delete_many(&mut writer, &mut utxo_diff.removed().keys().map(|o| (*o).into()))?;
+        self.access.write_many(&mut writer, &mut utxo_diff.added().iter().map(|(o, e)| ((*o).into(), Arc::new(e.clone()))))?;
         Ok(())
     }
 }
diff --git a/consensus/src/model/stores/virtual_state.rs b/consensus/src/model/stores/virtual_state.rs
index 56c44bfcd..07c5d0c6e 100644
--- a/consensus/src/model/stores/virtual_state.rs
+++ b/consensus/src/model/stores/virtual_state.rs
@@ -52,32 +52,32 @@ const STORE_PREFIX: &[u8] = b"virtual-state";
 /// A DB + cache implementation of `VirtualStateStore` trait
 #[derive(Clone)]
 pub struct DbVirtualStateStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbItem<Arc<VirtualState>>,
+    db: Arc<DB>,
+    access: CachedDbItem<Arc<VirtualState>>,
 }
 
 impl DbVirtualStateStore {
     pub fn new(db: Arc<DB>) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbItem::new(db.clone(), STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbItem::new(db.clone(), STORE_PREFIX) }
     }
 
     pub fn clone_with_new_cache(&self) -> Self {
-        Self::new(Arc::clone(&self.raw_db))
+        Self::new(Arc::clone(&self.db))
     }
 
     pub fn set_batch(&mut self, batch: &mut WriteBatch, state: VirtualState) -> StoreResult<()> {
-        self.cached_access.write(BatchDbWriter::new(batch), &Arc::new(state))
+        self.access.write(BatchDbWriter::new(batch), &Arc::new(state))
     }
 }
 
 impl VirtualStateStoreReader for DbVirtualStateStore {
     fn get(&self) -> StoreResult<Arc<VirtualState>> {
-        self.cached_access.read()
+        self.access.read()
     }
 }
 
 impl VirtualStateStore for DbVirtualStateStore {
     fn set(&mut self, state: VirtualState) -> StoreResult<()> {
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), &Arc::new(state))
+        self.access.write(DirectDbWriter::new(&self.db), &Arc::new(state))
     }
 }
diff --git a/consensus/src/processes/parents_builder.rs b/consensus/src/processes/parents_builder.rs
index 26c95504a..6be89d9f8 100644
--- a/consensus/src/processes/parents_builder.rs
+++ b/consensus/src/processes/parents_builder.rs
@@ -210,7 +210,7 @@ mod tests {
     use parking_lot::RwLock;
 
     struct HeaderStoreMock {
-        map: RwLock<BlockHashMap<Arc<HeaderWithBlockLevel>>>,
+        map: RwLock<BlockHashMap<HeaderWithBlockLevel>>,
     }
 
     impl HeaderStoreMock {
@@ -245,7 +245,7 @@ mod tests {
             todo!()
         }
 
-        fn get_header_with_block_level(&self, hash: hashes::Hash) -> Result<Arc<HeaderWithBlockLevel>, StoreError> {
+        fn get_header_with_block_level(&self, hash: hashes::Hash) -> Result<HeaderWithBlockLevel, StoreError> {
            Ok(self.map.read().get(&hash).unwrap().clone())
         }
     }
@@ -278,7 +278,7 @@ mod tests {
         let pruning_point: Hash = 1.into();
         headers_store.map.write().insert(
             pruning_point,
-            Arc::new(HeaderWithBlockLevel {
+            HeaderWithBlockLevel {
                 header: Arc::new(Header {
                     hash: pruning_point,
                     version: 0,
@@ -301,13 +301,13 @@ mod tests {
                     pruning_point: 1.into(),
                 }),
                 block_level: 0,
-            }),
+            },
         );
 
         let pp_anticone_block: Hash = 3001.into();
         headers_store.map.write().insert(
             pp_anticone_block,
-            Arc::new(HeaderWithBlockLevel {
+            HeaderWithBlockLevel {
                 header: Arc::new(Header {
                     hash: pp_anticone_block,
                     version: 0,
@@ -330,13 +330,13 @@ mod tests {
                     pruning_point: 1.into(),
                 }),
                 block_level: 0,
-            }),
+            },
        );
 
         let pp_anticone_block_child: Hash = 3002.into();
         headers_store.map.write().insert(
             pp_anticone_block_child,
-            Arc::new(HeaderWithBlockLevel {
+            HeaderWithBlockLevel {
                 header: Arc::new(Header {
                     hash: pp_anticone_block_child,
                     version: 0,
@@ -359,7 +359,7 @@ mod tests {
                    pruning_point: 1.into(),
                 }),
                 block_level: 0,
-            }),
+            },
         );
         struct TestBlock {
             id: u64,
@@ -450,7 +450,7 @@ mod tests {
 
         headers_store.map.write().insert(
             hash,
-            Arc::new(HeaderWithBlockLevel {
+            HeaderWithBlockLevel {
                 header: Arc::new(Header {
                     hash,
                     version: 0,
@@ -467,7 +467,7 @@ mod tests {
                     pruning_point: 1.into(),
                 }),
                 block_level: test_block.block_level,
-            }),
+            },
         );
     }
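One detail worth noting in the reworked `CachedDbAccess::write`: serialization now happens before the cache insert, because `write` takes `data` by value and the insert consumes it. A minimal sketch of the pattern, with a hypothetical `Writer` trait and a plain `HashMap` standing in for the real `DbWriter` and cache types:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the module's DbWriter trait.
trait Writer {
    fn put(&mut self, key: u64, value: Vec<u8>);
}

fn write<TData: serde::Serialize>(
    cache: &mut HashMap<u64, TData>,
    writer: &mut impl Writer,
    key: u64,
    data: TData,
) -> bincode::Result<()> {
    // Serialize first: after the insert below, `data` has been
    // moved into the cache and can no longer be borrowed here.
    let bin_data = bincode::serialize(&data)?;
    cache.insert(key, data);
    writer.put(key, bin_data);
    Ok(())
}
```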