Skip to content

Commit

Permalink
refactor based on feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Oct 14, 2023
1 parent dc746c4 commit 1667b32
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 149 deletions.
193 changes: 54 additions & 139 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub enum Error {
InvalidForkForPayload,
InvalidPayloadBody(String),
BeaconStateError(BeaconStateError),
PayloadTypeMismatch,
}

impl From<BeaconStateError> for Error {
Expand Down Expand Up @@ -134,6 +135,23 @@ pub enum BlockProposalContents<T: EthSpec, Payload: AbstractExecPayload<T>> {
},
}

impl<T: EthSpec> From<BlockProposalContents<T, FullPayload<T>>>
for BlockProposalContents<T, BlindedPayload<T>>
{
fn from(item: BlockProposalContents<T, FullPayload<T>>) -> Self {
let block_value = item.block_value().to_owned();

let blinded_payload: BlockProposalContents<T, BlindedPayload<T>> =
BlockProposalContents::Payload {
payload: item.to_payload().execution_payload().into(),
block_value,
_phantom: PhantomData,
};

blinded_payload
}
}

impl<T: EthSpec, Payload: AbstractExecPayload<T>> BlockProposalContents<T, Payload> {
pub fn payload(&self) -> &Payload {
match self {
Expand Down Expand Up @@ -703,7 +721,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
.await?
}
BlockProductionVersion::FullV2 => self
.get_full_payload_with_v3(
.get_full_payload_with(
parent_hash,
payload_attributes,
forkchoice_update_params,
Expand All @@ -729,7 +747,13 @@ impl<T: EthSpec> ExecutionLayer<T> {
&metrics::EXECUTION_LAYER_GET_PAYLOAD_SOURCE,
&[metrics::LOCAL],
);
Ok(BlockProposalContentsType::Full(block_proposal_contents))
if matches!(block_production_version, BlockProductionVersion::BlindedV2) {
Ok(BlockProposalContentsType::Blinded(
block_proposal_contents.into(),
))
} else {
Ok(BlockProposalContentsType::Full(block_proposal_contents))
}
}
BlockProposalContentsType::Blinded(block_proposal_contents) => {
metrics::inc_counter_vec(
Expand Down Expand Up @@ -768,7 +792,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
);

// Wait for the builder *and* local EL to produce a payload (or return an error).
let ((relay_result, relay_duration), (local_result, local_duration)) = tokio::join!(
let ((relay_result, relay_duration), (local_result_type, local_duration)) = tokio::join!(
timed_future(metrics::GET_BLINDED_PAYLOAD_BUILDER, async {
builder
.get_builder_header::<T, BlindedPayload<T>>(
Expand All @@ -779,7 +803,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
.await
}),
timed_future(metrics::GET_BLINDED_PAYLOAD_LOCAL, async {
self.get_full_payload_caching::<FullPayload<T>>(
self.get_full_payload_caching(
parent_hash,
payload_attributes,
forkchoice_update_params,
Expand All @@ -789,6 +813,11 @@ impl<T: EthSpec> ExecutionLayer<T> {
})
);

let local_result = match local_result_type? {
BlockProposalContentsType::Full(payload) => Ok(payload),
BlockProposalContentsType::Blinded(_) => Err(Error::PayloadTypeMismatch),
};

info!(
self.log(),
"Requested blinded execution payload";
Expand Down Expand Up @@ -1016,7 +1045,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
),
}
}
self.get_full_payload_caching_v3(
self.get_full_payload_caching(
parent_hash,
payload_attributes,
forkchoice_update_params,
Expand Down Expand Up @@ -1050,7 +1079,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
);

// Wait for the builder *and* local EL to produce a payload (or return an error).
let ((relay_result, relay_duration), (local_result, local_duration)) = tokio::join!(
let ((relay_result, relay_duration), (local_result_type, local_duration)) = tokio::join!(
timed_future(metrics::GET_BLINDED_PAYLOAD_BUILDER, async {
builder
.get_builder_header::<T, BlindedPayload<T>>(
Expand All @@ -1061,7 +1090,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
.await
}),
timed_future(metrics::GET_BLINDED_PAYLOAD_LOCAL, async {
self.get_full_payload_caching::<BlindedPayload<T>>(
self.get_full_payload_caching(
parent_hash,
payload_attributes,
forkchoice_update_params,
Expand All @@ -1071,6 +1100,12 @@ impl<T: EthSpec> ExecutionLayer<T> {
})
);

let local_result: Result<BlockProposalContents<T, BlindedPayload<T>>, Error> =
match local_result_type? {
BlockProposalContentsType::Full(payload) => Ok(payload.into()),
BlockProposalContentsType::Blinded(payload) => Ok(payload),
};

info!(
self.log(),
"Requested blinded execution payload";
Expand Down Expand Up @@ -1298,46 +1333,32 @@ impl<T: EthSpec> ExecutionLayer<T> {
),
}
}
println!("YOOOO");
let payload = self
.get_full_payload_caching::<BlindedPayload<T>>(
let payload_type = self
.get_full_payload_caching(
parent_hash,
payload_attributes,
forkchoice_update_params,
current_fork,
)
.await?;
Ok(ProvenancedPayload::Local(
BlockProposalContentsType::Blinded(payload),
))
match payload_type {
BlockProposalContentsType::Full(payload) => Ok(ProvenancedPayload::Local(
BlockProposalContentsType::Blinded(payload.into()),
)),
BlockProposalContentsType::Blinded(payload) => Ok(ProvenancedPayload::Local(
BlockProposalContentsType::Blinded(payload),
)),
}
}

/// Get a full payload and cache its result in the execution layer's payload cache.
async fn get_full_payload_caching_v3(
async fn get_full_payload_caching(
&self,
parent_hash: ExecutionBlockHash,
payload_attributes: &PayloadAttributes,
forkchoice_update_params: ForkchoiceUpdateParameters,
current_fork: ForkName,
) -> Result<BlockProposalContentsType<T>, Error> {
self.get_full_payload_with_v3(
parent_hash,
payload_attributes,
forkchoice_update_params,
current_fork,
Self::cache_payload,
)
.await
}

/// Get a full payload and cache its result in the execution layer's payload cache.
async fn get_full_payload_caching<Payload: AbstractExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
payload_attributes: &PayloadAttributes,
forkchoice_update_params: ForkchoiceUpdateParameters,
current_fork: ForkName,
) -> Result<BlockProposalContents<T, Payload>, Error> {
self.get_full_payload_with(
parent_hash,
payload_attributes,
Expand All @@ -1348,7 +1369,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
.await
}

async fn get_full_payload_with_v3(
async fn get_full_payload_with(
&self,
parent_hash: ExecutionBlockHash,
payload_attributes: &PayloadAttributes,
Expand Down Expand Up @@ -1454,112 +1475,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
.map_err(Error::EngineError)
}

async fn get_full_payload_with<Payload: AbstractExecPayload<T>>(
&self,
parent_hash: ExecutionBlockHash,
payload_attributes: &PayloadAttributes,
forkchoice_update_params: ForkchoiceUpdateParameters,
current_fork: ForkName,
f: fn(&ExecutionLayer<T>, ExecutionPayloadRef<T>) -> Option<ExecutionPayload<T>>,
) -> Result<BlockProposalContents<T, Payload>, Error> {
self.engine()
.request(move |engine| async move {
let payload_id = if let Some(id) = engine
.get_payload_id(&parent_hash, payload_attributes)
.await
{
// The payload id has been cached for this engine.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::HIT],
);
id
} else {
// The payload id has *not* been cached. Trigger an artificial
// fork choice update to retrieve a payload ID.
metrics::inc_counter_vec(
&metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID,
&[metrics::MISS],
);
let fork_choice_state = ForkchoiceState {
head_block_hash: parent_hash,
safe_block_hash: forkchoice_update_params
.justified_hash
.unwrap_or_else(ExecutionBlockHash::zero),
finalized_block_hash: forkchoice_update_params
.finalized_hash
.unwrap_or_else(ExecutionBlockHash::zero),
};

let response = engine
.notify_forkchoice_updated(
fork_choice_state,
Some(payload_attributes.clone()),
self.log(),
)
.await?;

match response.payload_id {
Some(payload_id) => payload_id,
None => {
error!(
self.log(),
"Exec engine unable to produce payload";
"msg" => "No payload ID, the engine is likely syncing. \
This has the potential to cause a missed block proposal.",
"status" => ?response.payload_status
);
return Err(ApiError::PayloadIdUnavailable);
}
}
};

let payload_fut = async {
debug!(
self.log(),
"Issuing engine_getPayload";
"suggested_fee_recipient" => ?payload_attributes.suggested_fee_recipient(),
"prev_randao" => ?payload_attributes.prev_randao(),
"timestamp" => payload_attributes.timestamp(),
"parent_hash" => ?parent_hash,
);
engine.api.get_payload::<T>(current_fork, payload_id).await
};
let payload_response = payload_fut.await;
let (execution_payload, block_value) = payload_response.map(|payload_response| {
if payload_response.execution_payload_ref().fee_recipient() != payload_attributes.suggested_fee_recipient() {
error!(
self.log(),
"Inconsistent fee recipient";
"msg" => "The fee recipient returned from the Execution Engine differs \
from the suggested_fee_recipient set on the beacon node. This could \
indicate that fees are being diverted to another address. Please \
ensure that the value of suggested_fee_recipient is set correctly and \
that the Execution Engine is trusted.",
"fee_recipient" => ?payload_response.execution_payload_ref().fee_recipient(),
"suggested_fee_recipient" => ?payload_attributes.suggested_fee_recipient(),
);
}
if f(self, payload_response.execution_payload_ref()).is_some() {
warn!(
self.log(),
"Duplicate payload cached, this might indicate redundant proposal \
attempts."
);
}
payload_response.into()
})?;
Ok(BlockProposalContents::Payload {
payload: execution_payload.into(),
block_value,
_phantom: PhantomData,
})
})
.await
.map_err(Box::new)
.map_err(Error::EngineError)
}

/// Maps to the `engine_newPayload` JSON-RPC call.
pub async fn notify_new_payload(
&self,
Expand Down
17 changes: 12 additions & 5 deletions beacon_node/execution_layer/src/test_utils/mock_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,25 @@ pub fn serve<E: EthSpec>(
finalized_hash: Some(finalized_execution_hash),
};

let payload = builder
let payload_type = builder
.el
.get_full_payload_caching::<BlindedPayload<E>>(
.get_full_payload_caching(
head_execution_hash,
&payload_attributes,
forkchoice_update_params,
fork,
)
.await
.map_err(|_| reject("couldn't get payload"))?
.to_payload()
.to_execution_payload_header();
.map_err(|_| reject("couldn't get payload"))?;

let payload = match payload_type {
crate::BlockProposalContentsType::Full(payload) => {
payload.to_payload().to_execution_payload_header()
}
crate::BlockProposalContentsType::Blinded(payload) => {
payload.to_payload().to_execution_payload_header()
}
};

let mut message = BuilderBid {
header: BlindedPayload::from(payload),
Expand Down
Loading

0 comments on commit 1667b32

Please sign in to comment.