Skip to content

Commit

Permalink
Allow per validator fee recipient via flag or file in validator clien…
Browse files Browse the repository at this point in the history
…t (similar to graffiti / graffiti-file) (#2924)

## Issue Addressed

#2883 

## Proposed Changes

* Added `suggested-fee-recipient` & `suggested-fee-recipient-file` flags to validator client (similar to graffiti / graffiti-file implementation).
* Added proposer preparation service to VC, which sends the fee-recipient of all known validators to the BN via [/eth/v1/validator/prepare_beacon_proposer](ethereum/beacon-APIs#178) api once per slot
* Added [/eth/v1/validator/prepare_beacon_proposer](ethereum/beacon-APIs#178) api endpoint and preparation data caching
* Added cleanup routine to remove cached proposer preparations when not updated for 2 epochs

## Additional Info

Changed the Implementation following the discussion in #2883.



Co-authored-by: pk910 <[email protected]>
Co-authored-by: Paul Hauner <[email protected]>
Co-authored-by: Philipp K <[email protected]>
  • Loading branch information
3 people committed Feb 8, 2022
1 parent d172c0b commit 89b286c
Show file tree
Hide file tree
Showing 33 changed files with 1,060 additions and 40 deletions.
12 changes: 9 additions & 3 deletions account_manager/src/validator/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,15 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin
eprintln!("Successfully imported keystore.");
num_imported_keystores += 1;

let validator_def =
ValidatorDefinition::new_keystore_with_password(&dest_keystore, password_opt, None)
.map_err(|e| format!("Unable to create new validator definition: {:?}", e))?;
let graffiti = None;
let suggested_fee_recipient = None;
let validator_def = ValidatorDefinition::new_keystore_with_password(
&dest_keystore,
password_opt,
graffiti,
suggested_fee_recipient,
)
.map_err(|e| format!("Unable to create new validator definition: {:?}", e))?;

defs.push(validator_def);

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3101,7 +3101,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
BeaconState::Merge(_) => {
let sync_aggregate = get_sync_aggregate()?;
let execution_payload = get_execution_payload(self, &state)?;
let execution_payload = get_execution_payload(self, &state, proposer_index)?;
BeaconBlock::Merge(BeaconBlockMerge {
slot,
proposer_index,
Expand Down
10 changes: 8 additions & 2 deletions beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,26 @@ pub fn validate_execution_payload_for_gossip<T: BeaconChainTypes>(
pub fn get_execution_payload<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
proposer_index: u64,
) -> Result<ExecutionPayload<T::EthSpec>, BlockProductionError> {
Ok(prepare_execution_payload_blocking(chain, state)?.unwrap_or_default())
Ok(prepare_execution_payload_blocking(chain, state, proposer_index)?.unwrap_or_default())
}

/// Wraps the async `prepare_execution_payload` function as a blocking task.
pub fn prepare_execution_payload_blocking<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
proposer_index: u64,
) -> Result<Option<ExecutionPayload<T::EthSpec>>, BlockProductionError> {
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BlockProductionError::ExecutionLayerMissing)?;

execution_layer
.block_on_generic(|_| async { prepare_execution_payload(chain, state).await })
.block_on_generic(|_| async {
prepare_execution_payload(chain, state, proposer_index).await
})
.map_err(BlockProductionError::BlockingFailed)?
}

Expand All @@ -240,6 +244,7 @@ pub fn prepare_execution_payload_blocking<T: BeaconChainTypes>(
pub async fn prepare_execution_payload<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
state: &BeaconState<T::EthSpec>,
proposer_index: u64,
) -> Result<Option<ExecutionPayload<T::EthSpec>>, BlockProductionError> {
let spec = &chain.spec;
let execution_layer = chain
Expand Down Expand Up @@ -300,6 +305,7 @@ pub async fn prepare_execution_payload<T: BeaconChainTypes>(
timestamp,
random,
finalized_block_hash.unwrap_or_else(Hash256::zero),
proposer_index,
)
.await
.map_err(BlockProductionError::GetPayloadFailed)?;
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@ where

// Spawn a routine that tracks the status of the execution engines.
execution_layer.spawn_watchdog_routine(beacon_chain.slot_clock.clone());

// Spawn a routine that removes expired proposer preparations.
execution_layer.spawn_clean_proposer_preparation_routine::<TSlotClock, TEthSpec>(
beacon_chain.slot_clock.clone(),
);
}
}

Expand Down
151 changes: 143 additions & 8 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use lru::LruCache;
use sensitive_url::SensitiveUrl;
use slog::{crit, debug, error, info, Logger};
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,7 +19,7 @@ use tokio::{
sync::{Mutex, MutexGuard},
time::{sleep, sleep_until, Instant},
};
use types::ChainSpec;
use types::{ChainSpec, Epoch, ProposerPreparationData};

pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};

Expand All @@ -30,6 +31,16 @@ pub mod test_utils;
/// in an LRU cache to avoid redundant lookups. This is the size of that cache.
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128;

/// A fee recipient address for use during block production. Only used as a very last resort if
/// there is no address provided by the user.
///
/// ## Note
///
/// This is *not* the zero-address, since Geth has been known to return errors for a coinbase of
/// 0x00..00.
const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] =
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];

