Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blob_sidecar event to SSE #4790

Merged
merged 7 commits into from
Oct 12, 2023
28 changes: 27 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use crate::{
kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore,
BeaconSnapshot, CachedHead,
};
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use execution_layer::{
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
PayloadAttributes, PayloadStatus,
Expand Down Expand Up @@ -2809,6 +2809,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar {
block_root: blob.block_root(),
index: blob.index(),
slot: blob.slot(),
kzg_commitment: blob.kzg_commitment(),
versioned_hash: blob.kzg_commitment().calculate_versioned_hash(),
}));
}
}

self.data_availability_checker
.notify_gossip_blob(blob.as_blob().slot, block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
Expand All @@ -2833,6 +2845,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar {
block_root: blob.block_root,
index: blob.index,
slot: blob.slot,
kzg_commitment: blob.kzg_commitment,
versioned_hash: blob.kzg_commitment.calculate_versioned_hash(),
}));
}
}
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::block_verification::cheap_state_advance_to_obtain_committees;
use crate::data_availability_checker::AvailabilityCheckError;
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::{metrics, BeaconChainError};
use kzg::Kzg;
use kzg::{Kzg, KzgCommitment};
use slog::{debug, warn};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
Expand Down Expand Up @@ -182,6 +182,12 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn slot(&self) -> Slot {
self.blob.message.slot
}
pub fn index(&self) -> u64 {
self.blob.message.index
}
pub fn kzg_commitment(&self) -> KzgCommitment {
self.blob.message.kzg_commitment
}
pub fn proposer_index(&self) -> u64 {
self.blob.message.proposer_index
}
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const DEFAULT_CHANNEL_CAPACITY: usize = 16;
pub struct ServerSentEventHandler<T: EthSpec> {
attestation_tx: Sender<EventKind<T>>,
block_tx: Sender<EventKind<T>>,
blob_sidecar_tx: Sender<EventKind<T>>,
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
Expand All @@ -31,6 +32,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
let (attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
Expand All @@ -43,6 +45,7 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
Self {
attestation_tx,
block_tx,
blob_sidecar_tx,
finalized_tx,
head_tx,
exit_tx,
Expand Down Expand Up @@ -73,6 +76,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.block_tx
.send(kind)
.map(|count| log_count("block", count)),
EventKind::BlobSidecar(_) => self
.blob_sidecar_tx
.send(kind)
.map(|count| log_count("blob sidecar", count)),
EventKind::FinalizedCheckpoint(_) => self
.finalized_tx
.send(kind)
Expand Down Expand Up @@ -119,6 +126,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_tx.subscribe()
}

pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<T>> {
self.blob_sidecar_tx.subscribe()
}

pub fn subscribe_finalized(&self) -> Receiver<EventKind<T>> {
self.finalized_tx.subscribe()
}
Expand Down Expand Up @@ -159,6 +170,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_tx.receiver_count() > 0
}

pub fn has_blob_sidecar_subscribers(&self) -> bool {
self.blob_sidecar_tx.receiver_count() > 0
}

pub fn has_finalized_subscribers(&self) -> bool {
self.finalized_tx.receiver_count() > 0
}
Expand Down
90 changes: 90 additions & 0 deletions beacon_node/beacon_chain/tests/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use beacon_chain::blob_verification::GossipVerifiedBlob;
use beacon_chain::test_utils::BeaconChainHarness;
use bls::Signature;
use eth2::types::EventKind;
use rand::rngs::StdRng;
use rand::SeedableRng;
use std::marker::PhantomData;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec, SignedBlobSidecar};

type E = MinimalEthSpec;

/// Verifies that a blob event is emitted when a gossip verified blob is received via gossip or the publish block API.
#[tokio::test]
async fn blob_sidecar_event_on_process_gossip_blob() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process a gossip verified blob
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let signed_sidecar = SignedBlobSidecar {
message: BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap(),
signature: Signature::empty(),
_phantom: PhantomData,
};
let gossip_verified_blob = GossipVerifiedBlob::__assumed_valid(signed_sidecar);
let _ = harness
.chain
.process_gossip_blob(gossip_verified_blob)
.await
.unwrap();

let sidecar_event = blob_event_receiver.try_recv().unwrap();
assert!(matches!(sidecar_event, EventKind::BlobSidecar(..)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! You don't have to do this but since you seeded the RNG with a known value, the blob generated should be the same each time. You could check that the output matches the static SseBlobSidecar here so that this test breaks if something goes wrong in the code we use to generate it. Just an idea.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! Addressed in 39cc2fe. Thanks 🙏

}

/// Verifies that a blob event is emitted when blobs are received via RPC.
#[tokio::test]
async fn blob_sidecar_event_on_process_rpc_blobs() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process multiple rpc blobs
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);

