Unify CachedDbAccess/ForCopy to one struct #74

Merged · 2 commits · Oct 28, 2022
18 changes: 9 additions & 9 deletions consensus/src/model/stores/acceptance_data.rs
@@ -27,40 +27,40 @@ const STORE_PREFIX: &[u8] = b"acceptance-data";
 /// A DB + cache implementation of `DbAcceptanceDataStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbAcceptanceDataStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccess<Hash, AcceptanceData, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<AcceptanceData>, BlockHasher>,
 }

 impl DbAcceptanceDataStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
     }

     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }

     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, acceptance_data: Arc<AcceptanceData>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, &acceptance_data)?;
+        self.access.write(BatchDbWriter::new(batch), hash, acceptance_data)?;
         Ok(())
     }
 }

 impl AcceptanceDataStoreReader for DbAcceptanceDataStore {
     fn get(&self, hash: Hash) -> Result<Arc<AcceptanceData>, StoreError> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 }

 impl AcceptanceDataStore for DbAcceptanceDataStore {
     fn insert(&self, hash: Hash, acceptance_data: Arc<AcceptanceData>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &acceptance_data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, acceptance_data)?;
         Ok(())
     }
 }
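Reviewer note (illustration, not part of the diff): with the unified access type, `write` takes its payload by value rather than as `&Arc<TData>`, so call sites hand the store one `Arc` reference instead of a borrow. A minimal sketch of a caller under the new `insert`/`get` signatures, assuming an already constructed `DbAcceptanceDataStore` and block hash, and assuming `AcceptanceData: Default` purely for the sake of the example:

    // Build the payload once and keep a handle on the caller's side.
    let acceptance: Arc<AcceptanceData> = Arc::new(AcceptanceData::default());
    // Cloning an Arc is a reference-count bump, not a deep copy;
    // the store takes ownership of the cloned handle and caches it.
    store.insert(block_hash, acceptance.clone())?;
    // The cached value comes back as the same Arc-wrapped payload.
    let read_back: Arc<AcceptanceData> = store.get(block_hash)?;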
19 changes: 9 additions & 10 deletions consensus/src/model/stores/block_transactions.rs
@@ -23,41 +23,40 @@ const STORE_PREFIX: &[u8] = b"block-transactions";
 /// A DB + cache implementation of `BlockTransactionsStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbBlockTransactionsStore {
-    raw_db: Arc<DB>,
-    // `CachedDbAccess` is shallow cloned so no need to wrap with Arc
-    cached_access: CachedDbAccess<Hash, Vec<Transaction>, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, Arc<Vec<Transaction>>, BlockHasher>,
 }

 impl DbBlockTransactionsStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(Arc::clone(&db), cache_size, STORE_PREFIX) }
     }

     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }

     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, &transactions)?;
+        self.access.write(BatchDbWriter::new(batch), hash, transactions)?;
         Ok(())
     }
 }

 impl BlockTransactionsStoreReader for DbBlockTransactionsStore {
     fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError> {
-        self.cached_access.read(hash)
+        self.access.read(hash)
     }
 }

 impl BlockTransactionsStore for DbBlockTransactionsStore {
     fn insert(&self, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, &transactions)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, transactions)?;
         Ok(())
     }
 }
24 changes: 10 additions & 14 deletions consensus/src/model/stores/daa.rs
@@ -18,49 +18,45 @@ pub trait DaaStore: DaaStoreReader {
     fn insert(&self, hash: Hash, added_blocks: BlockHashes) -> Result<(), StoreError>;
 }

-const ADDED_BLOCKS_STORE_PREFIX: &[u8] = b"daa-added-blocks";
+const STORE_PREFIX: &[u8] = b"daa-added-blocks";

 /// A DB + cache implementation of `DaaStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbDaaStore {
-    raw_db: Arc<DB>,
-    // `CachedDbAccess` is shallow cloned so no need to wrap with Arc
-    cached_daa_added_blocks_access: CachedDbAccess<Hash, Vec<Hash>, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, BlockHashes, BlockHasher>,
 }

 impl DbDaaStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self {
-            raw_db: Arc::clone(&db),
-            cached_daa_added_blocks_access: CachedDbAccess::new(db, cache_size, ADDED_BLOCKS_STORE_PREFIX),
-        }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, STORE_PREFIX) }
     }

     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }

     pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, added_blocks: BlockHashes) -> Result<(), StoreError> {
-        if self.cached_daa_added_blocks_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_daa_added_blocks_access.write(BatchDbWriter::new(batch), hash, &added_blocks)?;
+        self.access.write(BatchDbWriter::new(batch), hash, added_blocks)?;
         Ok(())
     }
 }

 impl DaaStoreReader for DbDaaStore {
     fn get_daa_added_blocks(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-        self.cached_daa_added_blocks_access.read(hash)
+        self.access.read(hash)
     }
 }

 impl DaaStore for DbDaaStore {
     fn insert(&self, hash: Hash, added_blocks: BlockHashes) -> Result<(), StoreError> {
-        if self.cached_daa_added_blocks_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_daa_added_blocks_access.write(DirectDbWriter::new(&self.raw_db), hash, &added_blocks)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, added_blocks)?;
         Ok(())
     }
 }
87 changes: 11 additions & 76 deletions consensus/src/model/stores/database/access.rs
@@ -3,7 +3,7 @@ use crate::model::stores::{errors::StoreError, DB};
 use serde::{de::DeserializeOwned, Serialize};
 use std::{collections::hash_map::RandomState, hash::BuildHasher, sync::Arc};

-/// A concurrent DB store with typed caching.
+/// A concurrent DB store access with typed caching.
 #[derive(Clone)]
 pub struct CachedDbAccess<TKey, TData, S = RandomState>
 where
@@ -13,7 +13,7 @@ where
     db: Arc<DB>,

     // Cache
-    cache: Cache<TKey, Arc<TData>, S>,
+    cache: Cache<TKey, TData, S>,

     // DB bucket/path
     prefix: &'static [u8],
@@ -29,7 +29,7 @@ where
         Self { db, cache: Cache::new(cache_size), prefix }
     }

-    pub fn read_from_cache(&self, key: TKey) -> Option<Arc<TData>>
+    pub fn read_from_cache(&self, key: TKey) -> Option<TData>
     where
         TKey: Copy + AsRef<[u8]>,
     {
@@ -43,7 +43,7 @@ where
         Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(self.prefix, key))?.is_some())
     }

-    pub fn read(&self, key: TKey) -> Result<Arc<TData>, StoreError>
+    pub fn read(&self, key: TKey) -> Result<TData, StoreError>
     where
         TKey: Copy + AsRef<[u8]> + ToString,
         TData: DeserializeOwned, // We need `DeserializeOwned` since the slice coming from `db.get_pinned` has short lifetime
@@ -53,30 +53,30 @@ where
         } else {
             let db_key = DbKey::new(self.prefix, key);
             if let Some(slice) = self.db.get_pinned(&db_key)? {
-                let data: Arc<TData> = Arc::new(bincode::deserialize(&slice)?);
-                self.cache.insert(key, Arc::clone(&data));
+                let data: TData = bincode::deserialize(&slice)?;
+                self.cache.insert(key, data.clone());
                 Ok(data)
             } else {
                 Err(StoreError::KeyNotFound(db_key))
             }
         }
     }

-    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: &Arc<TData>) -> Result<(), StoreError>
+    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError>
     where
         TKey: Copy + AsRef<[u8]>,
         TData: Serialize,
     {
-        self.cache.insert(key, Arc::clone(data));
-        let bin_data = bincode::serialize(data.as_ref())?;
+        let bin_data = bincode::serialize(&data)?;
+        self.cache.insert(key, data);
         writer.put(DbKey::new(self.prefix, key), bin_data)?;
         Ok(())
     }

     pub fn write_many(
         &self,
         mut writer: impl DbWriter,
-        iter: &mut (impl Iterator<Item = (TKey, Arc<TData>)> + Clone),
+        iter: &mut (impl Iterator<Item = (TKey, TData)> + Clone),
     ) -> Result<(), StoreError>
     where
         TKey: Copy + AsRef<[u8]>,
@@ -85,7 +85,7 @@ where
         let iter_clone = iter.clone();
         self.cache.insert_many(iter);
         for (key, data) in iter_clone {
-            let bin_data = bincode::serialize(data.as_ref())?;
+            let bin_data = bincode::serialize(&data)?;
             writer.put(DbKey::new(self.prefix, key), bin_data)?;
         }
         Ok(())
@@ -112,68 +112,3 @@ where
         Ok(())
     }
 }
-
-/// A concurrent DB store with typed caching for `Copy` types.
-/// TODO: try and generalize under `CachedDbAccess`
-#[derive(Clone)]
-pub struct CachedDbAccessForCopy<TKey, TData, S = RandomState>
-where
-    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
-    TData: Clone + Copy + Send + Sync,
-{
-    db: Arc<DB>,
-
-    // Cache
-    cache: Cache<TKey, TData, S>,
-
-    // DB bucket/path
-    prefix: &'static [u8],
-}
-
-impl<TKey, TData, S> CachedDbAccessForCopy<TKey, TData, S>
-where
-    TKey: Clone + std::hash::Hash + Eq + Send + Sync,
-    TData: Clone + Copy + Send + Sync,
-    S: BuildHasher + Default,
-{
-    pub fn new(db: Arc<DB>, cache_size: u64, prefix: &'static [u8]) -> Self {
-        Self { db, cache: Cache::new(cache_size), prefix }
-    }
-
-    pub fn has(&self, key: TKey) -> Result<bool, StoreError>
-    where
-        TKey: Copy + AsRef<[u8]>,
-    {
-        Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(self.prefix, key))?.is_some())
-    }
-
-    pub fn read(&self, key: TKey) -> Result<TData, StoreError>
-    where
-        TKey: Copy + AsRef<[u8]> + ToString,
-        TData: DeserializeOwned, // We need `DeserializeOwned` since the slice coming from `db.get_pinned` has short lifetime
-    {
-        if let Some(data) = self.cache.get(&key) {
-            Ok(data)
-        } else {
-            let db_key = DbKey::new(self.prefix, key);
-            if let Some(slice) = self.db.get_pinned(&db_key)? {
-                let data: TData = bincode::deserialize(&slice)?;
-                self.cache.insert(key, data);
-                Ok(data)
-            } else {
-                Err(StoreError::KeyNotFound(db_key))
-            }
-        }
-    }
-
-    pub fn write(&self, mut writer: impl DbWriter, key: TKey, data: TData) -> Result<(), StoreError>
-    where
-        TKey: Copy + AsRef<[u8]>,
-        TData: Serialize,
-    {
-        self.cache.insert(key, data);
-        let bin_data = bincode::serialize(&data)?;
-        writer.put(DbKey::new(self.prefix, key), bin_data)?;
-        Ok(())
-    }
-}
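Reviewer note (illustration, not part of the diff): after this unification, the single `CachedDbAccess<TKey, TData, S>` covers both earlier usage patterns by letting the caller choose `TData`. Heavy payloads keep cheap sharing by putting the `Arc` inside the type parameter; small `Copy` payloads are stored by value, which is what `CachedDbAccessForCopy` used to do. A minimal sketch under the crate's existing types (`DB`, `Hash`, `BlockHasher`, `StoreError`, `DirectDbWriter`); the helper name is hypothetical and exists only for this example:

    // Arc-wrapped payload: the clone inside read/write is a refcount bump.
    type AcceptanceAccess = CachedDbAccess<Hash, Arc<AcceptanceData>, BlockHasher>;

    // Plain Copy payload: stored and returned by value, no Arc involved.
    type DepthAccess = CachedDbAccess<Hash, BlockDepthInfo, BlockHasher>;

    // Hypothetical round-trip through the Arc-wrapped instantiation.
    fn roundtrip(access: &AcceptanceAccess, db: &Arc<DB>, hash: Hash, data: Arc<AcceptanceData>) -> Result<(), StoreError> {
        // `write` now consumes the payload by value; cloning the Arc keeps a caller-side handle.
        access.write(DirectDbWriter::new(db), hash, data.clone())?;
        // `read` returns the cached clone (another Arc handle to the same data).
        let _back: Arc<AcceptanceData> = access.read(hash)?;
        Ok(())
    }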
2 changes: 1 addition & 1 deletion consensus/src/model/stores/database/mod.rs
@@ -5,7 +5,7 @@ mod key;
 mod writer;

 pub mod prelude {
-    pub use super::access::{CachedDbAccess, CachedDbAccessForCopy};
+    pub use super::access::CachedDbAccess;
     pub use super::cache::Cache;
     pub use super::item::CachedDbItem;
     pub use super::key::DbKey;
22 changes: 11 additions & 11 deletions consensus/src/model/stores/depth.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;

 use super::{
-    database::prelude::{BatchDbWriter, CachedDbAccessForCopy, DirectDbWriter},
+    database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter},
     errors::StoreError,
     DB,
 };
@@ -31,17 +31,17 @@ struct BlockDepthInfo {
 /// A DB + cache implementation of `DepthStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbDepthStore {
-    raw_db: Arc<DB>,
-    cached_access: CachedDbAccessForCopy<Hash, BlockDepthInfo, BlockHasher>,
+    db: Arc<DB>,
+    access: CachedDbAccess<Hash, BlockDepthInfo, BlockHasher>,
 }

 impl DbDepthStore {
     pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
-        Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccessForCopy::new(db, cache_size, STORE_PREFIX) }
+        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, STORE_PREFIX) }
     }

     pub fn clone_with_new_cache(&self, cache_size: u64) -> Self {
-        Self::new(Arc::clone(&self.raw_db), cache_size)
+        Self::new(Arc::clone(&self.db), cache_size)
     }

     pub fn insert_batch(
@@ -51,30 +51,30 @@ impl DbDepthStore {
         merge_depth_root: Hash,
         finality_point: Hash,
     ) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(BatchDbWriter::new(batch), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
+        self.access.write(BatchDbWriter::new(batch), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
         Ok(())
     }
 }

 impl DepthStoreReader for DbDepthStore {
     fn merge_depth_root(&self, hash: Hash) -> Result<Hash, StoreError> {
-        Ok(self.cached_access.read(hash)?.merge_depth_root)
+        Ok(self.access.read(hash)?.merge_depth_root)
     }

     fn finality_point(&self, hash: Hash) -> Result<Hash, StoreError> {
-        Ok(self.cached_access.read(hash)?.finality_point)
+        Ok(self.access.read(hash)?.finality_point)
     }
 }

 impl DepthStore for DbDepthStore {
     fn insert(&self, hash: Hash, merge_depth_root: Hash, finality_point: Hash) -> Result<(), StoreError> {
-        if self.cached_access.has(hash)? {
+        if self.access.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
-        self.cached_access.write(DirectDbWriter::new(&self.raw_db), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, BlockDepthInfo { merge_depth_root, finality_point })?;
        Ok(())
     }
 }
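Reviewer note (illustration, not part of the diff): this file exercises the path that used to require `CachedDbAccessForCopy`. `BlockDepthInfo` is a small value stored directly, and the unified struct accepts it because its bounds only ask for `Clone` (which every `Copy` type satisfies), plus `Serialize`/`DeserializeOwned` at the write/read call sites. A sketch of the value type as it presumably looks in the unchanged part of this file; the exact derives are not shown in the diff and are assumed here:

    // Assumed derives: Copy/Clone for by-value caching, serde traits for bincode storage.
    #[derive(Clone, Copy, Serialize, Deserialize)]
    struct BlockDepthInfo {
        merge_depth_root: Hash,
        finality_point: Hash,
    }

Because `Clone` on such a type is just a bitwise copy, reads through the unified cache stay as cheap as they were with the dedicated `ForCopy` variant.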