Skip to content

Commit

Permalink
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ef…
Browse files Browse the repository at this point in the history
…-tests-electra
  • Loading branch information
realbigsean committed Jul 16, 2024
2 parents 064c461 + bf2f0b0 commit cf01312
Show file tree
Hide file tree
Showing 25 changed files with 356 additions and 498 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ serde_json = "1"
serde_repr = "0.1"
serde_yaml = "0.9"
sha2 = "0.9"
slog = { version = "2", features = ["max_level_trace", "release_max_level_trace", "nested-values"] }
slog = { version = "2", features = ["max_level_debug", "release_max_level_debug", "nested-values"] }
slog-async = "2"
slog-term = "2"
sloggers = { version = "2", features = ["json"] }
Expand Down
21 changes: 14 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3088,14 +3088,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
notify_execution_layer,
)?;
publish_fn()?;

// Record the time it took to complete consensus verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
self.block_times_cache
.write()
.set_time_consensus_verified(block_root, block_slot, timestamp)
}

let executed_block = chain.into_executed_block(execution_pending).await?;
// Record the time it took to ask the execution layer.
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_execution_time(
block_root,
block_slot,
seen_timestamp,
)

// Record the *additional* time it took to wait for execution layer verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
self.block_times_cache
.write()
.set_time_executed(block_root, block_slot, timestamp)
}

match executed_block {
Expand Down
122 changes: 87 additions & 35 deletions beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ type BlockRoot = Hash256;
pub struct Timestamps {
pub observed: Option<Duration>,
pub all_blobs_observed: Option<Duration>,
pub execution_time: Option<Duration>,
pub consensus_verified: Option<Duration>,
pub started_execution: Option<Duration>,
pub executed: Option<Duration>,
pub attestable: Option<Duration>,
pub imported: Option<Duration>,
pub set_as_head: Option<Duration>,
Expand All @@ -32,7 +34,9 @@ pub struct BlockDelays {
pub observed: Option<Duration>,
/// The time after the start of the slot we saw all blobs.
pub all_blobs_observed: Option<Duration>,
/// The time it took to get verification from the EL for the block.
/// The time it took to complete consensus verification of the block.
pub consensus_verification_time: Option<Duration>,
/// The time it took to complete execution verification of the block.
pub execution_time: Option<Duration>,
/// The delay from the start of the slot before the block became available
///
Expand All @@ -58,13 +62,16 @@ impl BlockDelays {
let all_blobs_observed = times
.all_blobs_observed
.and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time));
let consensus_verification_time = times
.consensus_verified
.and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?));
let execution_time = times
.execution_time
.and_then(|execution_time| execution_time.checked_sub(times.observed?));
.executed
.and_then(|executed| executed.checked_sub(times.started_execution?));
// Duration since UNIX epoch at which block became available.
let available_time = times.execution_time.map(|execution_time| {
std::cmp::max(execution_time, times.all_blobs_observed.unwrap_or_default())
});
let available_time = times
.executed
.map(|executed| std::cmp::max(executed, times.all_blobs_observed.unwrap_or_default()));
// Duration from the start of the slot until the block became available.
let available_delay =
available_time.and_then(|available_time| available_time.checked_sub(slot_start_time));
Expand All @@ -80,6 +87,7 @@ impl BlockDelays {
BlockDelays {
observed,
all_blobs_observed,
consensus_verification_time,
execution_time,
available: available_delay,
attestable,
Expand Down Expand Up @@ -155,6 +163,9 @@ impl BlockTimesCache {
slot: Slot,
timestamp: Duration,
) {
// Unlike other functions in this file, we update the blob observed time only if it is
// *greater* than existing blob observation times. This allows us to know the observation
// time of the last blob to arrive.
let block_times = self
.cache
.entry(block_root)
Expand All @@ -168,48 +179,89 @@ impl BlockTimesCache {
}
}

pub fn set_execution_time(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
/// Set the timestamp for `field` if that timestamp is less than any previously known value.
///
/// If no previous value is known for the field, then the supplied timestamp will always be
/// stored.
pub fn set_time_if_less(
&mut self,
block_root: BlockRoot,
slot: Slot,
field: impl Fn(&mut Timestamps) -> &mut Option<Duration>,
timestamp: Duration,
) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.execution_time
.map_or(true, |prev| timestamp < prev)
{
block_times.timestamps.execution_time = Some(timestamp);
let existing_timestamp = field(&mut block_times.timestamps);
if existing_timestamp.map_or(true, |prev| timestamp < prev) {
*existing_timestamp = Some(timestamp);
}
}

pub fn set_time_consensus_verified(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.consensus_verified,
timestamp,
)
}

pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.executed,
timestamp,
)
}

pub fn set_time_started_execution(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.started_execution,
timestamp,
)
}

pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.attestable
.map_or(true, |prev| timestamp < prev)
{
block_times.timestamps.attestable = Some(timestamp);
}
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.attestable,
timestamp,
)
}

pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.imported = Some(timestamp);
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.imported,
timestamp,
)
}

pub fn set_time_set_as_head(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
block_times.timestamps.set_as_head = Some(timestamp);
self.set_time_if_less(
block_root,
slot,
|timestamps| &mut timestamps.set_as_head,
timestamp,
)
}

