diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9c7ded313b6..79d7d372987 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2422,6 +2422,7 @@ impl BeaconChain { proposer_slashing: ProposerSlashing, ) -> Result, Error> { let wall_clock_state = self.wall_clock_state()?; + Ok(self.observed_proposer_slashings.lock().verify_and_observe( proposer_slashing, &wall_clock_state, @@ -2434,6 +2435,14 @@ impl BeaconChain { &self, proposer_slashing: SigVerifiedOp, ) { + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_proposer_slashing_subscribers() { + event_handler.register(EventKind::ProposerSlashing(Box::new( + proposer_slashing.clone().into_inner(), + ))); + } + } + if self.eth1_chain.is_some() { self.op_pool.insert_proposer_slashing(proposer_slashing) } @@ -2445,6 +2454,7 @@ impl BeaconChain { attester_slashing: AttesterSlashing, ) -> Result, T::EthSpec>, Error> { let wall_clock_state = self.wall_clock_state()?; + Ok(self.observed_attester_slashings.lock().verify_and_observe( attester_slashing, &wall_clock_state, @@ -2465,6 +2475,14 @@ impl BeaconChain { .fork_choice_write_lock() .on_attester_slashing(attester_slashing.as_inner()); + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_attester_slashing_subscribers() { + event_handler.register(EventKind::AttesterSlashing(Box::new( + attester_slashing.clone().into_inner(), + ))); + } + } + // Add to the op pool (if we have the ability to propose blocks). if self.eth1_chain.is_some() { self.op_pool.insert_attester_slashing(attester_slashing) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 376bc16c035..90461b8f03e 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -708,8 +708,8 @@ where .ok_or("Cannot build without a genesis state root")?; let validator_monitor_config = self.validator_monitor_config.unwrap_or_default(); let head_tracker = Arc::new(self.head_tracker.unwrap_or_default()); - let beacon_proposer_cache: Arc> = <_>::default(); + let mut validator_monitor = ValidatorMonitor::new( validator_monitor_config, beacon_proposer_cache.clone(), diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 1fdcfdf8d07..8700675a66e 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -20,6 +20,8 @@ pub struct ServerSentEventHandler { light_client_finality_update_tx: Sender>, light_client_optimistic_update_tx: Sender>, block_reward_tx: Sender>, + proposer_slashing_tx: Sender>, + attester_slashing_tx: Sender>, log: Logger, } @@ -45,6 +47,8 @@ impl ServerSentEventHandler { let (light_client_finality_update_tx, _) = broadcast::channel(capacity); let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity); let (block_reward_tx, _) = broadcast::channel(capacity); + let (proposer_slashing_tx, _) = broadcast::channel(capacity); + let (attester_slashing_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -60,6 +64,8 @@ impl ServerSentEventHandler { light_client_finality_update_tx, light_client_optimistic_update_tx, block_reward_tx, + proposer_slashing_tx, + attester_slashing_tx, log, } } @@ -126,6 +132,14 @@ impl ServerSentEventHandler { .block_reward_tx .send(kind) .map(|count| log_count("block reward", count)), + EventKind::ProposerSlashing(_) => self + .proposer_slashing_tx + .send(kind) + .map(|count| log_count("proposer slashing", count)), + EventKind::AttesterSlashing(_) => self + .attester_slashing_tx + .send(kind) + .map(|count| log_count("attester slashing", count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -184,6 +198,14 @@ impl ServerSentEventHandler { self.block_reward_tx.subscribe() } + pub fn subscribe_attester_slashing(&self) -> Receiver> { + self.attester_slashing_tx.subscribe() + } + + pub fn subscribe_proposer_slashing(&self) -> Receiver> { + self.proposer_slashing_tx.subscribe() + } + pub fn has_attestation_subscribers(&self) -> bool { self.attestation_tx.receiver_count() > 0 } @@ -227,4 +249,12 @@ impl ServerSentEventHandler { pub fn has_block_reward_subscribers(&self) -> bool { self.block_reward_tx.receiver_count() > 0 } + + pub fn has_proposer_slashing_subscribers(&self) -> bool { + self.proposer_slashing_tx.receiver_count() > 0 + } + + pub fn has_attester_slashing_subscribers(&self) -> bool { + self.attester_slashing_tx.receiver_count() > 0 + } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5f4620589eb..024e268e2a5 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4358,6 +4358,12 @@ pub fn serve( api_types::EventTopic::BlockReward => { event_handler.subscribe_block_reward() } + api_types::EventTopic::AttesterSlashing => { + event_handler.subscribe_attester_slashing() + } + api_types::EventTopic::ProposerSlashing => { + event_handler.subscribe_proposer_slashing() + } }; receivers.push( diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index d44b9a688ce..ace54714b26 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -5214,6 +5214,8 @@ impl ApiTester { EventTopic::Block, EventTopic::Head, EventTopic::FinalizedCheckpoint, + EventTopic::AttesterSlashing, + EventTopic::ProposerSlashing, ]; let mut events_future = self .client @@ -5353,6 +5355,42 @@ impl ApiTester { .await; assert_eq!(reorg_event.as_slice(), &[expected_reorg]); + // Test attester slashing event + let mut attester_slashing_event_future = self + .client + .get_events::(&[EventTopic::AttesterSlashing]) + .await + .unwrap(); + + self.harness.add_attester_slashing(vec![1, 2, 3]).unwrap(); + + let attester_slashing_event = poll_events( + &mut attester_slashing_event_future, + 1, + Duration::from_millis(10000), + ) + .await; + + assert!(attester_slashing_event.len() == 1); + + // Test proposer slashing event + let mut proposer_slashing_event_future = self + .client + .get_events::(&[EventTopic::ProposerSlashing]) + .await + .unwrap(); + + self.harness.add_proposer_slashing(1).unwrap(); + + let proposer_slashing_event = poll_events( + &mut proposer_slashing_event_future, + 1, + Duration::from_millis(10000), + ) + .await; + + assert!(proposer_slashing_event.len() == 1); + self } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 838be4beffb..b15246e7fdb 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1080,6 +1080,8 @@ pub enum EventKind { #[cfg(feature = "lighthouse")] BlockReward(BlockReward), PayloadAttributes(VersionedSsePayloadAttributes), + ProposerSlashing(Box), + AttesterSlashing(Box>), } impl EventKind { @@ -1099,6 +1101,8 @@ impl EventKind { EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update", #[cfg(feature = "lighthouse")] EventKind::BlockReward(_) => "block_reward", + EventKind::ProposerSlashing(_) => "proposer_slashing", + EventKind::AttesterSlashing(_) => "attester_slashing", } } @@ -1179,6 +1183,16 @@ impl EventKind { "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), )?)), + "attester_slashing" => Ok(EventKind::AttesterSlashing( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e)) + })?, + )), + "proposer_slashing" => Ok(EventKind::ProposerSlashing( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Proposer Slashing: {:?}", e)) + })?, + )), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1210,6 +1224,8 @@ pub enum EventTopic { LightClientOptimisticUpdate, #[cfg(feature = "lighthouse")] BlockReward, + AttesterSlashing, + ProposerSlashing, } impl FromStr for EventTopic { @@ -1231,6 +1247,8 @@ impl FromStr for EventTopic { "light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventTopic::BlockReward), + "attester_slashing" => Ok(EventTopic::AttesterSlashing), + "proposer_slashing" => Ok(EventTopic::ProposerSlashing), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1253,6 +1271,8 @@ impl fmt::Display for EventTopic { EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"), #[cfg(feature = "lighthouse")] EventTopic::BlockReward => write!(f, "block_reward"), + EventTopic::AttesterSlashing => write!(f, "attester_slashing"), + EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), } } }