diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3a5763c7ffd..4198425a7e7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2869,7 +2869,6 @@ impl BeaconChain { randao_reveal: Signature, slot: Slot, validator_graffiti: Option, - validator_fee_recipient: Option
, ) -> Result, BlockProductionError> { metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS); let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES); @@ -2927,7 +2926,6 @@ impl BeaconChain { slot, randao_reveal, validator_graffiti, - validator_fee_recipient, ) } @@ -2950,7 +2948,6 @@ impl BeaconChain { produce_at_slot: Slot, randao_reveal: Signature, validator_graffiti: Option, - validator_fee_recipient: Option
, ) -> Result, BlockProductionError> { let eth1_chain = self .eth1_chain @@ -3099,8 +3096,7 @@ impl BeaconChain { } BeaconState::Merge(_) => { let sync_aggregate = get_sync_aggregate()?; - let execution_payload = - get_execution_payload(self, &state, validator_fee_recipient)?; + let execution_payload = get_execution_payload(self, &state)?; BeaconBlock::Merge(BeaconBlockMerge { slot, proposer_index, diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 92169061ca9..c19bba61268 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -204,16 +204,14 @@ pub fn validate_execution_payload_for_gossip( pub fn get_execution_payload( chain: &BeaconChain, state: &BeaconState, - fee_recipient: Option
, ) -> Result, BlockProductionError> { - Ok(prepare_execution_payload_blocking(chain, state, fee_recipient)?.unwrap_or_default()) + Ok(prepare_execution_payload_blocking(chain, state)?.unwrap_or_default()) } /// Wraps the async `prepare_execution_payload` function as a blocking task. pub fn prepare_execution_payload_blocking( chain: &BeaconChain, state: &BeaconState, - fee_recipient: Option
, ) -> Result>, BlockProductionError> { let execution_layer = chain .execution_layer @@ -221,9 +219,7 @@ pub fn prepare_execution_payload_blocking( .ok_or(BlockProductionError::ExecutionLayerMissing)?; execution_layer - .block_on_generic(|_| async { - prepare_execution_payload(chain, state, fee_recipient).await - }) + .block_on_generic(|_| async { prepare_execution_payload(chain, state).await }) .map_err(BlockProductionError::BlockingFailed)? } @@ -244,7 +240,6 @@ pub fn prepare_execution_payload_blocking( pub async fn prepare_execution_payload( chain: &BeaconChain, state: &BeaconState, - fee_recipient: Option
, ) -> Result>, BlockProductionError> { let spec = &chain.spec; let execution_layer = chain @@ -305,7 +300,6 @@ pub async fn prepare_execution_payload( timestamp, random, finalized_block_hash.unwrap_or_else(Hash256::zero), - fee_recipient, ) .await .map_err(BlockProductionError::GetPayloadFailed)?; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2f1a9c19a3c..574895296dd 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -587,7 +587,7 @@ where let (block, state) = self .chain - .produce_block_on_state(state, None, slot, randao_reveal, Some(graffiti), None) + .produce_block_on_state(state, None, slot, randao_reveal, Some(graffiti)) .unwrap(); let signed_block = block.sign( @@ -641,7 +641,7 @@ where let (block, state) = self .chain - .produce_block_on_state(state, None, slot, randao_reveal, Some(graffiti), None) + .produce_block_on_state(state, None, slot, randao_reveal, Some(graffiti)) .unwrap(); let signed_block = block.sign( diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 95de7c954b9..5c069f0b0b1 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -254,14 +254,8 @@ impl ExecutionLayer { timestamp: u64, random: Hash256, finalized_block_hash: Hash256, - validator_fee_recipient: Option
, ) -> Result, Error> { - // Override the beacon node's suggested fee-recipient with fee-recipient from the validator, if present. - let suggested_fee_recipient = match validator_fee_recipient { - Some(fee_recipient) => fee_recipient, - None => self.suggested_fee_recipient()?, - }; - + let suggested_fee_recipient = self.suggested_fee_recipient()?; debug!( self.log(), "Issuing engine_getPayload"; diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index f06dd98e0e5..59345bc01f2 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -129,7 +129,7 @@ impl MockExecutionLayer { let payload = self .el - .get_payload::(parent_hash, timestamp, random, finalized_block_hash, None) + .get_payload::(parent_hash, timestamp, random, finalized_block_hash) .await .unwrap(); let block_hash = payload.block_hash; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 1811b758f7c..85c464466c1 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1869,12 +1869,7 @@ pub fn serve( })?; let (block, _) = chain - .produce_block( - randao_reveal, - slot, - query.graffiti.map(Into::into), - query.fee_recipient, - ) + .produce_block(randao_reveal, slot, query.graffiti.map(Into::into)) .map_err(warp_utils::reject::block_production_error)?; let fork_name = block .to_ref() diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 7f9796c6d82..878af7a0395 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1880,7 +1880,7 @@ impl ApiTester { let block = self .client - .get_validator_blocks::(slot, &randao_reveal, None, None) + .get_validator_blocks::(slot, &randao_reveal, None) .await .unwrap() .data; diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 7c7b7a2ea8a..98187114676 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -881,6 +881,23 @@ impl BeaconNodeHttpClient { Ok(()) } + /// `POST validator/prepare_beacon_proposer` + pub async fn post_validator_prepare_beacon_proposer( + &self, + preparation_data: &[ProposerPreparationData], + ) -> Result<(), Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("prepare_beacon_proposer"); + + self.post(path, &preparation_data).await?; + + Ok(()) + } + /// `GET config/fork_schedule` pub async fn get_config_fork_schedule(&self) -> Result>, Error> { let mut path = self.eth_path(V1)?; @@ -1127,7 +1144,6 @@ impl BeaconNodeHttpClient { slot: Slot, randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, - fee_recipient: Option<&Address>, ) -> Result>, Error> { let mut path = self.eth_path(V2)?; @@ -1145,11 +1161,6 @@ impl BeaconNodeHttpClient { .append_pair("graffiti", &graffiti.to_string()); } - if let Some(fee_recipient) = fee_recipient { - path.query_pairs_mut() - .append_pair("fee_recipient", &format!("{:?}", fee_recipient)); - } - self.get(path).await } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 10aa927ac8d..be65dd8776c 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -575,7 +575,6 @@ pub struct ProposerData { pub struct ValidatorBlocksQuery { pub randao_reveal: SignatureBytes, pub graffiti: Option, - pub fee_recipient: Option
, } #[derive(Clone, Serialize, Deserialize)] diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 5b1d3707ae8..c0e623f292a 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -47,6 +47,7 @@ pub mod graffiti; pub mod historical_batch; pub mod indexed_attestation; pub mod pending_attestation; +pub mod proposer_preparation_data; pub mod proposer_slashing; pub mod relative_epoch; pub mod selection_proof; @@ -126,6 +127,7 @@ pub use crate::participation_flags::ParticipationFlags; pub use crate::participation_list::ParticipationList; pub use crate::pending_attestation::PendingAttestation; pub use crate::preset::{AltairPreset, BasePreset}; +pub use crate::proposer_preparation_data::ProposerPreparationData; pub use crate::proposer_slashing::ProposerSlashing; pub use crate::relative_epoch::{Error as RelativeEpochError, RelativeEpoch}; pub use crate::selection_proof::SelectionProof; diff --git a/consensus/types/src/proposer_preparation_data.rs b/consensus/types/src/proposer_preparation_data.rs new file mode 100644 index 00000000000..f2ea967114f --- /dev/null +++ b/consensus/types/src/proposer_preparation_data.rs @@ -0,0 +1,12 @@ +use crate::*; +use serde::{Deserialize, Serialize}; + +/// A proposer preparation, created when a validator prepares the beacon node for potential proposers +/// by supplying information required when proposing blocks for the given validators. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct ProposerPreparationData { + /// The validators index. + pub validator_index: u64, + /// The fee-recipient address. + pub fee_recipient: Address, +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 6e91fb56da7..0cba70481f1 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,6 +1,5 @@ use crate::{ beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, - fee_recipient_file::FeeRecipientFile, graffiti_file::GraffitiFile, }; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; @@ -11,7 +10,7 @@ use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Address, EthSpec, PublicKeyBytes, Slot}; +use types::{EthSpec, PublicKeyBytes, Slot}; /// Builds a `BlockService`. pub struct BlockServiceBuilder { @@ -21,8 +20,6 @@ pub struct BlockServiceBuilder { context: Option>, graffiti: Option, graffiti_file: Option, - fee_recipient: Option
, - fee_recipient_file: Option, } impl BlockServiceBuilder { @@ -34,8 +31,6 @@ impl BlockServiceBuilder { context: None, graffiti: None, graffiti_file: None, - fee_recipient: None, - fee_recipient_file: None, } } @@ -69,16 +64,6 @@ impl BlockServiceBuilder { self } - pub fn fee_recipient(mut self, fee_recipient: Option
) -> Self { - self.fee_recipient = fee_recipient; - self - } - - pub fn fee_recipient_file(mut self, fee_recipient_file: Option) -> Self { - self.fee_recipient_file = fee_recipient_file; - self - } - pub fn build(self) -> Result, String> { Ok(BlockService { inner: Arc::new(Inner { @@ -96,8 +81,6 @@ impl BlockServiceBuilder { .ok_or("Cannot build BlockService without runtime_context")?, graffiti: self.graffiti, graffiti_file: self.graffiti_file, - fee_recipient: self.fee_recipient, - fee_recipient_file: self.fee_recipient_file, }), }) } @@ -111,8 +94,6 @@ pub struct Inner { context: RuntimeContext, graffiti: Option, graffiti_file: Option, - fee_recipient: Option
, - fee_recipient_file: Option, } /// Attempts to produce attestations for any block producer(s) at the start of the epoch. @@ -276,18 +257,6 @@ impl BlockService { .or_else(|| self.validator_store.graffiti(&validator_pubkey)) .or(self.graffiti); - let fee_recipient = self - .fee_recipient_file - .clone() - .and_then(|mut g| match g.load_fee_recipient(&validator_pubkey) { - Ok(g) => g, - Err(e) => { - warn!(log, "Failed to read fee-recipient file"; "error" => ?e); - None - } - }) - .or(self.fee_recipient); - let randao_reveal_ref = &randao_reveal; let self_ref = &self; let proposer_index = self.validator_store.validator_index(&validator_pubkey); @@ -300,12 +269,7 @@ impl BlockService { &[metrics::BEACON_BLOCK_HTTP_GET], ); let block = beacon_node - .get_validator_blocks( - slot, - randao_reveal_ref, - graffiti.as_ref(), - fee_recipient.as_ref(), - ) + .get_validator_blocks(slot, randao_reveal_ref, graffiti.as_ref()) .await .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))? .data; diff --git a/validator_client/src/fee_recipient_file.rs b/validator_client/src/fee_recipient_file.rs index f912bd48d94..2c5ac20e0e7 100644 --- a/validator_client/src/fee_recipient_file.rs +++ b/validator_client/src/fee_recipient_file.rs @@ -40,6 +40,21 @@ impl FeeRecipientFile { } } + /// Returns the fee-recipient corresponding to the given public key if present, else returns the + /// default fee-recipient. + /// + /// Returns an error if loading from the fee-recipient file fails. + pub fn get_fee_recipient( + &mut self, + public_key: &PublicKeyBytes, + ) -> Result, Error> { + Ok(self + .fee_recipients + .get(public_key) + .copied() + .or(self.default)) + } + /// Loads the fee-recipient file and populates the default fee-recipient and `fee_recipients` hashmap. /// Returns the fee-recipient corresponding to the given public key if present, else returns the /// default fee-recipient. diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index a5a3ce13418..c58ac25f1f9 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -10,6 +10,7 @@ mod graffiti_file; mod http_metrics; mod key_cache; mod notifier; +mod preparation_service; mod signing_method; mod sync_committee_service; @@ -39,6 +40,7 @@ use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts}; use http_api::ApiSecret; use notifier::spawn_notifier; use parking_lot::RwLock; +use preparation_service::{PreparationService, PreparationServiceBuilder}; use reqwest::Certificate; use slog::{error, info, warn, Logger}; use slot_clock::SlotClock; @@ -83,6 +85,7 @@ pub struct ProductionValidatorClient { attestation_service: AttestationService, sync_committee_service: SyncCommitteeService, doppelganger_service: Option>, + preparation_service: PreparationService, validator_store: Arc>, http_api_listen_addr: Option, config: Config, @@ -397,8 +400,6 @@ impl ProductionValidatorClient { .runtime_context(context.service_context("block".into())) .graffiti(config.graffiti) .graffiti_file(config.graffiti_file.clone()) - .fee_recipient(config.fee_recipient) - .fee_recipient_file(config.fee_recipient_file.clone()) .build()?; let attestation_service = AttestationServiceBuilder::new() @@ -409,6 +410,15 @@ impl ProductionValidatorClient { .runtime_context(context.service_context("attestation".into())) .build()?; + let preparation_service = PreparationServiceBuilder::new() + .slot_clock(slot_clock.clone()) + .validator_store(validator_store.clone()) + .beacon_nodes(beacon_nodes.clone()) + .runtime_context(context.service_context("preparation".into())) + .fee_recipient(config.fee_recipient) + .fee_recipient_file(config.fee_recipient_file.clone()) + .build()?; + let sync_committee_service = SyncCommitteeService::new( duties_service.clone(), validator_store.clone(), @@ -430,6 +440,7 @@ impl ProductionValidatorClient { attestation_service, sync_committee_service, doppelganger_service, + preparation_service, validator_store, config, http_api_listen_addr: None, @@ -461,6 +472,11 @@ impl ProductionValidatorClient { .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start sync committee service: {}", e))?; + self.preparation_service + .clone() + .start_update_service(&self.context.eth2_config.spec) + .map_err(|e| format!("Unable to start preparation service: {}", e))?; + if let Some(doppelganger_service) = self.doppelganger_service.clone() { DoppelgangerService::start_update_service( doppelganger_service, diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs new file mode 100644 index 00000000000..fda1bc0ed9d --- /dev/null +++ b/validator_client/src/preparation_service.rs @@ -0,0 +1,228 @@ +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::{ + fee_recipient_file::FeeRecipientFile, + validator_store::{DoppelgangerStatus, ValidatorStore}, +}; +use environment::RuntimeContext; +use slog::{error, info}; +use slot_clock::SlotClock; +use std::ops::Deref; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; +use types::{Address, ChainSpec, EthSpec, ProposerPreparationData}; + +/// Builds an `PreparationService`. +pub struct PreparationServiceBuilder { + validator_store: Option>>, + slot_clock: Option, + beacon_nodes: Option>>, + context: Option>, + fee_recipient: Option
, + fee_recipient_file: Option, +} + +impl PreparationServiceBuilder { + pub fn new() -> Self { + Self { + validator_store: None, + slot_clock: None, + beacon_nodes: None, + context: None, + fee_recipient: None, + fee_recipient_file: None, + } + } + + pub fn validator_store(mut self, store: Arc>) -> Self { + self.validator_store = Some(store); + self + } + + pub fn slot_clock(mut self, slot_clock: T) -> Self { + self.slot_clock = Some(slot_clock); + self + } + + pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { + self.beacon_nodes = Some(beacon_nodes); + self + } + + pub fn runtime_context(mut self, context: RuntimeContext) -> Self { + self.context = Some(context); + self + } + + pub fn fee_recipient(mut self, fee_recipient: Option
) -> Self { + self.fee_recipient = fee_recipient; + self + } + + pub fn fee_recipient_file(mut self, fee_recipient_file: Option) -> Self { + self.fee_recipient_file = fee_recipient_file; + self + } + + pub fn build(self) -> Result, String> { + Ok(PreparationService { + inner: Arc::new(Inner { + validator_store: self + .validator_store + .ok_or("Cannot build PreparationService without validator_store")?, + slot_clock: self + .slot_clock + .ok_or("Cannot build PreparationService without slot_clock")?, + beacon_nodes: self + .beacon_nodes + .ok_or("Cannot build PreparationService without beacon_nodes")?, + context: self + .context + .ok_or("Cannot build PreparationService without runtime_context")?, + fee_recipient: self.fee_recipient, + fee_recipient_file: self.fee_recipient_file, + }), + }) + } +} + +/// Helper to minimise `Arc` usage. +pub struct Inner { + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + context: RuntimeContext, + fee_recipient: Option
, + fee_recipient_file: Option, +} + +/// Attempts to produce proposer preparations for all known validators at the beginning of each epoch. +pub struct PreparationService { + inner: Arc>, +} + +impl Clone for PreparationService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Deref for PreparationService { + type Target = Inner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +impl PreparationService { + /// Starts the service which periodically produces proposer preparations. + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); + + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let duration_to_next_epoch = self + .slot_clock + .duration_to_next_epoch(E::slots_per_epoch()) + .ok_or("Unable to determine duration to next epoch")?; + + info!( + log, + "Proposer preparation service started"; + "next_update_millis" => duration_to_next_epoch.as_millis() + ); + + let executor = self.context.executor.clone(); + + let interval_fut = async move { + loop { + if let Some(duration_to_next_epoch) = + self.slot_clock.duration_to_next_epoch(E::slots_per_epoch()) + { + sleep(duration_to_next_epoch).await; + self.prepare_proposers_and_publish().await.unwrap(); + } else { + error!(log, "Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + sleep(slot_duration).await; + continue; + } + } + }; + + executor.spawn(interval_fut, "preparation_service"); + Ok(()) + } + + /// Prepare proposer preparations and send to beacon node + async fn prepare_proposers_and_publish(&self) -> Result<(), String> { + let log = self.context.log(); + + let all_pubkeys: Vec<_> = self + .validator_store + .voting_pubkeys(DoppelgangerStatus::ignored); + + if self.fee_recipient_file.is_some() { + self.fee_recipient_file + .clone() + .unwrap() + .read_fee_recipient_file() + .unwrap(); + } + + let preparation_data: Vec<_> = all_pubkeys + .into_iter() + .filter_map(|pubkey| { + let validator_index = self.validator_store.validator_index(&pubkey); + if let Some(validator_index) = validator_index { + + let fee_recipient = self + .fee_recipient_file + .clone() + .and_then(|mut g| match g.get_fee_recipient(&pubkey) { + Ok(g) => g, + Err(_e) => None, + }) + .or(self.fee_recipient); + + fee_recipient.map(|fee_recipient| ProposerPreparationData { + validator_index, + fee_recipient, + }) + } else { + None + } + }) + .collect(); + + let proposal_preparation = preparation_data.as_slice(); + + let preparation_data_len = preparation_data.len(); + if preparation_data_len > 0 { + // Post the proposer preparations to the BN. + match self + .beacon_nodes + .first_success(RequireSynced::Yes, |beacon_node| async move { + beacon_node + .post_validator_prepare_beacon_proposer(proposal_preparation) + .await + }) + .await + { + Ok(()) => info!( + log, + "Successfully published proposer preparation"; + "count" => preparation_data_len, + ), + Err(e) => error!( + log, + "Unable to publish proposer preparation"; + "error" => %e, + ), + } + } + + Ok(()) + } +}