let blob_1 = BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap();
let blob_2 = Arc::new(BlobSidecar {
index: 1,
..BlobSidecar::random_valid(&mut rng, kzg).unwrap()
});
let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2)]);

let _ = harness
.chain
.process_rpc_blobs(blob_1.slot, blob_1.block_root, blobs)
.await
.unwrap();

let mut events: Vec<EventKind<E>> = vec![];
while let Ok(sidecar_event) = blob_event_receiver.try_recv() {
assert!(matches!(sidecar_event, EventKind::BlobSidecar(..)));
events.push(sidecar_event);
}
assert!(events.len() == 2);
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod attestation_production;
mod attestation_verification;
mod block_verification;
mod capella;
mod events;
mod merge;
mod op_verification;
mod payload_invalidation;
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4425,6 +4425,9 @@ pub fn serve<T: BeaconChainTypes>(
let receiver = match topic {
api_types::EventTopic::Head => event_handler.subscribe_head(),
api_types::EventTopic::Block => event_handler.subscribe_block(),
api_types::EventTopic::BlobSidecar => {
event_handler.subscribe_blob_sidecar()
}
api_types::EventTopic::Attestation => {
event_handler.subscribe_attestation()
}
Expand Down
14 changes: 11 additions & 3 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,17 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = chain.process_gossip_blob(blob).await {
return Err(warp_utils::reject::custom_bad_request(format!(
"Invalid blob: {e}"
)));
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
} else {
error!(
log,
"Invalid blob provided to HTTP API";
"reason" => &msg
);
Err(warp_utils::reject::custom_bad_request(msg))
};
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn generate_rpc_blobs_process_fn(
self: Arc<Self>,
block_root: Hash256,
block: FixedBlobSidecarList<T::EthSpec>,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, block, seen_timestamp, process_type)
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
Expand Down
17 changes: 17 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,15 @@ pub struct SseBlock {
pub execution_optimistic: bool,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseBlobSidecar {
pub block_root: Hash256,
pub index: u64,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
pub slot: Slot,
pub kzg_commitment: KzgCommitment,
pub versioned_hash: VersionedHash,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseFinalizedCheckpoint {
pub block: Hash256,
Expand Down Expand Up @@ -1018,6 +1027,7 @@ impl ForkVersionDeserialize for SseExtendedPayloadAttributes {
pub enum EventKind<T: EthSpec> {
Attestation(Box<Attestation<T>>),
Block(SseBlock),
BlobSidecar(SseBlobSidecar),
FinalizedCheckpoint(SseFinalizedCheckpoint),
Head(SseHead),
VoluntaryExit(SignedVoluntaryExit),
Expand All @@ -1034,6 +1044,7 @@ impl<T: EthSpec> EventKind<T> {
match self {
EventKind::Head(_) => "head",
EventKind::Block(_) => "block",
EventKind::BlobSidecar(_) => "blob_sidecar",
EventKind::Attestation(_) => "attestation",
EventKind::VoluntaryExit(_) => "voluntary_exit",
EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint",
Expand Down Expand Up @@ -1071,6 +1082,9 @@ impl<T: EthSpec> EventKind<T> {
"block" => Ok(EventKind::Block(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)),
)?)),
"blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)),
)?)),
"chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)),
)?)),
Expand Down Expand Up @@ -1123,6 +1137,7 @@ pub struct EventQuery {
pub enum EventTopic {
Head,
Block,
BlobSidecar,
Attestation,
VoluntaryExit,
FinalizedCheckpoint,
Expand All @@ -1141,6 +1156,7 @@ impl FromStr for EventTopic {
match s {
"head" => Ok(EventTopic::Head),
"block" => Ok(EventTopic::Block),
"blob_sidecar" => Ok(EventTopic::BlobSidecar),
"attestation" => Ok(EventTopic::Attestation),
"voluntary_exit" => Ok(EventTopic::VoluntaryExit),
"finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint),
Expand All @@ -1160,6 +1176,7 @@ impl fmt::Display for EventTopic {
match self {
EventTopic::Head => write!(f, "head"),
EventTopic::Block => write!(f, "block"),
EventTopic::BlobSidecar => write!(f, "blob_sidecar"),
EventTopic::Attestation => write!(f, "attestation"),
EventTopic::VoluntaryExit => write!(f, "voluntary_exit"),
EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"),
Expand Down