diff --git a/Cargo.lock b/Cargo.lock index e9e7547d7c2..35715d3a58c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -445,6 +445,7 @@ dependencies = [ "task_executor", "tempfile", "tokio", + "tokio-stream", "tree_hash", "types", "unused_port", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index af4d85e11ce..8e91d1fed25 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -45,6 +45,7 @@ state_processing = { path = "../../consensus/state_processing" } tree_hash = "0.4.1" types = { path = "../../consensus/types" } tokio = "1.14.0" +tokio-stream = "0.1.3" eth1 = { path = "../eth1" } futures = "0.3.7" genesis = { path = "../genesis" } diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs new file mode 100644 index 00000000000..e43f2a8dd81 --- /dev/null +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -0,0 +1,973 @@ +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; +use slog::{crit, debug, Logger}; +use std::collections::HashMap; +use std::sync::Arc; +use store::DatabaseBlock; +use task_executor::TaskExecutor; +use tokio::sync::{ + mpsc::{self, UnboundedSender}, + RwLock, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream}; +use types::{ + ChainSpec, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, Hash256, SignedBeaconBlock, + SignedBlindedBeaconBlock, Slot, +}; +use types::{ + ExecutionPayload, ExecutionPayloadCapella, ExecutionPayloadHeader, ExecutionPayloadMerge, +}; + +#[derive(PartialEq)] +pub enum CheckEarlyAttesterCache { + Yes, + No, +} + +#[derive(Debug)] +pub enum Error { + PayloadReconstruction(String), + BlocksByRangeFailure(Box), + RequestNotFound, + BlockResultNotFound, +} + +const BLOCKS_PER_RANGE_REQUEST: u64 = 32; + +// This is the same as a DatabaseBlock but the Arc allows us to avoid an unnecessary clone. +enum LoadedBeaconBlock { + Full(Arc>), + Blinded(Box>), +} +type LoadResult = Result>, BeaconChainError>; +type BlockResult = Result>>, BeaconChainError>; + +enum RequestState { + UnSent(Vec>), + Sent(HashMap>>), +} + +struct BodiesByRange { + start: u64, + count: u64, + state: RequestState, +} + +// stores the components of a block for future re-construction in a small form +struct BlockParts { + blinded_block: Box>, + header: Box>, + body: Option>>, +} + +impl BlockParts { + pub fn new( + blinded: Box>, + header: ExecutionPayloadHeader, + ) -> Self { + Self { + blinded_block: blinded, + header: Box::new(header), + body: None, + } + } + + pub fn root(&self) -> Hash256 { + self.blinded_block.canonical_root() + } + + pub fn slot(&self) -> Slot { + self.blinded_block.message().slot() + } + + pub fn block_hash(&self) -> ExecutionBlockHash { + self.header.block_hash() + } +} + +fn reconstruct_default_header_block( + blinded_block: Box>, + header_from_block: ExecutionPayloadHeader, + spec: &ChainSpec, +) -> BlockResult { + let fork = blinded_block + .fork_name(spec) + .map_err(BeaconChainError::InconsistentFork)?; + + let payload: ExecutionPayload = match fork { + ForkName::Merge => ExecutionPayloadMerge::default().into(), + ForkName::Capella => ExecutionPayloadCapella::default().into(), + ForkName::Base | ForkName::Altair => { + return Err(Error::PayloadReconstruction(format!( + "Block with fork variant {} has execution payload", + fork + )) + .into()) + } + }; + + let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref()); + if header_from_payload == header_from_block { + blinded_block + .try_into_full_block(Some(payload)) + .ok_or(BeaconChainError::AddPayloadLogicError) + .map(Arc::new) + .map(Some) + } else { + Err(BeaconChainError::InconsistentPayloadReconstructed { + slot: blinded_block.slot(), + exec_block_hash: header_from_block.block_hash(), + canonical_transactions_root: header_from_block.transactions_root(), + reconstructed_transactions_root: header_from_payload.transactions_root(), + }) + } +} + +fn reconstruct_blocks( + block_map: &mut HashMap>>, + block_parts_with_bodies: HashMap>, + log: &Logger, +) { + for (root, block_parts) in block_parts_with_bodies { + if let Some(payload_body) = block_parts.body { + match payload_body.to_payload(block_parts.header.as_ref().clone()) { + Ok(payload) => { + let header_from_payload = ExecutionPayloadHeader::from(payload.to_ref()); + if header_from_payload == *block_parts.header { + block_map.insert( + root, + Arc::new( + block_parts + .blinded_block + .try_into_full_block(Some(payload)) + .ok_or(BeaconChainError::AddPayloadLogicError) + .map(Arc::new) + .map(Some), + ), + ); + } else { + let error = BeaconChainError::InconsistentPayloadReconstructed { + slot: block_parts.blinded_block.slot(), + exec_block_hash: block_parts.header.block_hash(), + canonical_transactions_root: block_parts.header.transactions_root(), + reconstructed_transactions_root: header_from_payload + .transactions_root(), + }; + debug!(log, "Failed to reconstruct block"; "root" => ?root, "error" => ?error); + block_map.insert(root, Arc::new(Err(error))); + } + } + Err(string) => { + block_map.insert( + root, + Arc::new(Err(Error::PayloadReconstruction(string).into())), + ); + } + } + } else { + block_map.insert( + root, + Arc::new(Err(BeaconChainError::BlockHashMissingFromExecutionLayer( + block_parts.block_hash(), + ))), + ); + } + } +} + +impl BodiesByRange { + pub fn new(maybe_block_parts: Option>) -> Self { + if let Some(block_parts) = maybe_block_parts { + Self { + start: block_parts.header.block_number(), + count: 1, + state: RequestState::UnSent(vec![block_parts]), + } + } else { + Self { + start: 0, + count: 0, + state: RequestState::UnSent(vec![]), + } + } + } + + pub fn is_unsent(&self) -> bool { + matches!(self.state, RequestState::UnSent(_)) + } + + pub fn push_block_parts(&mut self, block_parts: BlockParts) -> Result<(), BlockParts> { + if self.count == BLOCKS_PER_RANGE_REQUEST { + return Err(block_parts); + } + + match &mut self.state { + RequestState::Sent(_) => Err(block_parts), + RequestState::UnSent(blocks_parts_vec) => { + let block_number = block_parts.header.block_number(); + if self.count == 0 { + self.start = block_number; + self.count = 1; + blocks_parts_vec.push(block_parts); + Ok(()) + } else { + // need to figure out if this block fits in the request + if block_number < self.start + || self.start + BLOCKS_PER_RANGE_REQUEST <= block_number + { + return Err(block_parts); + } + + blocks_parts_vec.push(block_parts); + if self.start + self.count <= block_number { + self.count = block_number - self.start + 1; + } + + Ok(()) + } + } + } + } + + async fn execute(&mut self, execution_layer: &ExecutionLayer, log: &Logger) { + if let RequestState::UnSent(blocks_parts_ref) = &mut self.state { + let block_parts_vec = std::mem::take(blocks_parts_ref); + + let mut block_map = HashMap::new(); + match execution_layer + .get_payload_bodies_by_range(self.start, self.count) + .await + { + Ok(bodies) => { + let mut range_map = (self.start..(self.start + self.count)) + .zip(bodies.into_iter().chain(std::iter::repeat(None))) + .collect::>(); + + let mut with_bodies = HashMap::new(); + for mut block_parts in block_parts_vec { + with_bodies + // it's possible the same block is requested twice, using + // or_insert_with() skips duplicates + .entry(block_parts.root()) + .or_insert_with(|| { + let block_number = block_parts.header.block_number(); + block_parts.body = + range_map.remove(&block_number).flatten().map(Box::new); + + block_parts + }); + } + + reconstruct_blocks(&mut block_map, with_bodies, log); + } + Err(e) => { + let block_result = + Arc::new(Err(Error::BlocksByRangeFailure(Box::new(e)).into())); + debug!(log, "Payload bodies by range failure"; "error" => ?block_result); + for block_parts in block_parts_vec { + block_map.insert(block_parts.root(), block_result.clone()); + } + } + } + self.state = RequestState::Sent(block_map); + } + } + + pub async fn get_block_result( + &mut self, + root: &Hash256, + execution_layer: &ExecutionLayer, + log: &Logger, + ) -> Option>> { + self.execute(execution_layer, log).await; + if let RequestState::Sent(map) = &self.state { + return map.get(root).cloned(); + } + // Shouldn't reach this point + None + } +} + +#[derive(Clone)] +enum EngineRequest { + ByRange(Arc>>), + // When we already have the data or there's an error + NoRequest(Arc>>>>), +} + +impl EngineRequest { + pub fn new_by_range() -> Self { + Self::ByRange(Arc::new(RwLock::new(BodiesByRange::new(None)))) + } + pub fn new_no_request() -> Self { + Self::NoRequest(Arc::new(RwLock::new(HashMap::new()))) + } + + pub async fn is_unsent(&self) -> bool { + match self { + Self::ByRange(bodies_by_range) => bodies_by_range.read().await.is_unsent(), + Self::NoRequest(_) => false, + } + } + + pub async fn push_block_parts(&mut self, block_parts: BlockParts, log: &Logger) { + match self { + Self::ByRange(bodies_by_range) => { + let mut request = bodies_by_range.write().await; + + if let Err(block_parts) = request.push_block_parts(block_parts) { + drop(request); + let new_by_range = BodiesByRange::new(Some(block_parts)); + *self = Self::ByRange(Arc::new(RwLock::new(new_by_range))); + } + } + Self::NoRequest(_) => { + // this should _never_ happen + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "push_block_parts called on NoRequest Variant", + ); + } + } + } + + pub async fn push_block_result( + &mut self, + root: Hash256, + block_result: BlockResult, + log: &Logger, + ) { + // this function will only fail if something is seriously wrong + match self { + Self::ByRange(_) => { + // this should _never_ happen + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "push_block_result called on ByRange", + ); + } + Self::NoRequest(results) => { + results.write().await.insert(root, Arc::new(block_result)); + } + } + } + + pub async fn get_block_result( + &self, + root: &Hash256, + execution_layer: &ExecutionLayer, + log: &Logger, + ) -> Arc> { + match self { + Self::ByRange(by_range) => { + by_range + .write() + .await + .get_block_result(root, execution_layer, log) + .await + } + Self::NoRequest(map) => map.read().await.get(root).cloned(), + } + .unwrap_or_else(|| { + crit!( + log, + "Please notify the devs"; + "beacon_block_streamer" => "block_result not found in request", + "root" => ?root, + ); + Arc::new(Err(Error::BlockResultNotFound.into())) + }) + } +} + +pub struct BeaconBlockStreamer { + execution_layer: ExecutionLayer, + check_early_attester_cache: CheckEarlyAttesterCache, + beacon_chain: Arc>, +} + +impl BeaconBlockStreamer { + pub fn new( + beacon_chain: &Arc>, + check_early_attester_cache: CheckEarlyAttesterCache, + ) -> Result { + let execution_layer = beacon_chain + .execution_layer + .as_ref() + .ok_or(BeaconChainError::ExecutionLayerMissing)? + .clone(); + + Ok(Self { + execution_layer, + check_early_attester_cache, + beacon_chain: beacon_chain.clone(), + }) + } + + fn check_early_attester_cache( + &self, + root: Hash256, + ) -> Option>> { + if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes { + self.beacon_chain.early_attester_cache.get_block(root) + } else { + None + } + } + + fn load_payloads(&self, block_roots: Vec) -> Vec<(Hash256, LoadResult)> { + let mut db_blocks = Vec::new(); + + for root in block_roots { + if let Some(cached_block) = self + .check_early_attester_cache(root) + .map(LoadedBeaconBlock::Full) + { + db_blocks.push((root, Ok(Some(cached_block)))); + continue; + } + + match self.beacon_chain.store.try_get_full_block(&root) { + Err(e) => db_blocks.push((root, Err(e.into()))), + Ok(opt_block) => db_blocks.push(( + root, + Ok(opt_block.map(|db_block| match db_block { + DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)), + DatabaseBlock::Blinded(block) => { + LoadedBeaconBlock::Blinded(Box::new(block)) + } + })), + )), + } + } + + db_blocks + } + + /// Pre-process the loaded blocks into execution engine requests. + /// + /// The purpose of this function is to separate the blocks into 2 categories: + /// 1) no_request - when we already have the full block or there's an error + /// 2) blocks_by_range - used for blinded blocks + /// + /// The function returns a vector of block roots in the same order as requested + /// along with the engine request that each root corresponds to. + async fn get_requests( + &self, + payloads: Vec<(Hash256, LoadResult)>, + ) -> Vec<(Hash256, EngineRequest)> { + let mut ordered_block_roots = Vec::new(); + let mut requests = HashMap::new(); + + // we sort the by range blocks by slot before adding them to the + // request as it should *better* optimize the number of blocks that + // can fit in the same request + let mut by_range_blocks: Vec> = vec![]; + let mut no_request = EngineRequest::new_no_request(); + + for (root, load_result) in payloads { + // preserve the order of the requested blocks + ordered_block_roots.push(root); + + let block_result = match load_result { + Err(e) => Err(e), + Ok(None) => Ok(None), + Ok(Some(LoadedBeaconBlock::Full(full_block))) => Ok(Some(full_block)), + Ok(Some(LoadedBeaconBlock::Blinded(blinded_block))) => { + match blinded_block + .message() + .execution_payload() + .map(|payload| payload.to_execution_payload_header()) + { + Ok(header) => { + if header.block_hash() == ExecutionBlockHash::zero() { + reconstruct_default_header_block( + blinded_block, + header, + &self.beacon_chain.spec, + ) + } else { + // Add the block to the set requiring a by-range request. + let block_parts = BlockParts::new(blinded_block, header); + by_range_blocks.push(block_parts); + continue; + } + } + Err(e) => Err(BeaconChainError::BeaconStateError(e)), + } + } + }; + + no_request + .push_block_result(root, block_result, &self.beacon_chain.log) + .await; + requests.insert(root, no_request.clone()); + } + + // Now deal with the by_range requests. Sort them in order of increasing slot + let mut by_range = EngineRequest::::new_by_range(); + by_range_blocks.sort_by_key(|block_parts| block_parts.slot()); + for block_parts in by_range_blocks { + let root = block_parts.root(); + by_range + .push_block_parts(block_parts, &self.beacon_chain.log) + .await; + requests.insert(root, by_range.clone()); + } + + let mut result = vec![]; + for root in ordered_block_roots { + if let Some(request) = requests.get(&root) { + result.push((root, request.clone())) + } else { + crit!( + self.beacon_chain.log, + "Please notify the devs"; + "beacon_block_streamer" => "request not found", + "root" => ?root, + ); + no_request + .push_block_result( + root, + Err(Error::RequestNotFound.into()), + &self.beacon_chain.log, + ) + .await; + result.push((root, no_request.clone())); + } + } + + result + } + + // used when the execution engine doesn't support the payload bodies methods + async fn stream_blocks_fallback( + &self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + debug!( + self.beacon_chain.log, + "Using slower fallback method of eth_getBlockByHash()" + ); + for root in block_roots { + let cached_block = self.check_early_attester_cache(root); + let block_result = if cached_block.is_some() { + Ok(cached_block) + } else { + self.beacon_chain + .get_block(&root) + .await + .map(|opt_block| opt_block.map(Arc::new)) + }; + + if sender.send((root, Arc::new(block_result))).is_err() { + break; + } + } + } + + async fn stream_blocks( + &self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + let n_roots = block_roots.len(); + let mut n_success = 0usize; + let mut n_sent = 0usize; + let mut engine_requests = 0usize; + + let payloads = self.load_payloads(block_roots); + let requests = self.get_requests(payloads).await; + + for (root, request) in requests { + if request.is_unsent().await { + engine_requests += 1; + } + + let result = request + .get_block_result(&root, &self.execution_layer, &self.beacon_chain.log) + .await; + + let successful = result + .as_ref() + .as_ref() + .map(|opt| opt.is_some()) + .unwrap_or(false); + + if sender.send((root, result)).is_err() { + break; + } else { + n_sent += 1; + if successful { + n_success += 1; + } + } + } + + debug!( + self.beacon_chain.log, + "BeaconBlockStreamer finished"; + "requested blocks" => n_roots, + "sent" => n_sent, + "succeeded" => n_success, + "failed" => (n_sent - n_success), + "engine requests" => engine_requests, + ); + } + + pub async fn stream( + self, + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + ) { + match self + .execution_layer + .get_engine_capabilities(None) + .await + .map_err(Box::new) + .map_err(BeaconChainError::EngineGetCapabilititesFailed) + { + Ok(engine_capabilities) => { + if engine_capabilities.get_payload_bodies_by_range_v1 { + self.stream_blocks(block_roots, sender).await; + } else { + // use the fallback method + self.stream_blocks_fallback(block_roots, sender).await; + } + } + Err(e) => { + send_errors(block_roots, sender, e).await; + } + } + } + + pub fn launch_stream( + self, + block_roots: Vec, + executor: &TaskExecutor, + ) -> impl Stream>)> { + let (block_tx, block_rx) = mpsc::unbounded_channel(); + debug!( + self.beacon_chain.log, + "Launching a BeaconBlockStreamer"; + "blocks" => block_roots.len(), + ); + executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender"); + UnboundedReceiverStream::new(block_rx) + } +} + +async fn send_errors( + block_roots: Vec, + sender: UnboundedSender<(Hash256, Arc>)>, + beacon_chain_error: BeaconChainError, +) { + let result = Arc::new(Err(beacon_chain_error)); + for root in block_roots { + if sender.send((root, result.clone())).is_err() { + break; + } + } +} + +impl From for BeaconChainError { + fn from(value: Error) -> Self { + BeaconChainError::BlockStreamerError(value) + } +} + +#[cfg(test)] +mod tests { + use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; + use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType}; + use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES}; + use execution_layer::EngineCapabilities; + use lazy_static::lazy_static; + use std::time::Duration; + use tokio::sync::mpsc; + use types::{ChainSpec, Epoch, EthSpec, Hash256, Keypair, MinimalEthSpec, Slot}; + + const VALIDATOR_COUNT: usize = 48; + lazy_static! { + /// A cached set of keys. + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); + } + + fn get_harness( + validator_count: usize, + spec: ChainSpec, + ) -> BeaconChainHarness> { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .spec(spec) + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .logger(logging::test_logger()) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness + } + + #[tokio::test] + async fn check_all_blocks_from_altair_to_capella() { + let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; + let num_epochs = 8; + let bellatrix_fork_epoch = 2usize; + let capella_fork_epoch = 4usize; + let num_blocks_produced = num_epochs * slots_per_epoch; + + let mut spec = test_spec::(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); + spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); + + let harness = get_harness(VALIDATOR_COUNT, spec); + // go to bellatrix fork + harness + .extend_slots(bellatrix_fork_epoch * slots_per_epoch) + .await; + // extend half an epoch + harness.extend_slots(slots_per_epoch / 2).await; + // trigger merge + harness + .execution_block_generator() + .move_to_terminal_block() + .expect("should move to terminal block"); + let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; + harness + .execution_block_generator() + .modify_last_block(|block| { + if let Block::PoW(terminal_block) = block { + terminal_block.timestamp = timestamp; + } + }); + // finish out merge epoch + harness.extend_slots(slots_per_epoch / 2).await; + // finish rest of epochs + harness + .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) + .await; + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + + assert_eq!( + state.slot(), + Slot::new(num_blocks_produced as u64), + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint().epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint().epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); + + let block_roots: Vec = harness + .chain + .forwards_iter_block_roots(Slot::new(0)) + .expect("should get iter") + .map(Result::unwrap) + .map(|(root, _)| root) + .collect(); + + let mut expected_blocks = vec![]; + // get all blocks the old fashioned way + for root in &block_roots { + let block = harness + .chain + .get_block(root) + .await + .expect("should get block") + .expect("block should exist"); + expected_blocks.push(block); + } + + for epoch in 0..num_epochs { + let start = epoch * slots_per_epoch; + let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; + epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + .expect("should create streamer"); + let (block_tx, mut block_rx) = mpsc::unbounded_channel(); + streamer.stream(epoch_roots.clone(), block_tx).await; + + for (i, expected_root) in epoch_roots.into_iter().enumerate() { + let (found_root, found_block_result) = + block_rx.recv().await.expect("should get block"); + + assert_eq!( + found_root, expected_root, + "expected block root should match" + ); + match found_block_result.as_ref() { + Ok(maybe_block) => { + let found_block = maybe_block.clone().expect("should have a block"); + let expected_block = expected_blocks + .get(start + i) + .expect("should get expected block"); + assert_eq!( + found_block.as_ref(), + expected_block, + "expected block should match found block" + ); + } + Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), + } + } + } + } + + #[tokio::test] + async fn check_fallback_altair_to_capella() { + let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize; + let num_epochs = 8; + let bellatrix_fork_epoch = 2usize; + let capella_fork_epoch = 4usize; + let num_blocks_produced = num_epochs * slots_per_epoch; + + let mut spec = test_spec::(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(bellatrix_fork_epoch as u64)); + spec.capella_fork_epoch = Some(Epoch::new(capella_fork_epoch as u64)); + + let harness = get_harness(VALIDATOR_COUNT, spec); + + // modify execution engine so it doesn't support engine_payloadBodiesBy* methods + let mock_execution_layer = harness.mock_execution_layer.as_ref().unwrap(); + mock_execution_layer + .server + .set_engine_capabilities(EngineCapabilities { + get_payload_bodies_by_hash_v1: false, + get_payload_bodies_by_range_v1: false, + ..DEFAULT_ENGINE_CAPABILITIES + }); + // refresh capabilities cache + harness + .chain + .execution_layer + .as_ref() + .unwrap() + .get_engine_capabilities(Some(Duration::ZERO)) + .await + .unwrap(); + + // go to bellatrix fork + harness + .extend_slots(bellatrix_fork_epoch * slots_per_epoch) + .await; + // extend half an epoch + harness.extend_slots(slots_per_epoch / 2).await; + // trigger merge + harness + .execution_block_generator() + .move_to_terminal_block() + .expect("should move to terminal block"); + let timestamp = harness.get_timestamp_at_slot() + harness.spec.seconds_per_slot; + harness + .execution_block_generator() + .modify_last_block(|block| { + if let Block::PoW(terminal_block) = block { + terminal_block.timestamp = timestamp; + } + }); + // finish out merge epoch + harness.extend_slots(slots_per_epoch / 2).await; + // finish rest of epochs + harness + .extend_slots((num_epochs - 1 - bellatrix_fork_epoch) * slots_per_epoch) + .await; + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + + assert_eq!( + state.slot(), + Slot::new(num_blocks_produced as u64), + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced as u64 / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint().epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint().epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); + + let block_roots: Vec = harness + .chain + .forwards_iter_block_roots(Slot::new(0)) + .expect("should get iter") + .map(Result::unwrap) + .map(|(root, _)| root) + .collect(); + + let mut expected_blocks = vec![]; + // get all blocks the old fashioned way + for root in &block_roots { + let block = harness + .chain + .get_block(root) + .await + .expect("should get block") + .expect("block should exist"); + expected_blocks.push(block); + } + + for epoch in 0..num_epochs { + let start = epoch * slots_per_epoch; + let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch]; + epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]); + let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No) + .expect("should create streamer"); + let (block_tx, mut block_rx) = mpsc::unbounded_channel(); + streamer.stream(epoch_roots.clone(), block_tx).await; + + for (i, expected_root) in epoch_roots.into_iter().enumerate() { + let (found_root, found_block_result) = + block_rx.recv().await.expect("should get block"); + + assert_eq!( + found_root, expected_root, + "expected block root should match" + ); + match found_block_result.as_ref() { + Ok(maybe_block) => { + let found_block = maybe_block.clone().expect("should have a block"); + let expected_block = expected_blocks + .get(start + i) + .expect("should get expected block"); + assert_eq!( + found_block.as_ref(), + expected_block, + "expected block should match found block" + ); + } + Err(e) => panic!("Error retrieving block {}: {:?}", expected_root, e), + } + } + } + } +} diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 06f1262be4a..1cdd0ab89ed 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4,6 +4,7 @@ use crate::attestation_verification::{ VerifiedUnaggregatedAttestation, }; use crate::attester_cache::{AttesterCache, AttesterCacheKey}; +use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::block_times_cache::BlockTimesCache; @@ -101,6 +102,7 @@ use store::{ DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; +use tokio_stream::Stream; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::consts::merge::INTERVALS_PER_SLOT; @@ -934,14 +936,42 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - pub async fn get_block_checking_early_attester_cache( - &self, - block_root: &Hash256, - ) -> Result>>, Error> { - if let Some(block) = self.early_attester_cache.get_block(*block_root) { - return Ok(Some(block)); - } - Ok(self.get_block(block_root).await?.map(Arc::new)) + pub fn get_blocks_checking_early_attester_cache( + self: &Arc, + block_roots: Vec, + executor: &TaskExecutor, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok( + BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::Yes)? + .launch_stream(block_roots, executor), + ) + } + + pub fn get_blocks( + self: &Arc, + block_roots: Vec, + executor: &TaskExecutor, + ) -> Result< + impl Stream< + Item = ( + Hash256, + Arc>>, Error>>, + ), + >, + Error, + > { + Ok( + BeaconBlockStreamer::::new(self, CheckEarlyAttesterCache::No)? + .launch_stream(block_roots, executor), + ) } /// Returns the block at the given root, if any. diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index ffc0065b9fd..2f60b1481f1 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,4 +1,5 @@ use crate::attester_cache::Error as AttesterCacheError; +use crate::beacon_block_streamer::Error as BlockStreamerError; use crate::beacon_chain::ForkChoiceError; use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError; use crate::eth1_chain::Error as Eth1ChainError; @@ -143,6 +144,7 @@ pub enum BeaconChainError { ExecutionLayerMissing, BlockVariantLacksExecutionPayload(Hash256), ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, Box), + EngineGetCapabilititesFailed(Box), BlockHashMissingFromExecutionLayer(ExecutionBlockHash), InconsistentPayloadReconstructed { slot: Slot, @@ -150,6 +152,7 @@ pub enum BeaconChainError { canonical_transactions_root: Hash256, reconstructed_transactions_root: Hash256, }, + BlockStreamerError(BlockStreamerError), AddPayloadLogicError, ExecutionForkChoiceUpdateFailed(execution_layer::Error), PrepareProposerFailed(BlockProcessingError), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 69c5bcc24ee..b39ef784ccd 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -2,6 +2,7 @@ pub mod attestation_rewards; pub mod attestation_verification; mod attester_cache; pub mod beacon_block_reward; +mod beacon_block_streamer; mod beacon_chain; mod beacon_fork_choice_store; pub mod beacon_proposer_cache; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 14cdacae503..ed1e0f822df 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1444,6 +1444,37 @@ impl ExecutionLayer { } } + pub async fn get_payload_bodies_by_hash( + &self, + hashes: Vec, + ) -> Result>>, Error> { + self.engine() + .request(|engine: &Engine| async move { + engine.api.get_payload_bodies_by_hash_v1(hashes).await + }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + + pub async fn get_payload_bodies_by_range( + &self, + start: u64, + count: u64, + ) -> Result>>, Error> { + let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE); + self.engine() + .request(|engine: &Engine| async move { + engine + .api + .get_payload_bodies_by_range_v1(start, count) + .await + }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } + pub async fn get_payload_by_block_hash( &self, hash: ExecutionBlockHash, diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index 287050f66be..3ed99ca6068 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -45,6 +45,10 @@ lazy_static::lazy_static! { "execution_layer_get_payload_by_block_hash_time", "Time to reconstruct a payload from the EE using eth_getBlockByHash" ); + pub static ref EXECUTION_LAYER_GET_PAYLOAD_BODIES_BY_RANGE: Result = try_create_histogram( + "execution_layer_get_payload_bodies_by_range_time", + "Time to fetch a range of payload bodies from the EE" + ); pub static ref EXECUTION_LAYER_VERIFY_BLOCK_HASH: Result = try_create_histogram_with_buckets( "execution_layer_verify_block_hash_time", "Time to verify the execution block hash in Lighthouse, without the EL", diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index 63893375db6..fe3c6f274b8 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -205,6 +205,14 @@ impl ExecutionBlockGenerator { .and_then(|block| block.as_execution_block_with_tx()) } + pub fn execution_block_with_txs_by_number( + &self, + number: u64, + ) -> Option> { + self.block_by_number(number) + .and_then(|block| block.as_execution_block_with_tx()) + } + pub fn move_to_block_prior_to_terminal_block(&mut self) -> Result<(), String> { let target_block = self .terminal_block_number diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index 52a794ba1ae..3c0763a8fb9 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -40,6 +40,8 @@ pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { new_payload_v3: true, forkchoice_updated_v1: true, forkchoice_updated_v2: true, + get_payload_bodies_by_hash_v1: true, + get_payload_bodies_by_range_v1: true, get_payload_v1: true, get_payload_v2: true, get_payload_v3: true, diff --git a/consensus/types/src/execution_payload.rs b/consensus/types/src/execution_payload.rs index 6a8c41410ae..800a238d4de 100644 --- a/consensus/types/src/execution_payload.rs +++ b/consensus/types/src/execution_payload.rs @@ -179,3 +179,12 @@ impl ForkVersionDeserialize for ExecutionPayload { }) } } + +impl ExecutionPayload { + pub fn fork_name(&self) -> ForkName { + match self { + ExecutionPayload::Merge(_) => ForkName::Merge, + ExecutionPayload::Capella(_) => ForkName::Capella, + } + } +} diff --git a/testing/execution_engine_integration/src/nethermind.rs b/testing/execution_engine_integration/src/nethermind.rs index 720a4a73b95..485485c6fe3 100644 --- a/testing/execution_engine_integration/src/nethermind.rs +++ b/testing/execution_engine_integration/src/nethermind.rs @@ -11,7 +11,7 @@ use unused_port::unused_tcp4_port; /// We've pinned the Nethermind version since our method of using the `master` branch to /// find the latest tag isn't working. It appears Nethermind don't always tag on `master`. /// We should fix this so we always pull the latest version of Nethermind. -const NETHERMIND_BRANCH: &str = "release/1.14.6"; +const NETHERMIND_BRANCH: &str = "release/1.17.1"; const NETHERMIND_REPO_URL: &str = "https://github.com/NethermindEth/nethermind"; fn build_result(repo_dir: &Path) -> Output { @@ -67,7 +67,7 @@ impl NethermindEngine { .join("Nethermind.Runner") .join("bin") .join("Release") - .join("net6.0") + .join("net7.0") .join("Nethermind.Runner") } } @@ -95,7 +95,7 @@ impl GenericExecutionEngine for NethermindEngine { .arg("--datadir") .arg(datadir.path().to_str().unwrap()) .arg("--config") - .arg("kiln") + .arg("hive") .arg("--Init.ChainSpecPath") .arg(genesis_json_path.to_str().unwrap()) .arg("--Merge.TerminalTotalDifficulty") diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 0179c5c7a1d..5ad82a2f602 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -15,8 +15,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::time::sleep; use types::{ - Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ForkName, FullPayload, - Hash256, MainnetEthSpec, PublicKeyBytes, Slot, Uint256, + Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, + ForkName, FullPayload, Hash256, MainnetEthSpec, PublicKeyBytes, Slot, Uint256, }; const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(30); @@ -624,12 +624,32 @@ async fn check_payload_reconstruction( ) { let reconstructed = ee .execution_layer - // FIXME: handle other forks here? - .get_payload_by_block_hash(payload.block_hash(), ForkName::Merge) + .get_payload_by_block_hash(payload.block_hash(), payload.fork_name()) .await .unwrap() .unwrap(); assert_eq!(reconstructed, *payload); + // also check via payload bodies method + let capabilities = ee + .execution_layer + .get_engine_capabilities(None) + .await + .unwrap(); + assert!( + // if the engine doesn't have these capabilities, we need to update the client in our tests + capabilities.get_payload_bodies_by_hash_v1 && capabilities.get_payload_bodies_by_range_v1, + "Testing engine does not support payload bodies methods" + ); + let mut bodies = ee + .execution_layer + .get_payload_bodies_by_hash(vec![payload.block_hash()]) + .await + .unwrap(); + assert_eq!(bodies.len(), 1); + let body = bodies.pop().unwrap().unwrap(); + let header = ExecutionPayloadHeader::from(payload.to_ref()); + let reconstructed_from_body = body.to_payload(header).unwrap(); + assert_eq!(reconstructed_from_body, *payload); } /// Returns the duration since the unix epoch.