Skip to content

Commit

Permalink
Proposer and attester slashing sse events (#5327)
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit ea8373b
Merge: cf72485 c33edc8
Author: realbigsean <[email protected]>
Date:   Mon Apr 29 17:33:23 2024 -0400

    Merge branch 'unstable' of https://github.com/sigp/lighthouse into proposer-and-attester-slashing-sse-events

commit cf72485
Merge: e1a6414 feb531f
Author: realbigsean <[email protected]>
Date:   Thu Apr 4 11:48:00 2024 -0400

    Merge branch 'unstable' of https://github.com/sigp/lighthouse into proposer-and-attester-slashing-sse-events

commit e1a6414
Author: Eitan Seri-Levi <[email protected]>
Date:   Thu Apr 4 00:29:50 2024 +0300

    leftover debugging

commit ec2eacc
Merge: be5fa49 d6ab56e
Author: Eitan Seri-Levi <[email protected]>
Date:   Thu Apr 4 00:28:26 2024 +0300

    Merge branch 'proposer-and-attester-slashing-sse-events' of https://github.com/eserilev/lighthouse into proposer-and-attester-slashing-sse-events

commit be5fa49
Merge: e2fdbc3 969d12d
Author: Eitan Seri-Levi <[email protected]>
Date:   Thu Apr 4 00:27:15 2024 +0300

    resolve merge conflicts

commit e2fdbc3
Author: Eitan Seri-Levi <[email protected]>
Date:   Wed Apr 3 23:52:46 2024 +0300

    remove todo, fix test

commit d6ab56e
Merge: a3e42d3 969d12d
Author: Lion - dapplion <[email protected]>
Date:   Wed Apr 3 11:07:19 2024 +0900

    Merge branch 'unstable' into proposer-and-attester-slashing-sse-events

commit a3e42d3
Author: Eitan Seri-Levi <[email protected]>
Date:   Tue Mar 12 17:10:47 2024 +0200

    remove double event tracking

commit 504635d
Author: Eitan Seri-Levi <[email protected]>
Date:   Wed Feb 28 16:09:25 2024 +0200

    revert

commit d0f0f82
Author: Eitan Seri-Levi <[email protected]>
Date:   Wed Feb 28 16:07:29 2024 +0200

    revert

commit b4b734d
Merge: c1bacad 64e563f
Author: Eitan Seri-Levi <[email protected]>
Date:   Wed Feb 28 15:57:43 2024 +0200

    Merge branch 'unstable' of https://github.com/sigp/lighthouse into proposer-and-attester-slashing-sse-events

commit c1bacad
Author: Eitan Seri-Levi <[email protected]>
Date:   Wed Feb 28 15:55:35 2024 +0200

    add tests, event triggers

commit da00e0e
Author: Eitan Seri-Levi <[email protected]>
Date:   Tue Feb 27 18:36:43 2024 +0200

    add TOOOs

commit 0273c05
Author: Eitan Seri-Levi <[email protected]>
Date:   Tue Feb 27 18:31:28 2024 +0200

    add proposer and attester event variants

commit 7c13a8f
Merge: 3ea8476 abd9965
Author: Eitan Seri-Levi <[email protected]>
Date:   Tue Feb 27 18:10:22 2024 +0200

    Merge branch 'unstable' of https://github.com/sigp/lighthouse into unstable

commit 3ea8476
Author: Eitan Seri-Levi <[email protected]>
Date:   Sun Feb 25 14:53:04 2024 +0200

    default vc to block v3 endpoint and deprecate block-v3 flag
  • Loading branch information
michaelsproul committed Apr 30, 2024
1 parent c8ffafb commit 7fe3945
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 1 deletion.
18 changes: 18 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
proposer_slashing: ProposerSlashing,
) -> Result<ObservationOutcome<ProposerSlashing, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;

