Skip to content

Commit

Permalink
the full flow...
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Aug 16, 2023
1 parent a55e91d commit 5d984f0
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 163 deletions.
282 changes: 119 additions & 163 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,6 @@ pub trait BeaconChainTypes: Send + Sync + 'static {
type EthSpec: types::EthSpec;
}

pub enum PartialBeaconBlockType<E: EthSpec> {
Full(PartialBeaconBlock<E, FullPayload<E>>),
Blinded(PartialBeaconBlock<E, BlindedPayload<E>>),
}

/// Used internally to split block production into discrete functions.
struct PartialBeaconBlock<E: EthSpec, Payload: AbstractExecPayload<E>> {
state: BeaconState<E>,
Expand Down Expand Up @@ -4309,52 +4304,86 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let chain = self.clone();
let mut partial_beacon_block = self
.task_executor
.spawn_blocking_handle(
move || {
chain.determine_and_produce_partial_beacon_block(
state,
state_root_opt,
produce_at_slot,
randao_reveal,
validator_graffiti,
)
.clone()
.spawn_handle(
async move {
chain
.determine_and_produce_partial_beacon_block(
state,
state_root_opt,
produce_at_slot,
randao_reveal,
validator_graffiti,
)
.await
},
"produce_partial_beacon_block",
)
.ok_or(BlockProductionError::ShuttingDown)?
.await
.map_err(BlockProductionError::TokioJoin)??;
.map_err(BlockProductionError::TokioJoin)?
.ok_or(BlockProductionError::ShuttingDown)??;

// Part 2/3 (async)
//
// Wait for the execution layer to return an execution payload (if one is required).
let prepare_payload_handle = partial_beacon_block.prepare_payload_handle.take();
let block_contents_type = if let Some(prepare_payload_handle) = prepare_payload_handle {
Some(
prepare_payload_handle
.await
.map_err(BlockProductionError::TokioJoin)?
.ok_or(BlockProductionError::ShuttingDown)??,
)
} else {
None
};
let block_contents_type_option =
if let Some(prepare_payload_handle) = prepare_payload_handle {
Some(
prepare_payload_handle
.await
.map_err(BlockProductionError::TokioJoin)?
.ok_or(BlockProductionError::ShuttingDown)??,
)
} else {
None
};

let chain = self.clone();
self.task_executor
.spawn_blocking_handle(
move || {
chain.complete_determined_partial_beacon_block(
partial_beacon_block,
block_contents_type,
verification,
)
},
"complete_partial_beacon_block",
)
.ok_or(BlockProductionError::ShuttingDown)?
.await
.map_err(BlockProductionError::TokioJoin)?
if let Some(block_contents_type) = block_contents_type_option {
match block_contents_type {
BlockProposalContentsType::Full(block_contents) => {
let chain = self.clone();
let beacon_block_and_state = self.task_executor
.spawn_blocking_handle(
move || {
chain.complete_partial_beacon_block_v3(
partial_beacon_block,
block_contents,
verification,
)
},
"complete_partial_beacon_block",
)
.ok_or(BlockProductionError::ShuttingDown)?
.await
.map_err(BlockProductionError::TokioJoin)??;

Ok(BeaconBlockAndStateResponse::Full(beacon_block_and_state))
}
BlockProposalContentsType::Blinded(block_contents) => {
let chain = self.clone();
let beacon_block_and_state = self.task_executor
.spawn_blocking_handle(
move || {
chain.complete_partial_beacon_block_v3(
partial_beacon_block,
block_contents,
verification,
)
},
"complete_partial_beacon_block",
)
.ok_or(BlockProductionError::ShuttingDown)?
.await
.map_err(BlockProductionError::TokioJoin)??;

Ok(BeaconBlockAndStateResponse::Blinded(beacon_block_and_state))
}
}
} else {
Err(BlockProductionError::FailedToFetchBlock)
}
}

/// Produce a block for some `slot` upon the given `state`.
Expand Down Expand Up @@ -4435,7 +4464,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BlockProductionError::TokioJoin)?
}