#[derive(Debug)]
pub enum Error {
NoEngines,
Expand All @@ -46,9 +57,16 @@ impl From<ApiError> for Error {
}
}

#[derive(Clone)]
pub struct ProposerPreparationDataEntry {
update_epoch: Epoch,
preparation_data: ProposerPreparationData,
}

struct Inner {
engines: Engines<HttpJsonRpc>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
executor: TaskExecutor,
log: Logger,
Expand Down Expand Up @@ -96,6 +114,7 @@ impl ExecutionLayer {
log: log.clone(),
},
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
executor,
log,
Expand All @@ -116,17 +135,18 @@ impl ExecutionLayer {
&self.inner.executor
}

fn suggested_fee_recipient(&self) -> Result<Address, Error> {
self.inner
.suggested_fee_recipient
.ok_or(Error::FeeRecipientUnspecified)
}

/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn execution_blocks(&self) -> MutexGuard<'_, LruCache<Hash256, ExecutionBlock>> {
self.inner.execution_blocks.lock().await
}

/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
async fn proposer_preparation_data(
&self,
) -> MutexGuard<'_, HashMap<u64, ProposerPreparationDataEntry>> {
self.inner.proposer_preparation_data.lock().await
}

fn log(&self) -> &Logger {
&self.inner.log
}
Expand Down Expand Up @@ -234,11 +254,124 @@ impl ExecutionLayer {
self.engines().upcheck_not_synced(Logging::Disabled).await;
}

/// Spawns a routine which cleans the cached proposer preparations periodically.
pub fn spawn_clean_proposer_preparation_routine<S: SlotClock + 'static, T: EthSpec>(
&self,
slot_clock: S,
) {
let preparation_cleaner = |el: ExecutionLayer| async move {
// Start the loop to periodically clean proposer preparation cache.
loop {
if let Some(duration_to_next_epoch) =
slot_clock.duration_to_next_epoch(T::slots_per_epoch())
{
// Wait for next epoch
sleep(duration_to_next_epoch).await;

match slot_clock
.now()
.map(|slot| slot.epoch(T::slots_per_epoch()))
{
Some(current_epoch) => el
.clean_proposer_preparation(current_epoch)
.await
.map_err(|e| {
error!(
el.log(),
"Failed to clean proposer preparation cache";
"error" => format!("{:?}", e)
)
})
.unwrap_or(()),
None => error!(el.log(), "Failed to get current epoch from slot clock"),
}
} else {
error!(el.log(), "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot and retry.
sleep(slot_clock.slot_duration()).await;
}
}
};

self.spawn(preparation_cleaner, "exec_preparation_cleanup");
}

/// Returns `true` if there is at least one synced and reachable engine.
pub async fn is_synced(&self) -> bool {
self.engines().any_synced().await
}

/// Updates the proposer preparation data provided by validators
pub fn update_proposer_preparation_blocking(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
self.block_on_generic(|_| async move {
self.update_proposer_preparation(update_epoch, preparation_data)
.await
})?
}

/// Updates the proposer preparation data provided by validators
async fn update_proposer_preparation(
&self,
update_epoch: Epoch,
preparation_data: &[ProposerPreparationData],
) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;
for preparation_entry in preparation_data {
proposer_preparation_data.insert(
preparation_entry.validator_index,
ProposerPreparationDataEntry {
update_epoch,
preparation_data: preparation_entry.clone(),
},
);
}

Ok(())
}

