Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Prioritise important parts of block processing #3696

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
711 changes: 433 additions & 278 deletions beacon_node/beacon_chain/src/beacon_chain.rs

Large diffs are not rendered by default.

239 changes: 140 additions & 99 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOC
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
beacon_chain::{
BeaconForkChoice, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
BeaconForkChoice, ForkChoiceError, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT,
MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::EventKind;
use execution_layer::PayloadStatus;
use fork_choice::PayloadVerificationStatus;
use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
use proto_array::Block as ProtoBlock;
use safe_arith::ArithError;
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::per_block_processing::is_merge_transition_block;
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
Expand Down Expand Up @@ -550,8 +550,22 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);

let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len());

for (block_root, block) in &chain_segment {
signature_verifier.include_all_signatures(block, Some(*block_root), None)?;
let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(*block_root);

signature_verifier.include_all_signatures(block, &mut consensus_context)?;

// Save the block and its consensus context. The context will have had its proposer index
// and attesting indices filled in, which can be used to accelerate later block processing.
signature_verified_blocks.push(SignatureVerifiedBlock {
block: block.clone(),
block_root: *block_root,
parent: None,
consensus_context,
});
}

if signature_verifier.verify().is_err() {
Expand All @@ -560,22 +574,6 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(

drop(pubkey_cache);

let mut signature_verified_blocks = chain_segment
.into_iter()
.map(|(block_root, block)| {
// Proposer index has already been verified above during signature verification.
let consensus_context = ConsensusContext::new(block.slot())
.set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index());
SignatureVerifiedBlock {
block,
block_root,
parent: None,
consensus_context,
}
})
.collect::<Vec<_>>();

if let Some(signature_verified_block) = signature_verified_blocks.first_mut() {
signature_verified_block.parent = Some(parent);
}
Expand Down Expand Up @@ -625,6 +623,7 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<T::EthSpec>,
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
}

Expand Down Expand Up @@ -944,13 +943,14 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {

let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);

signature_verifier.include_all_signatures(&block, Some(block_root), None)?;
let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(block_root);

signature_verifier.include_all_signatures(&block, &mut consensus_context)?;

if signature_verifier.verify().is_ok() {
Ok(Self {
consensus_context: ConsensusContext::new(block.slot())
.set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index()),
consensus_context,
block,
block_root,
parent: Some(parent),
Expand Down Expand Up @@ -995,16 +995,16 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {

// Gossip verification has already checked the proposer index. Use it to check the RANDAO
// signature.
let verified_proposer_index = Some(block.message().proposer_index());
let mut consensus_context = from.consensus_context;
signature_verifier
.include_all_signatures_except_proposal(&block, verified_proposer_index)?;
.include_all_signatures_except_proposal(&block, &mut consensus_context)?;

if signature_verifier.verify().is_ok() {
Ok(Self {
block,
block_root: from.block_root,
parent: Some(parent),
consensus_context: from.consensus_context,
consensus_context,
})
} else {
Err(BlockError::InvalidSignature)
Expand Down Expand Up @@ -1127,6 +1127,78 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

check_block_relevancy(&block, block_root, chain)?;

// Define a future that will verify the execution payload with an execution engine.
//
// We do this as early as possible so that later parts of this function can run in parallel
// with the payload verification.
let payload_notifier =
PayloadNotifier::new(chain.clone(), block.clone(), &parent.pre_state)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&parent.pre_state, block.message().body());
let payload_verification_future = async move {
let chain = payload_notifier.chain.clone();
let block = payload_notifier.block.clone();

// If this block triggers the merge, check to ensure that it references valid execution
// blocks.
//
// The specification defines this check inside `on_block` in the fork-choice specification,
// however we perform the check here for two reasons:
//
// - There's no point in importing a block that will fail fork choice, so it's best to fail
// early.
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers.
if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
};

// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
//
// It is important that this function is called *after* `per_slot_processing`, since the
// `randao` may change.
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
let payload_verification_status = payload_notifier.notify_new_payload().await?;

// If the payload did not validate or invalidate the block, check to see if this block is
// valid for optimistic import.
if payload_verification_status.is_optimistic() {
let block_hash_opt = block
.message()
.body()
.execution_payload()
.map(|full_payload| full_payload.execution_payload.block_hash);

// Ensure the block is a candidate for optimistic import.
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
{
warn!(
chain.log,
"Rejecting optimistic block";
"block_hash" => ?block_hash_opt,
"msg" => "the execution engine is not synced"
);
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
}
}

Ok(PayloadVerificationOutcome {
payload_verification_status,
is_valid_merge_transition_block,
})
};
// Spawn the payload verification future as a new task, but don't wait for it to complete.
// The `payload_verification_future` will be awaited later to ensure verification completed
// successfully.
let payload_verification_handle = chain
.task_executor
.spawn_handle(
payload_verification_future,
"execution_payload_verification",
)
.ok_or(BeaconChainError::RuntimeShutdown)?;

/*
* Advance the given `parent.beacon_state` to the slot of the given `block`.
*/
Expand Down Expand Up @@ -1231,79 +1303,11 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
summaries.push(summary);
}
}
metrics::stop_timer(catchup_timer);

