Skip to content

Commit

Permalink
cache proposer preparations
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jan 18, 2022
1 parent 2feef90 commit 815b4de
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ pub async fn prepare_execution_payload<T: BeaconChainTypes>(
let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?;
let random = *state.get_randao_mix(state.current_epoch())?;
let finalized_root = state.finalized_checkpoint().root;
let proposer_index = state.get_beacon_proposer_index(state.slot(), spec)? as u64;

// The finalized block hash is not included in the specification, however we provide this
// parameter so that the execution layer can produce a payload id if one is not already known
Expand Down Expand Up @@ -300,6 +301,7 @@ pub async fn prepare_execution_payload<T: BeaconChainTypes>(
timestamp,
random,
finalized_block_hash.unwrap_or_else(Hash256::zero),
Some(proposer_index),
)
.await
.map_err(BlockProductionError::GetPayloadFailed)?;
Expand Down
50 changes: 46 additions & 4 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, debug, error, info, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,7 +19,7 @@ use tokio::{
sync::{Mutex, MutexGuard},
time::{sleep, sleep_until, Instant},
};
use types::{ ChainSpec, ProposerPreparationData };
use types::{ChainSpec, ProposerPreparationData};

pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};

Expand Down Expand Up @@ -49,6 +50,7 @@ impl From<ApiError> for Error {
struct Inner {
engines: Engines<HttpJsonRpc>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationData>>,
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
executor: TaskExecutor,
log: Logger,
Expand Down Expand Up @@ -96,6 +98,7 @@ impl ExecutionLayer {
log: log.clone(),
},
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
log,
Expand Down Expand Up @@ -127,6 +130,13 @@ impl ExecutionLayer {
self.inner.execution_blocks.lock().await
}

/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn proposer_preparation_data(
&self,
) -> MutexGuard<'_, HashMap<u64, ProposerPreparationData>> {
self.inner.proposer_preparation_data.lock().await
}

fn log(&self) -> &Logger {
&self.inner.log
}
Expand Down Expand Up @@ -240,16 +250,37 @@ impl ExecutionLayer {
}

/// Updates the proposer preparation data provided by validators
pub fn update_proposer_preparation(&self, preparation_data: Vec<ProposerPreparationData>) -> Result<(), Error> {
pub fn update_proposer_preparation(
&self,
preparation_data: Vec<ProposerPreparationData>,
) -> Result<(), Error> {
self.block_on_generic(|_| async {
self.update_proposer_preparation_async(preparation_data.clone())
.await
})
.unwrap()
}

/// Updates the proposer preparation data provided by validators
async fn update_proposer_preparation_async(
&self,
preparation_data: Vec<ProposerPreparationData>,
) -> Result<(), Error> {
info!(
self.log(),
"Received proposer preperation data";
"count" => preparation_data.len(),
);

let mut proposer_preparation_data = self.proposer_preparation_data().await;
let mut preparation_entries = preparation_data.clone();
while let Some(preparation_entry) = preparation_entries.pop() {
proposer_preparation_data.insert(preparation_entry.validator_index, preparation_entry);
}

Ok(())
}


/// Maps to the `engine_getPayload` JSON-RPC call.
///
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
Expand All @@ -265,8 +296,19 @@ impl ExecutionLayer {
timestamp: u64,
random: Hash256,
finalized_block_hash: Hash256,
proposer_index: Option<u64>,
) -> Result<ExecutionPayload<T>, Error> {
let suggested_fee_recipient = self.suggested_fee_recipient()?;
let suggested_fee_recipient = (if proposer_index.is_some() {
self.proposer_preparation_data()
.await
.get(&proposer_index.unwrap())
.map(|preparation_data| preparation_data.fee_recipient)
} else {
None
})
.or_else(|| self.suggested_fee_recipient().ok())
.unwrap();

debug!(
self.log(),
"Issuing engine_getPayload";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl<T: EthSpec> MockExecutionLayer<T> {

let payload = self
.el
.get_payload::<T>(parent_hash, timestamp, random, finalized_block_hash)
.get_payload::<T>(parent_hash, timestamp, random, finalized_block_hash, None)
.await
.unwrap();
let block_hash = payload.block_hash;
Expand Down
24 changes: 13 additions & 11 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch,
EthSpec, ForkName, ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
EthSpec, ForkName, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
Slot, SyncCommitteeMessage, SyncContributionData,
};
use version::{
add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection,
Expand Down Expand Up @@ -2174,7 +2174,8 @@ pub fn serve<T: BeaconChainTypes>(
})
},
);


// POST validator/prepare_beacon_proposer
let post_validator_prepare_beacon_proposer = eth1_v1
.and(warp::path("validator"))
.and(warp::path("prepare_beacon_proposer"))
Expand All @@ -2183,22 +2184,23 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and(warp::body::json())
.and_then(
|chain: Arc<BeaconChain<T>>,
preparation_data: Vec<ProposerPreparationData>| {
|chain: Arc<BeaconChain<T>>, preparation_data: Vec<ProposerPreparationData>| {
blocking_json_task(move || {
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)
.map_err(warp_utils::reject::beacon_chain_error)?;

execution_layer
.update_proposer_preparation(preparation_data)
.map_err(|_| {
warp_utils::reject::custom_not_found(
"error processing proposer preparation data".to_string(),
.map_err(|_e| {
warp_utils::reject::custom_bad_request(
"error processing proposer preparations".to_string(),
)
})
})?;

Ok(())
})
},
);
Expand Down
1 change: 0 additions & 1 deletion validator_client/src/preparation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl<T: SlotClock + 'static, E: EthSpec> PreparationService<T, E> {
.filter_map(|pubkey| {
let validator_index = self.validator_store.validator_index(&pubkey);
if let Some(validator_index) = validator_index {

let fee_recipient = self
.fee_recipient_file
.clone()
Expand Down

0 comments on commit 815b4de

Please sign in to comment.