Commit
consensus_bucketed_tx_storage.
D-Stacks committed Nov 22, 2024
1 parent a0aeec3 commit 821c7f2
Showing 7 changed files with 145 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -234,6 +234,7 @@ separator = "0.4.1"
seqlock = "0.2.0"
serde = { version = "1.0.190", features = ["derive", "rc"] }
serde_bytes = "0.11.12"
serde_with = "3.11.0"
serde_json = "1.0.107"
serde_repr = "0.1.18"
serde-value = "0.7.0"
1 change: 1 addition & 0 deletions consensus/Cargo.toml
@@ -39,6 +39,7 @@ rayon.workspace = true
rocksdb.workspace = true
secp256k1.workspace = true
serde.workspace = true
serde_with.workspace = true
smallvec.workspace = true
thiserror.workspace = true
tokio.workspace = true
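Note: serde_with appears to be pulled in because the new access key below is a 36-byte array; serde's own derive only covers byte arrays up to 32 elements, and serde_with's Bytes adapter also encodes the array as one contiguous byte string. A minimal sketch of the pattern under that assumption (the struct name is illustrative, not part of the commit):

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, Bytes};

// Illustrative struct: a fixed-size key wider than 32 bytes.
#[serde_as]
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct WideKey(#[serde_as(as = "Bytes")] [u8; 36]);

fn main() {
    let key = WideKey([7u8; 36]);
    // Round-trip through bincode, the codec the database layer uses for values.
    let encoded = bincode::serialize(&key).unwrap();
    let decoded: WideKey = bincode::deserialize(&encoded).unwrap();
    assert_eq!(key, decoded);
}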
107 changes: 91 additions & 16 deletions consensus/src/model/stores/block_transactions.rs
@@ -1,5 +1,7 @@
use kaspa_consensus_core::tx::Transaction;
use kaspa_consensus_core::tx::TransactionIndexType;
use kaspa_consensus_core::tx::{TransactionInput, TransactionOutput};
use kaspa_consensus_core::{tx::Transaction, BlockHasher};
use kaspa_database::prelude::Cache;
use kaspa_database::prelude::CachePolicy;
use kaspa_database::prelude::StoreError;
use kaspa_database::prelude::DB;
@@ -9,10 +11,49 @@ use kaspa_hashes::Hash;
use kaspa_utils::mem_size::MemSizeEstimator;
use rocksdb::WriteBatch;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, Bytes};
use std::fmt::Display;
use std::sync::Arc;

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
struct BlockTransactionFullAccessKey(#[serde_as(as = "Bytes")] [u8; 36]);

impl Display for BlockTransactionFullAccessKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl BlockTransactionFullAccessKey {
    pub fn new(block_hash: &Hash, index: TransactionIndexType) -> Self {
        let block_hash_bytes = block_hash.as_bytes();
        let index_bytes = index.to_be_bytes();
        let mut key = std::mem::MaybeUninit::uninit();
        let dest = key.as_mut_ptr() as *mut u8;
        Self(
            // unsafe, but avoids initializing array with zeros
            unsafe {
                std::ptr::copy_nonoverlapping(block_hash_bytes.as_ptr(), dest, block_hash_bytes.len());
                std::ptr::copy_nonoverlapping(index_bytes.as_ptr(), dest.add(block_hash_bytes.len()), index_bytes.len());
                key.assume_init()
            },
        )
    }
}

impl AsRef<[u8]> for BlockTransactionFullAccessKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct BlockBody(Arc<Vec<Transaction>>);

pub trait BlockTransactionsStoreReader {
    fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError>;
    fn get(&self, block_hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError>;
    fn get_at_index(&self, block_hash: Hash, index: TransactionIndexType) -> Result<Transaction, StoreError>;
}

pub trait BlockTransactionsStore: BlockTransactionsStoreReader {
@@ -21,9 +62,6 @@ pub trait BlockTransactionsStore: BlockTransactionsStoreReader {
    fn delete(&self, hash: Hash) -> Result<(), StoreError>;
}

#[derive(Clone, Serialize, Deserialize)]
struct BlockBody(Arc<Vec<Transaction>>);

impl MemSizeEstimator for BlockBody {
    fn estimate_mem_bytes(&self) -> usize {
        const NORMAL_SIG_SIZE: usize = 66;
@@ -45,51 +83,88 @@ impl MemSizeEstimator for BlockBody {
#[derive(Clone)]
pub struct DbBlockTransactionsStore {
    db: Arc<DB>,
    access: CachedDbAccess<Hash, BlockBody, BlockHasher>,
    access: CachedDbAccess<BlockTransactionFullAccessKey, Transaction>,
    cache: Cache<Hash, BlockBody>,
}

impl DbBlockTransactionsStore {
    pub fn new(db: Arc<DB>, cache_policy: CachePolicy) -> Self {
        Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::BlockTransactions.into()) }
        Self {
            db: Arc::clone(&db),
            access: CachedDbAccess::new(Arc::clone(&db), CachePolicy::Empty, DatabaseStorePrefixes::BlockTransactions.into()),
            cache: Cache::new(cache_policy),
        }
    }

    pub fn clone_with_new_cache(&self, cache_policy: CachePolicy) -> Self {
        Self::new(Arc::clone(&self.db), cache_policy)
    }

    pub fn has(&self, hash: Hash) -> Result<bool, StoreError> {
        self.access.has(hash)
        Ok(self.cache.contains_key(&hash) || self.access.has_bucket(hash.as_bytes().as_ref())?)
    }

    pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
        if self.access.has(hash)? {
        if self.cache.contains_key(&hash) || self.access.has_bucket(hash.as_bytes().as_ref())? {
            return Err(StoreError::HashAlreadyExists(hash));
        }
        self.access.write(BatchDbWriter::new(batch), hash, BlockBody(transactions))?;
        let writer = BatchDbWriter::new(batch);
        self.cache.insert(hash, BlockBody(transactions.clone()));
        self.access.write_many_without_cache(
            writer,
            &mut transactions
                .iter()
                .enumerate()
                .map(|(index, tx)| (BlockTransactionFullAccessKey::new(&hash, index as TransactionIndexType), tx.clone())),
        )?;
        Ok(())
    }

    pub fn delete_batch(&self, batch: &mut WriteBatch, hash: Hash) -> Result<(), StoreError> {
        self.access.delete(BatchDbWriter::new(batch), hash)
        self.cache.remove(&hash);
        self.access.delete_bucket(BatchDbWriter::new(batch), hash.as_bytes().as_ref())
    }
}

impl BlockTransactionsStoreReader for DbBlockTransactionsStore {
    fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError> {
        Ok(self.access.read(hash)?.0)
        if let Some(block_transactions) = self.cache.get(&hash) {
            Ok(block_transactions.0.clone())
        } else {
            Ok(Arc::new(self.access.read_bucket(hash.as_bytes().as_ref())?))
        }
    }

    fn get_at_index(&self, block_hash: Hash, index: TransactionIndexType) -> Result<Transaction, StoreError> {
        if let Some(block_transactions) = self.cache.get(&block_hash) {
            Ok(block_transactions.0[index as usize].clone())
        } else {
            self.access.read(BlockTransactionFullAccessKey::new(&block_hash, index))
        }
    }
}

impl BlockTransactionsStore for DbBlockTransactionsStore {
    fn insert(&self, hash: Hash, transactions: Arc<Vec<Transaction>>) -> Result<(), StoreError> {
        if self.access.has(hash)? {
        if self.access.has_bucket(hash.as_bytes().as_ref())? {
            return Err(StoreError::HashAlreadyExists(hash));
        }
        self.access.write(DirectDbWriter::new(&self.db), hash, BlockBody(transactions))?;
        Ok(())
        self.cache.insert(hash, BlockBody(transactions.clone()));
        self.access.write_many_without_cache(
            DirectDbWriter::new(&self.db),
            &mut transactions
                .iter()
                .enumerate()
                .map(|(index, tx)| (BlockTransactionFullAccessKey::new(&hash, index as TransactionIndexType), tx.clone())),
        )
    }

    fn delete(&self, hash: Hash) -> Result<(), StoreError> {
        self.access.delete(DirectDbWriter::new(&self.db), hash)
        self.cache.remove(&hash);
        self.access.delete_bucket(DirectDbWriter::new(&self.db), hash.as_bytes().as_ref())
    }
}
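In the new layout each transaction gets its own row: the key is the 32-byte block hash followed by the transaction's 4-byte index in big-endian, so all transactions of a block share a common prefix (the "bucket") and come back in index order under RocksDB's lexicographic key ordering. A safe sketch of the same key construction, without the MaybeUninit trick (the helper name is illustrative, and a 32-byte hash is assumed):

// Illustrative only: the 36-byte per-transaction key, built with a zeroed array
// instead of MaybeUninit.
fn tx_key(block_hash: &[u8; 32], index: u32) -> [u8; 36] {
    let mut key = [0u8; 36];
    key[..32].copy_from_slice(block_hash);
    key[32..].copy_from_slice(&index.to_be_bytes());
    key
}

fn main() {
    let hash = [0xAB; 32];
    // Big-endian indices compare in numeric order byte-wise, so a prefix scan
    // over the bucket yields transactions 0, 1, 2, ... in order.
    assert!(tx_key(&hash, 1) < tx_key(&hash, 2));
    assert!(tx_key(&hash, 255) < tx_key(&hash, 256));
}

The commit's unsafe MaybeUninit version avoids zero-initializing the 36 bytes before overwriting them; the zeroed variant above trades that micro-optimization for safety.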
39 changes: 39 additions & 0 deletions database/src/access.rs
@@ -46,6 +46,14 @@
        Ok(self.cache.contains_key(&key) || self.db.get_pinned(DbKey::new(&self.prefix, key))?.is_some())
    }

    pub fn has_bucket(&self, bucket: &[u8]) -> Result<bool, StoreError> {
        let db_key = DbKey::prefix_only(&self.prefix).with_bucket(bucket);
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));

        Ok(self.db.iterator_opt(IteratorMode::Start, read_opts).next().is_some())
    }

    pub fn read(&self, key: TKey) -> Result<TData, StoreError>
    where
        TKey: Clone + AsRef<[u8]> + ToString,
@@ -165,6 +173,37 @@
        Ok(())
    }

    /// Deletes a prefix bucket from the db.
    /// Note: This does not clear from the cache.
    pub fn delete_bucket(&self, mut writer: impl DbWriter, bucket: &[u8]) -> Result<(), StoreError> {
        let db_key = DbKey::new_with_bucket(&self.prefix, &bucket, []);
        let (from, to) = rocksdb::PrefixRange(db_key.as_ref()).into_bounds();
        writer.delete_range(from.unwrap(), to.unwrap())?;
        Ok(())
    }

    pub fn read_bucket(&self, bucket: &[u8]) -> Result<Vec<TData>, StoreError>
    where
        TData: DeserializeOwned,
    {
        let db_key = DbKey::prefix_only(&self.prefix).with_bucket(bucket);
        let mut read_opts = ReadOptions::default();
        read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));

        self.db
            .iterator_opt(IteratorMode::Start, read_opts)
            .map(|item| match item {
                Ok((_, value_bytes)) => match bincode::deserialize::<TData>(value_bytes.as_ref()) {
                    Ok(value) => Ok(value),
                    Err(err) => Err(err.into()),
                },
                Err(err) => Err(err.into()),
            })
            .collect()
    }

    /// A dynamic iterator that can iterate through a specific prefix / bucket, or from a certain start point.
    // TODO: loop and chain iterators for multi-prefix / bucket iterator.
    pub fn seek_iterator(
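The three new bucket helpers all lean on RocksDB prefix ranges: has_bucket asks whether at least one key lives under the prefix, read_bucket deserializes every value under it, and delete_bucket turns the prefix into a [from, to) pair and issues a range delete. A standalone sketch of the same idea against the rocksdb crate directly (the path and key bytes are made up for illustration, and error handling is kept minimal):

use rocksdb::{IteratorMode, Options, PrefixRange, ReadOptions, WriteBatch, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/bucket_demo")?; // illustrative path

    // Two rows in bucket b"blockA", one row in another bucket.
    db.put(b"blockA\x00\x00\x00\x00", b"tx0")?;
    db.put(b"blockA\x00\x00\x00\x01", b"tx1")?;
    db.put(b"blockB\x00\x00\x00\x00", b"tx0")?;

    // has_bucket: is there anything under the prefix?
    let mut ro = ReadOptions::default();
    ro.set_iterate_range(PrefixRange(&b"blockA"[..]));
    assert!(db.iterator_opt(IteratorMode::Start, ro).next().is_some());

    // read_bucket: collect every value under the prefix, in key order.
    let mut ro = ReadOptions::default();
    ro.set_iterate_range(PrefixRange(&b"blockA"[..]));
    let values: Vec<Box<[u8]>> = db
        .iterator_opt(IteratorMode::Start, ro)
        .map(|item| item.map(|(_, v)| v))
        .collect::<Result<_, _>>()?;
    assert_eq!(values.len(), 2);

    // delete_bucket: range-delete [prefix, next_prefix). Incrementing the last
    // byte works here because the prefix does not end in 0xFF.
    let from = b"blockA".to_vec();
    let mut to = from.clone();
    *to.last_mut().unwrap() += 1;
    let mut batch = WriteBatch::default();
    batch.delete_range(from, to);
    db.write(batch)?;

    assert!(db.get(b"blockA\x00\x00\x00\x00")?.is_none());
    Ok(())
}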
3 changes: 3 additions & 0 deletions database/src/errors.rs
@@ -7,6 +7,9 @@ pub enum StoreError {
    #[error("key {0} not found in store")]
    KeyNotFound(DbKey),

    #[error("bucket {0} not found in store")]
    BucketNotFound(String),

    #[error("key {0} already exists in store")]
    KeyAlreadyExists(String),

9 changes: 9 additions & 0 deletions database/src/key.rs
@@ -40,6 +40,15 @@ impl DbKey {
        self.prefix_len += bucket.as_ref().len();
    }

    pub fn with_bucket<TBucket>(mut self, bucket: TBucket) -> Self
    where
        TBucket: Copy + AsRef<[u8]>,
    {
        self.path.extend(bucket.as_ref().iter().copied());
        self.prefix_len += bucket.as_ref().len();
        self
    }

    pub fn add_key<TKey>(&mut self, key: TKey)
    where
        TKey: Clone + AsRef<[u8]>,
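with_bucket is a consuming, by-value twin of the existing add_bucket, which lets a key be assembled in one expression, e.g. DbKey::prefix_only(&self.prefix).with_bucket(bucket) as used in access.rs above. A tiny stand-in sketch of the pattern (the real DbKey lives in database/src/key.rs; the fields here are simplified for illustration):

// Illustrative stand-in, not the real DbKey.
struct Key {
    path: Vec<u8>,
    prefix_len: usize,
}

impl Key {
    fn prefix_only(prefix: &[u8]) -> Self {
        Self { path: prefix.to_vec(), prefix_len: prefix.len() }
    }

    // In-place variant, mirrors add_bucket.
    fn add_bucket(&mut self, bucket: &[u8]) {
        self.path.extend_from_slice(bucket);
        self.prefix_len += bucket.len();
    }

    // Consuming variant, mirrors with_bucket: returns Self so calls chain.
    fn with_bucket(mut self, bucket: &[u8]) -> Self {
        self.add_bucket(bucket);
        self
    }
}

fn main() {
    let key = Key::prefix_only(&[0x01]).with_bucket(&[0xAB; 32]);
    assert_eq!(key.path.len(), 33);
    assert_eq!(key.prefix_len, 33);
}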
