diff --git a/Cargo.lock b/Cargo.lock index df817dc2035..aeedd4d7b2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,6 +1458,7 @@ dependencies = [ "clap", "clap_utils", "environment", + "hex", "logging", "slog", "sloggers", diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 5f590735004..603bc312446 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -119,15 +119,14 @@ impl BeaconChain { prev_block_slot = block.slot(); expected_block_root = block.message().parent_root(); - // If we've reached genesis, add the genesis block root to the batch and set the - // anchor slot to 0 to indicate completion. + // If we've reached genesis, add the genesis block root to the batch for all slots + // between 0 and the first block slot, and set the anchor slot to 0 to indicate + // completion. if expected_block_root == self.genesis_block_root { let genesis_slot = self.spec.genesis_slot; - chunk_writer.set( - genesis_slot.as_usize(), - self.genesis_block_root, - &mut cold_batch, - )?; + for slot in genesis_slot.as_usize()..block.slot().as_usize() { + chunk_writer.set(slot, self.genesis_block_root, &mut cold_batch)?; + } prev_block_slot = genesis_slot; expected_block_root = Hash256::zero(); break; diff --git a/beacon_node/beacon_chain/src/otb_verification_service.rs b/beacon_node/beacon_chain/src/otb_verification_service.rs index 805b61dd9c0..b934c553e6c 100644 --- a/beacon_node/beacon_chain/src/otb_verification_service.rs +++ b/beacon_node/beacon_chain/src/otb_verification_service.rs @@ -119,10 +119,13 @@ pub fn start_otb_verification_service( pub fn load_optimistic_transition_blocks( chain: &BeaconChain, ) -> Result, StoreError> { - process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| { - iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) - .collect() - })? + process_results( + chain.store.hot_db.iter_column::(OTBColumn), + |iter| { + iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) + .collect() + }, + )? } #[derive(Debug)] diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ab54af42c78..5bf665a5551 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2074,6 +2074,18 @@ async fn weak_subjectivity_sync_unaligned_unadvanced_checkpoint() { weak_subjectivity_sync_test(slots, checkpoint_slot).await } +// Regression test for https://github.com/sigp/lighthouse/issues/4817 +// Skip 3 slots immediately after genesis, creating a gap between the genesis block and the first +// real block. +#[tokio::test] +async fn weak_subjectivity_sync_skips_at_genesis() { + let start_slot = 4; + let end_slot = E::slots_per_epoch() * 4; + let slots = (start_slot..end_slot).map(Slot::new).collect(); + let checkpoint_slot = Slot::new(E::slots_per_epoch() * 2); + weak_subjectivity_sync_test(slots, checkpoint_slot).await +} + async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { // Build an initial chain on one harness, representing a synced node with full history. let num_final_blocks = E::slots_per_epoch() * 2; diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index fcc40706b30..bfeb2b943b6 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -43,6 +43,8 @@ pub enum Error { BlockReplayError(BlockReplayError), AddPayloadLogicError, SlotClockUnavailableForMigration, + InvalidKey, + InvalidBytes, UnableToDowngrade, InconsistentFork(InconsistentFork), } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 87f8e0ffc36..869c143ac72 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1251,10 +1251,17 @@ impl, Cold: ItemStore> HotColdDB let split_slot = self.get_split_slot(); let anchor = self.get_anchor_info(); - // There are no restore points stored if the state upper limit lies in the hot database. - // It hasn't been reached yet, and may never be. - if anchor.map_or(false, |a| a.state_upper_limit >= split_slot) { + // There are no restore points stored if the state upper limit lies in the hot database, + // and the lower limit is zero. It hasn't been reached yet, and may never be. + if anchor.as_ref().map_or(false, |a| { + a.state_upper_limit >= split_slot && a.state_lower_limit == 0 + }) { None + } else if let Some(lower_limit) = anchor + .map(|a| a.state_lower_limit) + .filter(|limit| *limit > 0) + { + Some(lower_limit) } else { Some( (split_slot - 1) / self.config.slots_per_restore_point diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 7aac9f72d91..f8ead8963e9 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,7 +1,6 @@ use super::*; use crate::hot_cold_store::HotColdDBError; use crate::metrics; -use db_key::Key; use leveldb::compaction::Compaction; use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; @@ -176,10 +175,8 @@ impl KeyValueStore for LevelDB { Ok(()) } - /// Iterate through all keys and values in a particular column. - fn iter_column(&self, column: DBColumn) -> ColumnIter { - let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter { + let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from)); let iter = self.db.iter(self.read_options()); iter.seek(&start_key); @@ -187,13 +184,12 @@ impl KeyValueStore for LevelDB { Box::new( iter.take_while(move |(key, _)| key.matches_column(column)) .map(move |(bytes_key, value)| { - let key = - bytes_key - .remove_column(column) - .ok_or(HotColdDBError::IterationError { - unexpected_key: bytes_key, - })?; - Ok((key, value)) + let key = bytes_key.remove_column_variable(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key.clone(), + } + })?; + Ok((K::from_bytes(key)?, value)) }), ) } @@ -224,12 +220,12 @@ impl KeyValueStore for LevelDB { impl ItemStore for LevelDB {} /// Used for keying leveldb. -#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct BytesKey { key: Vec, } -impl Key for BytesKey { +impl db_key::Key for BytesKey { fn from_u8(key: &[u8]) -> Self { Self { key: key.to_vec() } } @@ -245,12 +241,20 @@ impl BytesKey { self.key.starts_with(column.as_bytes()) } - /// Remove the column from a key, returning its `Hash256` portion. + /// Remove the column from a 32 byte key, yielding the `Hash256` key. pub fn remove_column(&self, column: DBColumn) -> Option { + let key = self.remove_column_variable(column)?; + (column.key_size() == 32).then(|| Hash256::from_slice(key)) + } + + /// Remove the column from a key. + /// + /// Will return `None` if the value doesn't match the column or has the wrong length. + pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> { if self.matches_column(column) { let subkey = &self.key[column.as_bytes().len()..]; - if subkey.len() == 32 { - return Some(Hash256::from_slice(subkey)); + if subkey.len() == column.key_size() { + return Some(subkey); } } None diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ee01fa1ae15..35479e37103 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,7 +43,7 @@ use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; pub use types::*; -pub type ColumnIter<'a> = Box), Error>> + 'a>; +pub type ColumnIter<'a, K> = Box), Error>> + 'a>; pub type ColumnKeyIter<'a> = Box> + 'a>; pub trait KeyValueStore: Sync + Send + Sized + 'static { @@ -80,15 +80,33 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { fn compact(&self) -> Result<(), Error>; /// Iterate through all keys and values in a particular column. - fn iter_column(&self, _column: DBColumn) -> ColumnIter { - // Default impl for non LevelDB databases - Box::new(std::iter::empty()) + fn iter_column(&self, column: DBColumn) -> ColumnIter { + self.iter_column_from(column, &vec![0; column.key_size()]) } + fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter; + /// Iterate through all keys in a particular column. - fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter { - // Default impl for non LevelDB databases - Box::new(std::iter::empty()) + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter; +} + +pub trait Key: Sized + 'static { + fn from_bytes(key: &[u8]) -> Result; +} + +impl Key for Hash256 { + fn from_bytes(key: &[u8]) -> Result { + if key.len() == 32 { + Ok(Hash256::from_slice(key)) + } else { + Err(Error::InvalidKey) + } + } +} + +impl Key for Vec { + fn from_bytes(key: &[u8]) -> Result { + Ok(key.to_vec()) } } @@ -230,6 +248,33 @@ impl DBColumn { pub fn as_bytes(self) -> &'static [u8] { self.as_str().as_bytes() } + + /// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes. + /// + /// This function returns the number of bytes used by keys in a given column. + pub fn key_size(self) -> usize { + match self { + Self::BeaconMeta + | Self::BeaconBlock + | Self::BeaconState + | Self::BeaconStateSummary + | Self::BeaconStateTemporary + | Self::ExecPayload + | Self::BeaconChain + | Self::OpPool + | Self::Eth1Cache + | Self::ForkChoice + | Self::PubkeyCache + | Self::BeaconRestorePoint + | Self::DhtEnrs + | Self::OptimisticTransitionBlock => 32, + Self::BeaconBlockRoots + | Self::BeaconStateRoots + | Self::BeaconHistoricalRoots + | Self::BeaconHistoricalSummaries + | Self::BeaconRandaoMixes => 8, + } + } } /// An item that may stored in a `Store` by serializing and deserializing from bytes. diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 3f6a9e514f2..7a276a1fad9 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,17 +1,17 @@ -use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; -use crate::{ColumnIter, DBColumn}; +use crate::{ + get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error, + ItemStore, Key, KeyValueStore, KeyValueStoreOp, +}; use parking_lot::{Mutex, MutexGuard, RwLock}; -use std::collections::{HashMap, HashSet}; +use std::collections::BTreeMap; use std::marker::PhantomData; use types::*; -type DBHashMap = HashMap, Vec>; -type DBKeyMap = HashMap, HashSet>>; +type DBMap = BTreeMap>; -/// A thread-safe `HashMap` wrapper. +/// A thread-safe `BTreeMap` wrapper. pub struct MemoryStore { - db: RwLock, - col_keys: RwLock, + db: RwLock, transaction_mutex: Mutex<()>, _phantom: PhantomData, } @@ -20,36 +20,24 @@ impl MemoryStore { /// Create a new, empty database. pub fn open() -> Self { Self { - db: RwLock::new(HashMap::new()), - col_keys: RwLock::new(HashMap::new()), + db: RwLock::new(BTreeMap::new()), transaction_mutex: Mutex::new(()), _phantom: PhantomData, } } - - fn get_key_for_col(col: &str, key: &[u8]) -> Vec { - let mut col = col.as_bytes().to_vec(); - col.append(&mut key.to_vec()); - col - } } impl KeyValueStore for MemoryStore { /// Get the value of some key from the database. Returns `None` if the key does not exist. fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); Ok(self.db.read().get(&column_key).cloned()) } /// Puts a key in the database. fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); self.db.write().insert(column_key, val.to_vec()); - self.col_keys - .write() - .entry(col.as_bytes().to_vec()) - .or_default() - .insert(key.to_vec()); Ok(()) } @@ -64,18 +52,14 @@ impl KeyValueStore for MemoryStore { /// Return true if some key exists in some column. fn key_exists(&self, col: &str, key: &[u8]) -> Result { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); Ok(self.db.read().contains_key(&column_key)) } /// Delete some key from the database. fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { - let column_key = Self::get_key_for_col(col, key); + let column_key = BytesKey::from_vec(get_key_for_col(col, key)); self.db.write().remove(&column_key); - self.col_keys - .write() - .get_mut(&col.as_bytes().to_vec()) - .map(|set| set.remove(key)); Ok(()) } @@ -83,35 +67,41 @@ impl KeyValueStore for MemoryStore { for op in batch { match op { KeyValueStoreOp::PutKeyValue(key, value) => { - self.db.write().insert(key, value); + self.db.write().insert(BytesKey::from_vec(key), value); } - KeyValueStoreOp::DeleteKey(hash) => { - self.db.write().remove(&hash); + KeyValueStoreOp::DeleteKey(key) => { + self.db.write().remove(&BytesKey::from_vec(key)); } } } Ok(()) } - // pub type ColumnIter<'a> = Box), Error>> + 'a>; - fn iter_column(&self, column: DBColumn) -> ColumnIter { + fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter { + // We use this awkward pattern because we can't lock the `self.db` field *and* maintain a + // reference to the lock guard across calls to `.next()`. This would be require a + // struct with a field (the iterator) which references another field (the lock guard). + let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), from)); let col = column.as_str(); - if let Some(keys) = self - .col_keys + let keys = self + .db .read() - .get(col.as_bytes()) - .map(|set| set.iter().cloned().collect::>()) - { - Box::new(keys.into_iter().filter_map(move |key| { - let hash = Hash256::from_slice(&key); - self.get_bytes(col, &key) - .transpose() - .map(|res| res.map(|bytes| (hash, bytes))) - })) - } else { - Box::new(std::iter::empty()) - } + .range(start_key..) + .take_while(|(k, _)| k.remove_column_variable(column).is_some()) + .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec())) + .collect::>(); + Box::new(keys.into_iter().filter_map(move |key| { + self.get_bytes(col, &key).transpose().map(|res| { + let k = K::from_bytes(&key)?; + let v = res?; + Ok((k, v)) + }) + })) + } + + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k))) } fn begin_rw_transaction(&self) -> MutexGuard<()> { diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index 1570c171cb6..07045dd95c2 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -9,6 +9,7 @@ beacon_node = { workspace = true } clap = { workspace = true } clap_utils = { workspace = true } environment = { workspace = true } +hex = { workspace = true } logging = { workspace = true } sloggers = { workspace = true } store = { workspace = true } diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index ce0b094b772..b5669783116 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -60,6 +60,24 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("sizes") .possible_values(InspectTarget::VARIANTS), ) + .arg( + Arg::with_name("skip") + .long("skip") + .value_name("N") + .help("Skip over the first N keys"), + ) + .arg( + Arg::with_name("limit") + .long("limit") + .value_name("N") + .help("Output at most N keys"), + ) + .arg( + Arg::with_name("freezer") + .long("freezer") + .help("Inspect the freezer DB rather than the hot DB") + .takes_value(false), + ) .arg( Arg::with_name("output-dir") .long("output-dir") @@ -171,6 +189,9 @@ pub enum InspectTarget { pub struct InspectConfig { column: DBColumn, target: InspectTarget, + skip: Option, + limit: Option, + freezer: bool, /// Configures where the inspect output should be stored. output_dir: PathBuf, } @@ -178,11 +199,18 @@ pub struct InspectConfig { fn parse_inspect_config(cli_args: &ArgMatches) -> Result { let column = clap_utils::parse_required(cli_args, "column")?; let target = clap_utils::parse_required(cli_args, "output")?; + let skip = clap_utils::parse_optional(cli_args, "skip")?; + let limit = clap_utils::parse_optional(cli_args, "limit")?; + let freezer = cli_args.is_present("freezer"); + let output_dir: PathBuf = clap_utils::parse_optional(cli_args, "output-dir")?.unwrap_or_else(PathBuf::new); Ok(InspectConfig { column, target, + skip, + limit, + freezer, output_dir, }) } @@ -208,6 +236,17 @@ pub fn inspect_db( .map_err(|e| format!("{:?}", e))?; let mut total = 0; + let mut num_keys = 0; + + let sub_db = if inspect_config.freezer { + &db.cold_db + } else { + &db.hot_db + }; + + let skip = inspect_config.skip.unwrap_or(0); + let limit = inspect_config.limit.unwrap_or(usize::MAX); + let base_path = &inspect_config.output_dir; if let InspectTarget::Values = inspect_config.target { @@ -215,20 +254,24 @@ pub fn inspect_db( .map_err(|e| format!("Unable to create import directory: {:?}", e))?; } - for res in db.hot_db.iter_column(inspect_config.column) { + for res in sub_db + .iter_column::>(inspect_config.column) + .skip(skip) + .take(limit) + { let (key, value) = res.map_err(|e| format!("{:?}", e))?; match inspect_config.target { InspectTarget::ValueSizes => { - println!("{:?}: {} bytes", key, value.len()); - total += value.len(); - } - InspectTarget::ValueTotal => { - total += value.len(); + println!("{}: {} bytes", hex::encode(&key), value.len()); } + InspectTarget::ValueTotal => (), InspectTarget::Values => { - let file_path = - base_path.join(format!("{}_{}.ssz", inspect_config.column.as_str(), key)); + let file_path = base_path.join(format!( + "{}_{}.ssz", + inspect_config.column.as_str(), + hex::encode(&key) + )); let write_result = fs::OpenOptions::new() .create(true) @@ -244,17 +287,14 @@ pub fn inspect_db( } else { println!("Successfully saved values to file: {:?}", file_path); } - - total += value.len(); } } + total += value.len(); + num_keys += 1; } - match inspect_config.target { - InspectTarget::ValueSizes | InspectTarget::ValueTotal | InspectTarget::Values => { - println!("Total: {} bytes", total); - } - } + println!("Num keys: {}", num_keys); + println!("Total: {} bytes", total); Ok(()) }