From 5d984f0c583873b0abd60064e147a183e01d2c71 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Wed, 16 Aug 2023 14:20:03 +0300 Subject: [PATCH] the full flow... --- beacon_node/beacon_chain/src/beacon_chain.rs | 282 ++++++++---------- .../beacon_chain/src/execution_payload.rs | 5 + 2 files changed, 124 insertions(+), 163 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 29f45751f5a..f20c2c98993 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -276,11 +276,6 @@ pub trait BeaconChainTypes: Send + Sync + 'static { type EthSpec: types::EthSpec; } -pub enum PartialBeaconBlockType { - Full(PartialBeaconBlock>), - Blinded(PartialBeaconBlock>), -} - /// Used internally to split block production into discrete functions. struct PartialBeaconBlock> { state: BeaconState, @@ -4309,52 +4304,86 @@ impl BeaconChain { let chain = self.clone(); let mut partial_beacon_block = self .task_executor - .spawn_blocking_handle( - move || { - chain.determine_and_produce_partial_beacon_block( - state, - state_root_opt, - produce_at_slot, - randao_reveal, - validator_graffiti, - ) + .clone() + .spawn_handle( + async move { + chain + .determine_and_produce_partial_beacon_block( + state, + state_root_opt, + produce_at_slot, + randao_reveal, + validator_graffiti, + ) + .await }, "produce_partial_beacon_block", ) .ok_or(BlockProductionError::ShuttingDown)? .await - .map_err(BlockProductionError::TokioJoin)??; + .map_err(BlockProductionError::TokioJoin)? + .ok_or(BlockProductionError::ShuttingDown)??; // Part 2/3 (async) // // Wait for the execution layer to return an execution payload (if one is required). let prepare_payload_handle = partial_beacon_block.prepare_payload_handle.take(); - let block_contents_type = if let Some(prepare_payload_handle) = prepare_payload_handle { - Some( - prepare_payload_handle - .await - .map_err(BlockProductionError::TokioJoin)? - .ok_or(BlockProductionError::ShuttingDown)??, - ) - } else { - None - }; + let block_contents_type_option = + if let Some(prepare_payload_handle) = prepare_payload_handle { + Some( + prepare_payload_handle + .await + .map_err(BlockProductionError::TokioJoin)? + .ok_or(BlockProductionError::ShuttingDown)??, + ) + } else { + None + }; - let chain = self.clone(); - self.task_executor - .spawn_blocking_handle( - move || { - chain.complete_determined_partial_beacon_block( - partial_beacon_block, - block_contents_type, - verification, - ) - }, - "complete_partial_beacon_block", - ) - .ok_or(BlockProductionError::ShuttingDown)? - .await - .map_err(BlockProductionError::TokioJoin)? + if let Some(block_contents_type) = block_contents_type_option { + match block_contents_type { + BlockProposalContentsType::Full(block_contents) => { + let chain = self.clone(); + let beacon_block_and_state = self.task_executor + .spawn_blocking_handle( + move || { + chain.complete_partial_beacon_block_v3( + partial_beacon_block, + block_contents, + verification, + ) + }, + "complete_partial_beacon_block", + ) + .ok_or(BlockProductionError::ShuttingDown)? + .await + .map_err(BlockProductionError::TokioJoin)??; + + Ok(BeaconBlockAndStateResponse::Full(beacon_block_and_state)) + } + BlockProposalContentsType::Blinded(block_contents) => { + let chain = self.clone(); + let beacon_block_and_state = self.task_executor + .spawn_blocking_handle( + move || { + chain.complete_partial_beacon_block_v3( + partial_beacon_block, + block_contents, + verification, + ) + }, + "complete_partial_beacon_block", + ) + .ok_or(BlockProductionError::ShuttingDown)? + .await + .map_err(BlockProductionError::TokioJoin)??; + + Ok(BeaconBlockAndStateResponse::Blinded(beacon_block_and_state)) + } + } + } else { + Err(BlockProductionError::FailedToFetchBlock) + } } /// Produce a block for some `slot` upon the given `state`. @@ -4435,7 +4464,7 @@ impl BeaconChain { .map_err(BlockProductionError::TokioJoin)? } - fn determine_and_produce_partial_beacon_block( + async fn determine_and_produce_partial_beacon_block( self: &Arc, mut state: BeaconState, state_root_opt: Option, @@ -4917,12 +4946,12 @@ impl BeaconChain { }) } - fn complete_determined_partial_beacon_block( + fn complete_partial_beacon_block_v3>( &self, partial_beacon_block: PartialBeaconBlockV3, - block_contents: Option>, + block_contents: BlockProposalContents, verification: ProduceBlockVerification, - ) -> Result, BlockProductionError> { + ) -> Result, BlockProductionError> { let PartialBeaconBlockV3 { mut state, slot, @@ -4944,8 +4973,6 @@ impl BeaconChain { bls_to_execution_changes, } = partial_beacon_block; - let block_type = BlockType::Full; - let inner_block = match &state { BeaconState::Base(_) => BeaconBlock::Base(BeaconBlockBase { slot, @@ -4983,119 +5010,51 @@ impl BeaconChain { _phantom: PhantomData, }, }), - BeaconState::Merge(_) => { - let block_contents_type = - block_contents.ok_or(BlockProductionError::MissingExecutionPayload)?; - match block_contents_type { - BlockProposalContentsType::Full(block_content) => { - BeaconBlock::Merge(BeaconBlockMerge { - slot, - proposer_index, - parent_root, - state_root: Hash256::zero(), - body: BeaconBlockBodyMerge { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings: proposer_slashings.into(), - attester_slashings: attester_slashings.into(), - attestations: attestations.into(), - deposits: deposits.into(), - voluntary_exits: voluntary_exits.into(), - sync_aggregate: sync_aggregate - .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_content - .to_payload() - .try_into() - .map_err(|_| BlockProductionError::InvalidPayloadFork)?, - }, - }) - } - BlockProposalContentsType::Blinded(block_content) => { - block_type = BlockType::Blinded; - BeaconBlock::Merge(BeaconBlockMerge { - slot, - proposer_index, - parent_root, - state_root: Hash256::zero(), - body: BeaconBlockBodyMerge { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings: proposer_slashings.into(), - attester_slashings: attester_slashings.into(), - attestations: attestations.into(), - deposits: deposits.into(), - voluntary_exits: voluntary_exits.into(), - sync_aggregate: sync_aggregate - .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: BlindedPayloadMerge { - execution_payload_header: block_content - .to_payload() - .into() - }, - }, - }) - } - } - } - BeaconState::Capella(_) => { - let block_contents_type = - block_contents.ok_or(BlockProductionError::MissingExecutionPayload)?; - match block_contents_type { - BlockProposalContentsType::Full(block_content) => { - BeaconBlock::Capella(BeaconBlockCapella { - slot, - proposer_index, - parent_root, - state_root: Hash256::zero(), - body: BeaconBlockBodyCapella { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings: proposer_slashings.into(), - attester_slashings: attester_slashings.into(), - attestations: attestations.into(), - deposits: deposits.into(), - voluntary_exits: voluntary_exits.into(), - sync_aggregate: sync_aggregate - .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_content - .to_payload() - .try_into() - .map_err(|_| BlockProductionError::InvalidPayloadFork)?, - bls_to_execution_changes: bls_to_execution_changes.into(), - }, - }) - } - BlockProposalContentsType::Blinded(block_content) => { - block_type = BlockType::Blinded; - BeaconBlock::Capella(BeaconBlockCapella { - slot, - proposer_index, - parent_root, - state_root: Hash256::zero(), - body: BeaconBlockBodyCapella { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings: proposer_slashings.into(), - attester_slashings: attester_slashings.into(), - attestations: attestations.into(), - deposits: deposits.into(), - voluntary_exits: voluntary_exits.into(), - sync_aggregate: sync_aggregate - .ok_or(BlockProductionError::MissingSyncAggregate)?, - execution_payload: block_content - .to_payload() - .try_into() - .map_err(|_| BlockProductionError::InvalidPayloadFork)?, - bls_to_execution_changes: bls_to_execution_changes.into(), - }, - }) - } - } - } + BeaconState::Merge(_) => BeaconBlock::Merge(BeaconBlockMerge { + slot, + proposer_index, + parent_root, + state_root: Hash256::zero(), + body: BeaconBlockBodyMerge { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings: proposer_slashings.into(), + attester_slashings: attester_slashings.into(), + attestations: attestations.into(), + deposits: deposits.into(), + voluntary_exits: voluntary_exits.into(), + sync_aggregate: sync_aggregate + .ok_or(BlockProductionError::MissingSyncAggregate)?, + execution_payload: block_contents + .to_payload() + .try_into() + .map_err(|_| BlockProductionError::InvalidPayloadFork)?, + }, + }), + BeaconState::Capella(_) => BeaconBlock::Capella(BeaconBlockCapella { + slot, + proposer_index, + parent_root, + state_root: Hash256::zero(), + body: BeaconBlockBodyCapella { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings: proposer_slashings.into(), + attester_slashings: attester_slashings.into(), + attestations: attestations.into(), + deposits: deposits.into(), + voluntary_exits: voluntary_exits.into(), + sync_aggregate: sync_aggregate + .ok_or(BlockProductionError::MissingSyncAggregate)?, + execution_payload: block_contents + .to_payload() + .try_into() + .map_err(|_| BlockProductionError::InvalidPayloadFork)?, + bls_to_execution_changes: bls_to_execution_changes.into(), + }, + }), }; let block = SignedBeaconBlock::from_block( @@ -5152,10 +5111,7 @@ impl BeaconChain { "slot" => block.slot() ); - match block_type { - BlockType::Blinded => Ok(BeaconBlockAndStateResponse::Full((block, state))), - BlockType::Full => Ok(BeaconBlockAndStateResponse::Blinded((block, state))), - } + Ok((block, state)) } fn complete_partial_beacon_block>( diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 167f1208229..2b44f5c6f83 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -33,6 +33,11 @@ use types::*; pub type PreparePayloadResultV3 = Result, BlockProductionError>; pub type PreparePayloadHandleV3 = JoinHandle>>; +pub enum PreparePayloadHandleType { + Full(JoinHandle>>>), + Blinded(JoinHandle>>>), +} + pub type PreparePayloadResult = Result, BlockProductionError>; pub type PreparePayloadHandle = JoinHandle>>;