Skip to content

Commit

Permalink
Simple Subnet Management (sigp#6146)
Browse files Browse the repository at this point in the history
* Initial temp commit

* Merge latest unstable

* First draft without tests

* Update tests for new version

* Correct comments and reviewers comments

* Merge latest unstable

* Fix errors

* Missed a comment, corrected it

* Fix lints

* Merge latest unstable

* Fix tests

* Merge latest unstable

* Reviewers comments

* Remove sync subnets from ENR on unsubscribe

* Merge branch 'unstable' into simple-peer-mapping

* Merge branch 'unstable' into simple-peer-mapping

* Merge branch 'unstable' into simple-peer-mapping

* Merge latest unstable

* Prevent clash with pin of rust_eth_kzg
  • Loading branch information
AgeManning authored Nov 26, 2024
1 parent 6e1945f commit 08e8b92
Show file tree
Hide file tree
Showing 10 changed files with 1,606 additions and 1,934 deletions.
1,158 changes: 733 additions & 425 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] }
c-kzg = { version = "1", default-features = false }
compare_fields_derive = { path = "common/compare_fields_derive" }
criterion = "0.5"
delay_map = "0.3"
delay_map = "0.4"
derivative = "2"
dirs = "3"
either = "1.9"
Expand Down
90 changes: 22 additions & 68 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ use crate::nat;
use crate::network_beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService;
use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription};
use crate::NetworkConfig;
use crate::{error, metrics};
use crate::{
subnet_service::{AttestationService, SubnetServiceMessage},
NetworkConfig,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
use futures::channel::mpsc::Sender;
Expand Down Expand Up @@ -165,10 +162,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
beacon_chain: Arc<BeaconChain<T>>,
/// The underlying libp2p service that drives all the network interactions.
libp2p: Network<T::EthSpec>,
/// An attestation and subnet manager service.
attestation_service: AttestationService<T>,
/// A sync committeee subnet manager service.
sync_committee_service: SyncCommitteeService<T>,
/// An attestation and sync committee subnet manager service.
subnet_service: SubnetService<T>,
/// The receiver channel for lighthouse to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage<T::EthSpec>>,
/// The receiver channel for lighthouse to send validator subscription requests.
Expand Down Expand Up @@ -317,16 +312,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_log.clone(),
)?;

// attestation subnet service
let attestation_service = AttestationService::new(
// attestation and sync committee subnet service
let subnet_service = SubnetService::new(
beacon_chain.clone(),
network_globals.local_enr().node_id(),
&config,
&network_log,
);
// sync committee subnet service
let sync_committee_service =
SyncCommitteeService::new(beacon_chain.clone(), &config, &network_log);

// create a timer for updating network metrics
let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));
Expand All @@ -344,8 +336,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let network_service = NetworkService {
beacon_chain,
libp2p,
attestation_service,
sync_committee_service,
subnet_service,
network_recv,
validator_subscription_recv,
router_send,
Expand Down Expand Up @@ -460,11 +451,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// handle a message from a validator requesting a subscription to a subnet
Some(msg) = self.validator_subscription_recv.recv() => self.on_validator_subscription_msg(msg).await,

// process any attestation service events
Some(msg) = self.attestation_service.next() => self.on_attestation_service_msg(msg),

// process any sync committee service events
Some(msg) = self.sync_committee_service.next() => self.on_sync_committee_service_message(msg),
// process any subnet service events
Some(msg) = self.subnet_service.next() => self.on_subnet_service_msg(msg),

event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await,

Expand Down Expand Up @@ -552,13 +540,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let subnet_id = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
let should_process = self
.attestation_service
.should_process_attestation(subnet, attestation);
let should_process = self.subnet_service.should_process_attestation(
Subnet::Attestation(subnet_id),
attestation,
);
self.send_to_router(RouterMessage::PubsubMessage(
id,
source,
Expand Down Expand Up @@ -832,20 +821,12 @@ impl<T: BeaconChainTypes> NetworkService<T> {
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = self
.attestation_service
.validator_subscriptions(subscriptions.into_iter())
{
warn!(self.log, "Attestation validator subscription failed"; "error" => e);
}
let subscriptions = subscriptions.into_iter().map(Subscription::Attestation);
self.subnet_service.validator_subscriptions(subscriptions)
}
ValidatorSubscriptionMessage::SyncCommitteeSubscribe { subscriptions } => {
if let Err(e) = self
.sync_committee_service
.validator_subscriptions(subscriptions)
{
warn!(self.log, "Sync committee calidator subscription failed"; "error" => e);
}
let subscriptions = subscriptions.into_iter().map(Subscription::SyncCommittee);
self.subnet_service.validator_subscriptions(subscriptions)
}
}
}
Expand Down Expand Up @@ -881,7 +862,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) {
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
Expand All @@ -900,36 +881,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p.update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p.discover_subnet_peers(subnets_to_discover);
}
}
}

fn on_sync_committee_service_message(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p.update_enr_subnet(subnet, false);
SubnetServiceMessage::EnrRemove(sync_subnet_id) => {
self.libp2p
.update_enr_subnet(Subnet::SyncCommittee(sync_subnet_id), false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p.discover_subnet_peers(subnets_to_discover);
Expand Down
9 changes: 3 additions & 6 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,18 @@ mod tests {
// Subscribe to the topics.
runtime.block_on(async {
while network_globals.gossipsub_subscriptions.read().len() < 2 {
if let Some(msg) = network_service.attestation_service.next().await {
network_service.on_attestation_service_msg(msg);
if let Some(msg) = network_service.subnet_service.next().await {
network_service.on_subnet_service_msg(msg);
}
}
});

// Make sure the service is subscribed to the topics.
let (old_topic1, old_topic2) = {
let mut subnets = SubnetId::compute_subnets_for_epoch::<MinimalEthSpec>(
let mut subnets = SubnetId::compute_attestation_subnets(
network_globals.local_enr().node_id().raw(),
beacon_chain.epoch().unwrap(),
&spec,
)
.unwrap()
.0
.collect::<Vec<_>>();
assert_eq!(2, subnets.len());

Expand Down
Loading

0 comments on commit 08e8b92

Please sign in to comment.