Skip to content

Commit

Permalink
Emit BlobSidecar event when blobs are received over rpc.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Oct 10, 2023
1 parent 05fc32c commit 934a3fc
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
14 changes: 14 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2845,6 +2845,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::BlockIsAlreadyKnown);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_blob_sidecar_subscribers() {
for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
event_handler.register(EventKind::BlobSidecar(SseBlobSidecar {
block_root: blob.block_root,
index: blob.index,
slot: blob.slot,
kzg_commitment: blob.kzg_commitment,
versioned_hash: blob.kzg_commitment.calculate_versioned_hash(),
}));
}
}
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
Expand Down
46 changes: 43 additions & 3 deletions beacon_node/beacon_chain/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use rand::rngs::StdRng;
use rand::SeedableRng;
use std::marker::PhantomData;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec, SignedBlobSidecar};

type E = MinimalEthSpec;

/// This covers scenarios for gossip verified blobs received via gossip or the published block API.
/// Verifies that a blob event is emitted when a gossip verified blob is received via gossip or the publish block API.
#[tokio::test]
async fn blob_sidecar_event_on_process_gossip_blob() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
Expand Down Expand Up @@ -46,5 +47,44 @@ async fn blob_sidecar_event_on_process_gossip_blob() {
assert!(matches!(sidecar_event, EventKind::BlobSidecar(..)));
}

// TODO: emit an event when rpc blob processed?
// async fn blob_sidecar_event_when_rpc_blob_processed() {}
/// Verifies that a blob event is emitted when blobs are received via RPC.
#[tokio::test]
async fn blob_sidecar_event_on_process_rpc_blobs() {
let spec = ForkName::Deneb.make_genesis_spec(E::default_spec());
let harness = BeaconChainHarness::builder(E::default())
.spec(spec)
.deterministic_keypairs(8)
.fresh_ephemeral_store()
.mock_execution_layer()
.build();

// subscribe to blob sidecar events
let event_handler = harness.chain.event_handler.as_ref().unwrap();
let mut blob_event_receiver = event_handler.subscribe_blob_sidecar();

// build and process multiple rpc blobs
let kzg = harness.chain.kzg.as_ref().unwrap();
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);

let blob_1 = BlobSidecar::random_valid(&mut rng, kzg)
.map(Arc::new)
.unwrap();
let blob_2 = Arc::new(BlobSidecar {
index: 1,
..BlobSidecar::random_valid(&mut rng, kzg).unwrap()
});
let blobs = FixedBlobSidecarList::from(vec![Some(blob_1.clone()), Some(blob_2)]);

let _ = harness
.chain
.process_rpc_blobs(blob_1.slot, blob_1.block_root, blobs)
.await
.unwrap();

let mut events: Vec<EventKind<E>> = vec![];
while let Ok(sidecar_event) = blob_event_receiver.try_recv() {
assert!(matches!(sidecar_event, EventKind::BlobSidecar(..)));
events.push(sidecar_event);
}
assert!(events.len() == 2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
pub fn generate_rpc_blobs_process_fn(
self: Arc<Self>,
block_root: Hash256,
block: FixedBlobSidecarList<T::EthSpec>,
blobs: FixedBlobSidecarList<T::EthSpec>,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> AsyncFn {
let process_fn = async move {
self.clone()
.process_rpc_blobs(block_root, block, seen_timestamp, process_type)
.process_rpc_blobs(block_root, blobs, seen_timestamp, process_type)
.await;
};
Box::pin(process_fn)
Expand Down

0 comments on commit 934a3fc

Please sign in to comment.