pub fn get_block_delays(
Expand Down
19 changes: 18 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::{
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::{EventKind, PublishBlockRequest};
use eth2::types::{BlockGossip, EventKind, PublishBlockRequest};
use execution_layer::PayloadStatus;
pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
Expand Down Expand Up @@ -974,6 +974,16 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Validate the block's execution_payload (if any).
validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?;

// Beacon API block_gossip events
if let Some(event_handler) = chain.event_handler.as_ref() {
if event_handler.has_block_gossip_subscribers() {
event_handler.register(EventKind::BlockGossip(Box::new(BlockGossip {
slot: block.slot(),
block: block_root,
})));
}
}

// Having checked the proposer index and the block root we can cache them.
let consensus_context = ConsensusContext::new(block.slot())
.set_current_block_root(block_root)
Expand Down Expand Up @@ -1334,6 +1344,13 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
if let Some(started_execution) = chain.slot_clock.now_duration() {
chain.block_times_cache.write().set_time_started_execution(
block_root,
block.slot(),
started_execution,
);
}
let payload_verification_status = payload_notifier.notify_new_payload().await?;

// If the payload did not validate or invalidate the block, check to see if this block is
Expand Down
11 changes: 11 additions & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,15 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
.as_millis() as i64,
);

// The time it took to check the validity within Lighthouse
metrics::set_gauge(
&metrics::BEACON_BLOCK_DELAY_CONSENSUS_VERIFICATION_TIME,
block_delays
.consensus_verification_time
.unwrap_or_else(|| Duration::from_secs(0))
.as_millis() as i64,
);

// The time it took to check the validity with the EL
metrics::set_gauge(
&metrics::BEACON_BLOCK_DELAY_EXECUTION_TIME,
Expand Down Expand Up @@ -1447,6 +1456,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"total_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"blob_delay_ms" => format_delay(&block_delays.all_blobs_observed),
"consensus_time_ms" => format_delay(&block_delays.consensus_verification_time),
"execution_time_ms" => format_delay(&block_delays.execution_time),
"available_delay_ms" => format_delay(&block_delays.available),
"attestable_delay_ms" => format_delay(&block_delays.attestable),
Expand All @@ -1463,6 +1473,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
"total_delay_ms" => block_delay_total.as_millis(),
"observed_delay_ms" => format_delay(&block_delays.observed),
"blob_delay_ms" => format_delay(&block_delays.all_blobs_observed),
"consensus_time_ms" => format_delay(&block_delays.consensus_verification_time),
"execution_time_ms" => format_delay(&block_delays.execution_time),
"available_delay_ms" => format_delay(&block_delays.available),
"attestable_delay_ms" => format_delay(&block_delays.attestable),
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
proposer_slashing_tx: Sender<EventKind<E>>,
attester_slashing_tx: Sender<EventKind<E>>,
bls_to_execution_change_tx: Sender<EventKind<E>>,
block_gossip_tx: Sender<EventKind<E>>,
log: Logger,
}

Expand Down Expand Up @@ -51,6 +52,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
let (attester_slashing_tx, _) = broadcast::channel(capacity);
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
let (block_gossip_tx, _) = broadcast::channel(capacity);

Self {
attestation_tx,
Expand All @@ -69,6 +71,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
proposer_slashing_tx,
attester_slashing_tx,
bls_to_execution_change_tx,
block_gossip_tx,
log,
}
}
Expand Down Expand Up @@ -147,6 +150,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.bls_to_execution_change_tx
.send(kind)
.map(|count| log_count("bls to execution change", count)),
EventKind::BlockGossip(_) => self
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
Expand Down Expand Up @@ -217,6 +224,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.bls_to_execution_change_tx.subscribe()
}

pub fn subscribe_block_gossip(&self) -> Receiver<EventKind<E>> {
self.block_gossip_tx.subscribe()
}

pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
Expand Down Expand Up @@ -272,4 +283,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_bls_to_execution_change_subscribers(&self) -> bool {
self.bls_to_execution_change_tx.receiver_count() > 0
}

pub fn has_block_gossip_subscribers(&self) -> bool {
self.block_gossip_tx.receiver_count() > 0
}
}
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,11 @@ lazy_static! {
"Duration between the start of the block's slot and the time the block was observed.",
);

pub static ref BEACON_BLOCK_DELAY_CONSENSUS_VERIFICATION_TIME: Result<IntGauge> = try_create_int_gauge(
"beacon_block_delay_consensus_verification_time",
"The time taken to verify the block within Lighthouse",
);

pub static ref BEACON_BLOCK_DELAY_EXECUTION_TIME: Result<IntGauge> = try_create_int_gauge(
"beacon_block_delay_execution_time",
"The duration in verifying the block with the execution layer.",
Expand Down
11 changes: 7 additions & 4 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use warp::hyper::Body;
use warp::sse::Event;
use warp::Reply;
use warp::{http::Response, Filter, Rejection};
use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter};
use warp_utils::{query::multi_key_query, reject::convert_rejection, uor::UnifyingOrFilter};

const API_PREFIX: &str = "eth";

Expand Down Expand Up @@ -1802,7 +1802,7 @@ pub fn serve<T: BeaconChainTypes>(
)
.await
.map(|()| warp::reply::json(&()));
task_spawner::convert_rejection(result).await
convert_rejection(result).await
},
);

Expand Down Expand Up @@ -3817,12 +3817,12 @@ pub fn serve<T: BeaconChainTypes>(
.await;

if initial_result.is_err() {
return task_spawner::convert_rejection(initial_result).await;
return convert_rejection(initial_result).await;
}

// Await a response from the builder without blocking a
// `BeaconProcessor` worker.
task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| {
convert_rejection(rx.await.unwrap_or_else(|_| {
Ok(warp::reply::with_status(
warp::reply::json(&"No response from channel"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -4474,6 +4474,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlsToExecutionChange => {
event_handler.subscribe_bls_to_execution_change()
}
api_types::EventTopic::BlockGossip => {
event_handler.subscribe_block_gossip()
}
};

receivers.push(
Expand Down
Loading

0 comments on commit cf01312

Please sign in to comment.