Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blob_sidecar event to SSE #4790

Merged
merged 7 commits into from
Oct 12, 2023
Merged
20 changes: 19 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use crate::{
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
BeaconSnapshot, CachedHead,
};
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use execution_layer::{
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
PayloadAttributes, PayloadStatus,
Expand Down Expand Up @@ -2809,6 +2809,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
blob.as_blob(),
)));
}
}

self.data_availability_checker
.notify_gossip_blob(blob.as_blob().slot, block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
Expand All @@ -2833,6 +2841,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
event_handler.register(EventKind::BlobSidecar(
SseBlobSidecar::from_blob_sidecar(blob),
));
}
}
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::block_verification::cheap_state_advance_to_obtain_committees;
use crate::data_availability_checker::AvailabilityCheckError;
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::{metrics, BeaconChainError};
use kzg::Kzg;
use kzg::{Kzg, KzgCommitment};
use slog::{debug, warn};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
Expand Down Expand Up @@ -182,6 +182,12 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn slot(&self) -> Slot {
self.blob.message.slot
}
pub fn index(&self) -> u64 {
self.blob.message.index
}
pub fn kzg_commitment(&self) -> KzgCommitment {
self.blob.message.kzg_commitment
}
pub fn proposer_index(&self) -> u64 {
self.blob.message.proposer_index
}
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16;
pub struct ServerSentEventHandler<T: EthSpec> {
attestation_tx: Sender<EventKind<T>>,
block_tx: Sender<EventKind<T>>,
blob_sidecar_tx: Sender<EventKind<T>>,
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
Expand All @@ -31,6 +32,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
let (attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
Expand All @@ -43,6 +45,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
Self {
attestation_tx,
block_tx,
blob_sidecar_tx,
finalized_tx,
head_tx,
exit_tx,
Expand Down Expand Up @@ -73,6 +76,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.block_tx
.send(kind)
.map(|count| log_count("block", count)),
EventKind::BlobSidecar(_) => self
.blob_sidecar_tx
.send(kind)
.map(|count| log_count("blob sidecar", count)),
EventKind::FinalizedCheckpoint(_) => self
.finalized_tx
.send(kind)
Expand Down Expand Up @@ -119,6 +126,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_tx.subscribe()
}

pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<T>> {
self.blob_sidecar_tx.subscribe()
}

pub fn subscribe_finalized(&self) -> Receiver<EventKind<T>> {
self.finalized_tx.subscribe()
}
Expand Down Expand Up @@ -159,6 +170,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_tx.receiver_count() > 0
}

pub fn has_blob_sidecar_subscribers(&self) -> bool {
self.blob_sidecar_tx.receiver_count() > 0
}

pub fn has_finalized_subscribers(&self) -> bool {
self.finalized_tx.receiver_count() > 0
}
Expand Down
99 changes: 99 additions & 0 deletions beacon_node/beacon_chain/tests/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use beacon_chain::blob_verification::GossipVerifiedBlob;
use beacon_chain::test_utils::BeaconChainHarness;
use bls::Signature;
use eth2::types::{EventKind, SseBlobSidecar};
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::marker::PhantomData;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec, SignedBlobSidecar};

type E = MinimalEthSpec;

/// Verifies that a blob event is emitted when a gossip verified blob is received via gossip or the publish block API.
#[tokio::test]
async fn blob_sidecar_event_on_process_gossip_blob() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process a gossip verified blob
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let signed_sidecar = SignedBlobSidecar {
message: BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap(),
signature: Signature::empty(),
_phantom: PhantomData,
};
let gossip_verified_blob = GossipVerifiedBlob::__assumed_valid(signed_sidecar);
let expected_sse_blobs = SseBlobSidecar::from_blob_sidecar(gossip_verified_blob.as_blob());

let _ = harness
.chain
.process_gossip_blob(gossip_verified_blob)
.await
.unwrap();

let sidecar_event = blob_event_receiver.try_recv().unwrap();
assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs));
}

/// Verifies that a blob event is emitted when blobs are received via RPC.
#[tokio::test]
async fn blob_sidecar_event_on_process_rpc_blobs() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process multiple rpc blobs
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);

let blob_1 = BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap();
let blob_2 = Arc::new(BlobSidecar {
index: 1,
..BlobSidecar::random_valid(&mut rng, kzg).unwrap()
});
let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2.clone())]);
let expected_sse_blobs = vec![
SseBlobSidecar::from_blob_sidecar(blob_1.as_ref()),
SseBlobSidecar::from_blob_sidecar(blob_2.as_ref()),
];