let block_slot = block.slot();
let state_current_epoch = state.current_epoch();

// Define a future that will verify the execution payload with an execution engine (but
// don't execute it yet).
let payload_notifier = PayloadNotifier::new(chain.clone(), block.clone(), &state)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&state, block.message().body());
let payload_verification_future = async move {
let chain = payload_notifier.chain.clone();
let block = payload_notifier.block.clone();

// If this block triggers the merge, check to ensure that it references valid execution
// blocks.
//
// The specification defines this check inside `on_block` in the fork-choice specification,
// however we perform the check here for two reasons:
//
// - There's no point in importing a block that will fail fork choice, so it's best to fail
// early.
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers.
if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
};

// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
//
// It is important that this function is called *after* `per_slot_processing`, since the
// `randao` may change.
let payload_verification_status = payload_notifier.notify_new_payload().await?;

// If the payload did not validate or invalidate the block, check to see if this block is
// valid for optimistic import.
if payload_verification_status.is_optimistic() {
let block_hash_opt = block
.message()
.body()
.execution_payload()
.map(|full_payload| full_payload.execution_payload.block_hash);

// Ensure the block is a candidate for optimistic import.
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
{
warn!(
chain.log,
"Rejecting optimistic block";
"block_hash" => ?block_hash_opt,
"msg" => "the execution engine is not synced"
);
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
}
}

Ok(PayloadVerificationOutcome {
payload_verification_status,
is_valid_merge_transition_block,
})
};
// Spawn the payload verification future as a new task, but don't wait for it to complete.
// The `payload_verification_future` will be awaited later to ensure verification completed
// successfully.
let payload_verification_handle = chain
.task_executor
.spawn_handle(
payload_verification_future,
"execution_payload_verification",
)
.ok_or(BeaconChainError::RuntimeShutdown)?;

// If the block is sufficiently recent, notify the validator monitor.
if let Some(slot) = chain.slot_clock.now() {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
Expand All @@ -1330,8 +1334,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}
}

metrics::stop_timer(catchup_timer);

/*
* Build the committee caches on the state.
*/
Expand Down Expand Up @@ -1421,13 +1423,52 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
});
}

/*
* Apply the block's attestations to fork choice.
*
* We're running in parallel with the payload verification at this point, so this is
* free real estate.
*/
let current_slot = chain.slot()?;
let mut fork_choice = chain.canonical_head.fork_choice_write_lock();
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved

// Register each attester slashing in the block with fork choice.
for attester_slashing in block.message().body().attester_slashings() {
fork_choice.on_attester_slashing(attester_slashing);
}

// Register each attestation in the block with fork choice.
for (i, attestation) in block.message().body().attestations().iter().enumerate() {
let _fork_choice_attestation_timer =
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);

let indexed_attestation = consensus_context
.get_indexed_attestation(&state, attestation)
.map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?;

match fork_choice.on_attestation(
current_slot,
indexed_attestation,
AttestationFromBlock::True,
&chain.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The
// block might be very old and therefore the attestations useless to fork choice.
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
}
drop(fork_choice);

Ok(Self {
block,
block_root,
state,
parent_block: parent.beacon_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
payload_verification_handle,
})
}
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
// are cheap and doing them here ensures we protect the execution engine from junk.
partially_verify_execution_payload(
state,
block.slot(),
block.message().execution_payload()?,
&chain.spec,
)
Expand Down Expand Up @@ -357,7 +358,8 @@ pub fn get_execution_payload<
let spec = &chain.spec;
let current_epoch = state.current_epoch();
let is_merge_transition_complete = is_merge_transition_complete(state);
let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?;
let timestamp =
compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?;
let random = *state.get_randao_mix(current_epoch)?;
let latest_execution_payload_header_block_hash =
state.latest_execution_payload_header()?.block_hash;
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ lazy_static! {
"beacon_block_processing_state_root_seconds",
"Time spent calculating the state root when processing a block."
);
pub static ref BLOCK_PROCESSING_POST_EXEC_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_block_processing_post_exec_pre_attestable_seconds",
"Time between finishing execution processing and the block becoming attestable",
linear_buckets(5e-3, 5e-3, 10)
);
pub static ref BLOCK_PROCESSING_DB_WRITE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_db_write_seconds",
"Time spent writing a newly processed block and state to DB"
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ where

pub fn get_timestamp_at_slot(&self) -> u64 {
let state = self.get_current_state();
compute_timestamp_at_slot(&state, &self.spec).unwrap()
compute_timestamp_at_slot(&state, state.slot(), &self.spec).unwrap()
}

pub fn get_current_state_and_root(&self) -> (BeaconState<E>, Hash256) {
Expand Down
Loading