Skip to content

Commit

Permalink
add tests, event triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Feb 28, 2024
1 parent da00e0e commit c1bacad
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 16 deletions.
34 changes: 34 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2440,6 +2440,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
proposer_slashing: ProposerSlashing,
) -> Result<ObservationOutcome<ProposerSlashing, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_proposer_slashing_subscribers() {
event_handler.register(EventKind::ProposerSlashing(Box::new(
proposer_slashing.clone(),
)));
}
}

Ok(self.observed_proposer_slashings.lock().verify_and_observe(
proposer_slashing,
&wall_clock_state,
Expand All @@ -2452,6 +2461,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
proposer_slashing: SigVerifiedOp<ProposerSlashing, T::EthSpec>,
) {
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_proposer_slashing_subscribers() {
event_handler.register(EventKind::ProposerSlashing(Box::new(
proposer_slashing.clone().into_inner(),
)));
}
}

if self.eth1_chain.is_some() {
self.op_pool.insert_proposer_slashing(proposer_slashing)
}
Expand All @@ -2463,6 +2480,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attester_slashing_subscribers() {
event_handler.register(EventKind::AttesterSlashing(Box::new(
attester_slashing.clone(),
)));
}
}

Ok(self.observed_attester_slashings.lock().verify_and_observe(
attester_slashing,
&wall_clock_state,
Expand All @@ -2483,6 +2509,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_write_lock()
.on_attester_slashing(attester_slashing.as_inner());

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attester_slashing_subscribers() {
event_handler.register(EventKind::AttesterSlashing(Box::new(
attester_slashing.clone().into_inner(),
)));
}
}

// Add to the op pool (if we have the ability to propose blocks).
if self.eth1_chain.is_some() {
self.op_pool.insert_attester_slashing(attester_slashing)
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ where
.ok_or("Cannot build without a genesis state root")?;
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());

let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();

let mut validator_monitor = ValidatorMonitor::new(
validator_monitor_config,
beacon_proposer_cache.clone(),
Expand Down
16 changes: 16 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.block_reward_tx.subscribe()
}

pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<T>> {
self.attester_slashing_tx.subscribe()
}

pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<T>> {
self.proposer_slashing_tx.subscribe()
}

pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
Expand Down Expand Up @@ -241,4 +249,12 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn has_block_reward_subscribers(&self) -> bool {
self.block_reward_tx.receiver_count() > 0
}

pub fn has_proposer_slashing_subscribers(&self) -> bool {
self.proposer_slashing_tx.receiver_count() > 0
}

pub fn has_attester_slashing_subscribers(&self) -> bool {
self.attester_slashing_tx.receiver_count() > 0
}
}
6 changes: 6 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4348,6 +4348,12 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
api_types::EventTopic::AttesterSlashing => {
event_handler.subscribe_attester_slashing()
}
api_types::EventTopic::ProposerSlashing => {
event_handler.subscribe_proposer_slashing()
}
};

receivers.push(
Expand Down
40 changes: 40 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5168,6 +5168,8 @@ impl ApiTester {
EventTopic::Block,
EventTopic::Head,
EventTopic::FinalizedCheckpoint,
EventTopic::AttesterSlashing,
EventTopic::ProposerSlashing,
];
let mut events_future = self
.client
Expand Down Expand Up @@ -5307,6 +5309,44 @@ impl ApiTester {
.await;
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);

let validator_indices = self.interesting_validator_indices();

// Test attester slashing event
let mut attester_slashing_event_future = self
.client
.get_events::<E>(&[EventTopic::AttesterSlashing])
.await
.unwrap();

self.harness.add_attester_slashing(vec![1, 2, 3]).unwrap();

let attester_slashing_event = poll_events(
&mut attester_slashing_event_future,
1,
Duration::from_millis(10000),
)
.await;

assert!(attester_slashing_event.len() > 0);

// Test proposer slashing event
let mut proposer_slashing_event_future = self
.client
.get_events::<E>(&[EventTopic::ProposerSlashing])
.await
.unwrap();

self.harness.add_proposer_slashing(1).unwrap();

let proposer_slashing_event = poll_events(
&mut proposer_slashing_event_future,
1,
Duration::from_millis(10000),
)
.await;

assert!(proposer_slashing_event.len() > 0);

self
}

Expand Down
34 changes: 19 additions & 15 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,19 +983,6 @@ pub struct SseLateHead {
pub execution_optimistic: bool,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseProposerSlashing {
pub signed_header_1: SignedBeaconBlockHeader,
pub signed_header_2: SignedBeaconBlockHeader,
}

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
#[serde(bound = "E: EthSpec")]
pub struct SseAttesterSlashing<E: EthSpec> {
pub attestation_1: Attestation<E>,
pub attestation_2: Attestation<E>,
}

#[superstruct(
variants(V1, V2, V3),
variant_attributes(derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize))
Expand Down Expand Up @@ -1091,8 +1078,8 @@ pub enum EventKind<T: EthSpec> {
#[cfg(feature = "lighthouse")]
BlockReward(BlockReward),
PayloadAttributes(VersionedSsePayloadAttributes),
ProposerSlashing(SseProposerSlashing),
AttesterSlashing(SseAttesterSlashing<T>),
ProposerSlashing(Box<ProposerSlashing>),
AttesterSlashing(Box<AttesterSlashing<T>>),
}

impl<T: EthSpec> EventKind<T> {
Expand Down Expand Up @@ -1125,6 +1112,7 @@ impl<T: EthSpec> EventKind<T> {
let event = split
.next()
.ok_or_else(|| {
println!("{}", s);
ServerError::InvalidServerSentEvent("Could not parse event tag".to_string())
})?
.trim_start_matches("event:");
Expand Down Expand Up @@ -1194,6 +1182,16 @@ impl<T: EthSpec> EventKind<T> {
"block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)),
)?)),
"attester_slashing" => Ok(EventKind::AttesterSlashing(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e))
})?,
)),
"proposer_slashing" => Ok(EventKind::ProposerSlashing(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Proposer Slashing: {:?}", e))
})?,
)),
_ => Err(ServerError::InvalidServerSentEvent(
"Could not parse event tag".to_string(),
)),
Expand Down Expand Up @@ -1225,6 +1223,8 @@ pub enum EventTopic {
LightClientOptimisticUpdate,
#[cfg(feature = "lighthouse")]
BlockReward,
AttesterSlashing,
ProposerSlashing,
}

impl FromStr for EventTopic {
Expand All @@ -1246,6 +1246,8 @@ impl FromStr for EventTopic {
"light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate),
#[cfg(feature = "lighthouse")]
"block_reward" => Ok(EventTopic::BlockReward),
"attester_slashing" => Ok(EventTopic::AttesterSlashing),
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
Expand All @@ -1268,6 +1270,8 @@ impl fmt::Display for EventTopic {
EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"),
#[cfg(feature = "lighthouse")]
EventTopic::BlockReward => write!(f, "block_reward"),
EventTopic::AttesterSlashing => write!(f, "attester_slashing"),
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
}
}
}
Expand Down

0 comments on commit c1bacad

Please sign in to comment.