let _ = harness
.chain
.process_rpc_blobs(blob_1.slot, blob_1.block_root, blobs)
.await
.unwrap();

let mut sse_blobs: Vec<SseBlobSidecar> = vec![];
while let Ok(sidecar_event) = blob_event_receiver.try_recv() {
if let EventKind::BlobSidecar(sse_blob_sidecar) = sidecar_event {
sse_blobs.push(sse_blob_sidecar);
} else {
panic!("`BlobSidecar` event kind expected.");
}
}
assert_eq!(sse_blobs, expected_sse_blobs);
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod attestation_production;
mod attestation_verification;
mod block_verification;
mod capella;
mod events;
mod merge;
mod op_verification;
mod payload_invalidation;
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4425,6 +4425,9 @@ pub fn serve<T: BeaconChainTypes>(
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::BlobSidecar => {
event_handler.subscribe_blob_sidecar()
}
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
Expand Down
14 changes: 11 additions & 3 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,17 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = chain.process_gossip_blob(blob).await {
return Err(warp_utils::reject::custom_bad_request(format!(
"Invalid blob: {e}"
)));
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn generate_rpc_blobs_process_fn(
self: Arc<Self>,
block_root: Hash256,
block: FixedBlobSidecarList<T::EthSpec>,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, block, seen_timestamp, process_type)
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
Expand Down
30 changes: 30 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,28 @@ pub struct SseBlock {
pub execution_optimistic: bool,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseBlobSidecar {
pub block_root: Hash256,
#[serde(with = "serde_utils::quoted_u64")]
pub index: u64,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
pub slot: Slot,
pub kzg_commitment: KzgCommitment,
pub versioned_hash: VersionedHash,
}

impl SseBlobSidecar {
pub fn from_blob_sidecar<E: EthSpec>(blob_sidecar: &BlobSidecar<E>) -> SseBlobSidecar {
SseBlobSidecar {
block_root: blob_sidecar.block_root,
index: blob_sidecar.index,
slot: blob_sidecar.slot,
kzg_commitment: blob_sidecar.kzg_commitment,
versioned_hash: blob_sidecar.kzg_commitment.calculate_versioned_hash(),
}
}
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseFinalizedCheckpoint {
pub block: Hash256,
Expand Down Expand Up @@ -1018,6 +1040,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes {
pub enum EventKind<T: EthSpec> {
Attestation(Box<Attestation<T>>),
Block(SseBlock),
BlobSidecar(SseBlobSidecar),
FinalizedCheckpoint(SseFinalizedCheckpoint),
Head(SseHead),
VoluntaryExit(SignedVoluntaryExit),
Expand All @@ -1034,6 +1057,7 @@ impl<T: EthSpec> EventKind<T> {
match self {
EventKind::Head(_) => "head",
EventKind::Block(_) => "block",
EventKind::BlobSidecar(_) => "blob_sidecar",
EventKind::Attestation(_) => "attestation",
EventKind::VoluntaryExit(_) => "voluntary_exit",
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
Expand Down Expand Up @@ -1071,6 +1095,9 @@ impl<T: EthSpec> EventKind<T> {
"block" => Ok(EventKind::Block(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)),
)?)),
"blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)),
)?)),
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
)?)),
Expand Down Expand Up @@ -1123,6 +1150,7 @@ pub struct EventQuery {
pub enum EventTopic {
Head,
Block,
BlobSidecar,
Attestation,
VoluntaryExit,
FinalizedCheckpoint,
Expand All @@ -1141,6 +1169,7 @@ impl FromStr for EventTopic {
match s {
"head" => Ok(EventTopic::Head),
"block" => Ok(EventTopic::Block),
"blob_sidecar" => Ok(EventTopic::BlobSidecar),
"attestation" => Ok(EventTopic::Attestation),
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
Expand All @@ -1160,6 +1189,7 @@ impl fmt::Display for EventTopic {
match self {
EventTopic::Head => write!(f, "head"),
EventTopic::Block => write!(f, "block"),
EventTopic::BlobSidecar => write!(f, "blob_sidecar"),
EventTopic::Attestation => write!(f, "attestation"),
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
Expand Down
Loading