/// Removes expired entries from cached proposer preparations
async fn clean_proposer_preparation(&self, current_epoch: Epoch) -> Result<(), Error> {
let mut proposer_preparation_data = self.proposer_preparation_data().await;

// Keep all entries that have been updated in the last 2 epochs
let retain_epoch = current_epoch.saturating_sub(Epoch::new(2));
proposer_preparation_data.retain(|_validator_index, preparation_entry| {
preparation_entry.update_epoch >= retain_epoch
});

Ok(())
}

/// Returns the fee-recipient address that should be used to build a block
async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
if let Some(preparation_data_entry) =
self.proposer_preparation_data().await.get(&proposer_index)
{
// The values provided via the API have first priority.
preparation_data_entry.preparation_data.fee_recipient
} else if let Some(address) = self.inner.suggested_fee_recipient {
// If there has been no fee recipient provided via the API, but the BN has been provided
// with a global default address, use that.
address
} else {
// If there is no user-provided fee recipient, use a junk value and complain loudly.
crit!(
self.log(),
"Fee recipient unknown";
"msg" => "the suggested_fee_recipient was unknown during block production. \
a junk address was used, rewards were lost! \
check the --suggested-fee-recipient flag and VC configuration.",
"proposer_index" => ?proposer_index
);

Address::from_slice(&DEFAULT_SUGGESTED_FEE_RECIPIENT)
}
}

/// Maps to the `engine_getPayload` JSON-RPC call.
///
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
Expand All @@ -254,8 +387,10 @@ impl ExecutionLayer {
timestamp: u64,
random: Hash256,
finalized_block_hash: Hash256,
proposer_index: u64,
) -> Result<ExecutionPayload<T>, Error> {
let suggested_fee_recipient = self.suggested_fee_recipient()?;
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;

debug!(
self.log(),
"Issuing engine_getPayload";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,16 @@ impl<T: EthSpec> MockExecutionLayer<T> {
.await
.unwrap();

let validator_index = 0;
let payload = self
.el
.get_payload::<T>(parent_hash, timestamp, random, finalized_block_hash)
.get_payload::<T>(
parent_hash,
timestamp,
random,
finalized_block_hash,
validator_index,
)
.await
.unwrap();
let block_hash = payload.block_hash;
Expand Down
54 changes: 51 additions & 3 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{
Attestation, AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch,
EthSpec, ForkName, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock,
SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
EthSpec, ForkName, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
Slot, SyncCommitteeMessage, SyncContributionData,
};
use version::{
add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection,
Expand Down Expand Up @@ -2186,6 +2186,53 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST validator/prepare_beacon_proposer
let post_validator_prepare_beacon_proposer = eth1_v1
.and(warp::path("validator"))
.and(warp::path("prepare_beacon_proposer"))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(warp::addr::remote())
.and(log_filter.clone())
.and(warp::body::json())
.and_then(
|chain: Arc<BeaconChain<T>>,
client_addr: Option<SocketAddr>,
log: Logger,
preparation_data: Vec<ProposerPreparationData>| {
blocking_json_task(move || {
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_epoch = chain
.epoch()
.map_err(warp_utils::reject::beacon_chain_error)?;

debug!(
log,
"Received proposer preparation data";
"count" => preparation_data.len(),
"client" => client_addr
.map(|a| a.to_string())
.unwrap_or_else(|| "unknown".to_string()),
);

execution_layer
.update_proposer_preparation_blocking(current_epoch, &preparation_data)
.map_err(|_e| {
warp_utils::reject::custom_bad_request(
"error processing proposer preparations".to_string(),
)
})?;

Ok(())
})
},
);

// POST validator/sync_committee_subscriptions
let post_validator_sync_committee_subscriptions = eth1_v1
.and(warp::path("validator"))
Expand Down Expand Up @@ -2710,6 +2757,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(post_validator_contribution_and_proofs.boxed())
.or(post_validator_beacon_committee_subscriptions.boxed())
.or(post_validator_sync_committee_subscriptions.boxed())
.or(post_validator_prepare_beacon_proposer.boxed())
.or(post_lighthouse_liveness.boxed())
.or(post_lighthouse_database_reconstruct.boxed())
.or(post_lighthouse_database_historical_blocks.boxed()),
Expand Down
Loading

0 comments on commit 89b286c

Please sign in to comment.