fn determine_and_produce_partial_beacon_block(
async fn determine_and_produce_partial_beacon_block(
self: &Arc<Self>,
mut state: BeaconState<T::EthSpec>,
state_root_opt: Option<Hash256>,
Expand Down Expand Up @@ -4917,12 +4946,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

fn complete_determined_partial_beacon_block(
fn complete_partial_beacon_block_v3<Payload: AbstractExecPayload<T::EthSpec>>(
&self,
partial_beacon_block: PartialBeaconBlockV3<T::EthSpec>,
block_contents: Option<BlockProposalContentsType<T::EthSpec>>,
block_contents: BlockProposalContents<T::EthSpec, Payload>,
verification: ProduceBlockVerification,
) -> Result<BeaconBlockAndStateResponse<T::EthSpec>, BlockProductionError> {
) -> Result<BeaconBlockAndState<T::EthSpec, Payload>, BlockProductionError> {
let PartialBeaconBlockV3 {
mut state,
slot,
Expand All @@ -4944,8 +4973,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
bls_to_execution_changes,
} = partial_beacon_block;

let block_type = BlockType::Full;

let inner_block = match &state {
BeaconState::Base(_) => BeaconBlock::Base(BeaconBlockBase {
slot,
Expand Down Expand Up @@ -4983,119 +5010,51 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
_phantom: PhantomData,
},
}),
BeaconState::Merge(_) => {
let block_contents_type =
block_contents.ok_or(BlockProductionError::MissingExecutionPayload)?;
match block_contents_type {
BlockProposalContentsType::Full(block_content) => {
BeaconBlock::Merge(BeaconBlockMerge {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyMerge {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: attestations.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: block_content
.to_payload()
.try_into()
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
},
})
}
BlockProposalContentsType::Blinded(block_content) => {
block_type = BlockType::Blinded;
BeaconBlock::Merge(BeaconBlockMerge {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyMerge {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: attestations.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: BlindedPayloadMerge {
execution_payload_header: block_content
.to_payload()
.into()
},
},
})
}
}
}
BeaconState::Capella(_) => {
let block_contents_type =
block_contents.ok_or(BlockProductionError::MissingExecutionPayload)?;
match block_contents_type {
BlockProposalContentsType::Full(block_content) => {
BeaconBlock::Capella(BeaconBlockCapella {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyCapella {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: attestations.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: block_content
.to_payload()
.try_into()
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
bls_to_execution_changes: bls_to_execution_changes.into(),
},
})
}
BlockProposalContentsType::Blinded(block_content) => {
block_type = BlockType::Blinded;
BeaconBlock::Capella(BeaconBlockCapella {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyCapella {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: attestations.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: block_content
.to_payload()
.try_into()
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
bls_to_execution_changes: bls_to_execution_changes.into(),
},
})
}
}
}
BeaconState::Merge(_) => BeaconBlock::Merge(BeaconBlockMerge {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyMerge {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: attestations.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: block_contents
.to_payload()
.try_into()
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
},
}),
BeaconState::Capella(_) => BeaconBlock::Capella(BeaconBlockCapella {
slot,
proposer_index,
parent_root,
state_root: Hash256::zero(),
body: BeaconBlockBodyCapella {
randao_reveal,
eth1_data,
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: attestations.into(),
deposits: deposits.into(),
voluntary_exits: voluntary_exits.into(),
sync_aggregate: sync_aggregate
.ok_or(BlockProductionError::MissingSyncAggregate)?,
execution_payload: block_contents
.to_payload()
.try_into()
.map_err(|_| BlockProductionError::InvalidPayloadFork)?,
bls_to_execution_changes: bls_to_execution_changes.into(),
},
}),
};

let block = SignedBeaconBlock::from_block(
Expand Down Expand Up @@ -5152,10 +5111,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"slot" => block.slot()
);

match block_type {
BlockType::Blinded => Ok(BeaconBlockAndStateResponse::Full((block, state))),
BlockType::Full => Ok(BeaconBlockAndStateResponse::Blinded((block, state))),
}
Ok((block, state))
}

fn complete_partial_beacon_block<Payload: AbstractExecPayload<T::EthSpec>>(
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ use types::*;
pub type PreparePayloadResultV3<E> = Result<BlockProposalContentsType<E>, BlockProductionError>;
pub type PreparePayloadHandleV3<E> = JoinHandle<Option<PreparePayloadResultV3<E>>>;

pub enum PreparePayloadHandleType<E: EthSpec> {
Full(JoinHandle<Option<PreparePayloadResult<E, FullPayload<E>>>>),
Blinded(JoinHandle<Option<PreparePayloadResult<E, BlindedPayload<E>>>>),
}

pub type PreparePayloadResult<E, Payload> =
Result<BlockProposalContents<E, Payload>, BlockProductionError>;
pub type PreparePayloadHandle<E, Payload> = JoinHandle<Option<PreparePayloadResult<E, Payload>>>;
Expand Down

0 comments on commit 5d984f0

Please sign in to comment.