Ok(self.observed_proposer_slashings.lock().verify_and_observe(
proposer_slashing,
&wall_clock_state,
Expand All @@ -2434,6 +2435,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
proposer_slashing: SigVerifiedOp<ProposerSlashing, T::EthSpec>,
) {
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_proposer_slashing_subscribers() {
event_handler.register(EventKind::ProposerSlashing(Box::new(
proposer_slashing.clone().into_inner(),
)));
}
}

if self.eth1_chain.is_some() {
self.op_pool.insert_proposer_slashing(proposer_slashing)
}
Expand All @@ -2445,6 +2454,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;

Ok(self.observed_attester_slashings.lock().verify_and_observe(
attester_slashing,
&wall_clock_state,
Expand All @@ -2465,6 +2475,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_write_lock()
.on_attester_slashing(attester_slashing.as_inner());

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attester_slashing_subscribers() {
event_handler.register(EventKind::AttesterSlashing(Box::new(
attester_slashing.clone().into_inner(),
)));
}
}

// Add to the op pool (if we have the ability to propose blocks).
if self.eth1_chain.is_some() {
self.op_pool.insert_attester_slashing(attester_slashing)
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,8 @@ where
.ok_or("Cannot build without a genesis state root")?;
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());

let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();

let mut validator_monitor = ValidatorMonitor::new(
validator_monitor_config,
beacon_proposer_cache.clone(),
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct ServerSentEventHandler<E: EthSpec> {
light_client_finality_update_tx: Sender<EventKind<E>>,
light_client_optimistic_update_tx: Sender<EventKind<E>>,
block_reward_tx: Sender<EventKind<E>>,
proposer_slashing_tx: Sender<EventKind<E>>,
attester_slashing_tx: Sender<EventKind<E>>,
log: Logger,
}

Expand All @@ -45,6 +47,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
let (block_reward_tx, _) = broadcast::channel(capacity);
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
let (attester_slashing_tx, _) = broadcast::channel(capacity);

Self {
attestation_tx,
Expand All @@ -60,6 +64,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
light_client_finality_update_tx,
light_client_optimistic_update_tx,
block_reward_tx,
proposer_slashing_tx,
attester_slashing_tx,
log,
}
}
Expand Down Expand Up @@ -126,6 +132,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.block_reward_tx
.send(kind)
.map(|count| log_count("block reward", count)),
EventKind::ProposerSlashing(_) => self
.proposer_slashing_tx
.send(kind)
.map(|count| log_count("proposer slashing", count)),
EventKind::AttesterSlashing(_) => self
.attester_slashing_tx
.send(kind)
.map(|count| log_count("attester slashing", count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
Expand Down Expand Up @@ -184,6 +198,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.block_reward_tx.subscribe()
}

pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<E>> {
self.attester_slashing_tx.subscribe()
}

pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<E>> {
self.proposer_slashing_tx.subscribe()
}

pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
Expand Down Expand Up @@ -227,4 +249,12 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_block_reward_subscribers(&self) -> bool {
self.block_reward_tx.receiver_count() > 0
}

pub fn has_proposer_slashing_subscribers(&self) -> bool {
self.proposer_slashing_tx.receiver_count() > 0
}

pub fn has_attester_slashing_subscribers(&self) -> bool {
self.attester_slashing_tx.receiver_count() > 0
}
}
6 changes: 6 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4358,6 +4358,12 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlockReward => {
event_handler.subscribe_block_reward()
}
api_types::EventTopic::AttesterSlashing => {
event_handler.subscribe_attester_slashing()
}
api_types::EventTopic::ProposerSlashing => {
event_handler.subscribe_proposer_slashing()
}
};

receivers.push(
Expand Down
38 changes: 38 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5214,6 +5214,8 @@ impl ApiTester {
EventTopic::Block,
EventTopic::Head,
EventTopic::FinalizedCheckpoint,
EventTopic::AttesterSlashing,
EventTopic::ProposerSlashing,
];
let mut events_future = self
.client
Expand Down Expand Up @@ -5353,6 +5355,42 @@ impl ApiTester {
.await;
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);

// Test attester slashing event
let mut attester_slashing_event_future = self
.client
.get_events::<E>(&[EventTopic::AttesterSlashing])
.await
.unwrap();

self.harness.add_attester_slashing(vec![1, 2, 3]).unwrap();

let attester_slashing_event = poll_events(
&mut attester_slashing_event_future,
1,
Duration::from_millis(10000),
)
.await;

assert!(attester_slashing_event.len() == 1);

// Test proposer slashing event
let mut proposer_slashing_event_future = self
.client
.get_events::<E>(&[EventTopic::ProposerSlashing])
.await
.unwrap();

self.harness.add_proposer_slashing(1).unwrap();

let proposer_slashing_event = poll_events(
&mut proposer_slashing_event_future,
1,
Duration::from_millis(10000),
)
.await;

assert!(proposer_slashing_event.len() == 1);

self
}

Expand Down
20 changes: 20 additions & 0 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,8 @@ pub enum EventKind<E: EthSpec> {
#[cfg(feature = "lighthouse")]
BlockReward(BlockReward),
PayloadAttributes(VersionedSsePayloadAttributes),
ProposerSlashing(Box<ProposerSlashing>),
AttesterSlashing(Box<AttesterSlashing<E>>),
}

impl<E: EthSpec> EventKind<E> {
Expand All @@ -1099,6 +1101,8 @@ impl<E: EthSpec> EventKind<E> {
EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update",
#[cfg(feature = "lighthouse")]
EventKind::BlockReward(_) => "block_reward",
EventKind::ProposerSlashing(_) => "proposer_slashing",
EventKind::AttesterSlashing(_) => "attester_slashing",
}
}

Expand Down Expand Up @@ -1179,6 +1183,16 @@ impl<E: EthSpec> EventKind<E> {
"block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)),
)?)),
"attester_slashing" => Ok(EventKind::AttesterSlashing(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e))
})?,
)),
"proposer_slashing" => Ok(EventKind::ProposerSlashing(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Proposer Slashing: {:?}", e))
})?,
)),
_ => Err(ServerError::InvalidServerSentEvent(
"Could not parse event tag".to_string(),
)),
Expand Down Expand Up @@ -1210,6 +1224,8 @@ pub enum EventTopic {
LightClientOptimisticUpdate,
#[cfg(feature = "lighthouse")]
BlockReward,
AttesterSlashing,
ProposerSlashing,
}

impl FromStr for EventTopic {
Expand All @@ -1231,6 +1247,8 @@ impl FromStr for EventTopic {
"light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate),
#[cfg(feature = "lighthouse")]
"block_reward" => Ok(EventTopic::BlockReward),
"attester_slashing" => Ok(EventTopic::AttesterSlashing),
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
Expand All @@ -1253,6 +1271,8 @@ impl fmt::Display for EventTopic {
EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"),
#[cfg(feature = "lighthouse")]
EventTopic::BlockReward => write!(f, "block_reward"),
EventTopic::AttesterSlashing => write!(f, "attester_slashing"),
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
}
}
}
Expand Down

0 comments on commit 7fe3945

Please sign in to comment.