Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Subscribe to altair gossip topics 2 slots before fork #2532

Closed
wants to merge 11 commits into from
9 changes: 9 additions & 0 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,15 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
self.unsubscribe(gossip_topic)
}

/// Subscribe to all currently subscribed topics with the new fork digest.
pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) {
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for mut topic in subscriptions.into_iter() {
topic.fork_digest = new_fork_digest;
self.subscribe(topic);
}
}

/// Unsubscribe from all topics that doesn't have the given fork_digest
pub fn unsubscribe_from_fork_topics_except(&mut self, except: [u8; 4]) {
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ mod tests {
type Spec = types::MainnetEthSpec;

fn fork_context() -> ForkContext {
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &Spec::default_spec())
let mut chain_spec = Spec::default_spec();
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
// includes altair in the list of forks
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &chain_spec)
}

fn base_block() -> SignedBeaconBlock<Spec> {
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/eth2_libp2p/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::runtime::Runtime;
use types::{ChainSpec, EnrForkId, ForkContext, Hash256, MinimalEthSpec};
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec};

type E = MinimalEthSpec;
use tempfile::Builder as TempBuilder;

/// Returns a dummy fork context
fn fork_context() -> ForkContext {
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &ChainSpec::minimal())
let mut chain_spec = E::default_spec();
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
// includes altair in the list of forks
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &chain_spec)
}

pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal);
Expand Down
37 changes: 30 additions & 7 deletions beacon_node/eth2_libp2p/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,16 @@ fn test_blocks_by_range_chunked_rpc() {
step: 0,
});

// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed)));

// BlocksByRange Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_base = Response::BlocksByRange(Some(Box::new(signed_full_block)));

let full_block = BeaconBlock::Altair(BeaconBlockAltair::<E>::full(&spec));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block)));

// keep count of the number of messages received
let mut messages_received = 0;
Expand All @@ -167,7 +172,11 @@ fn test_blocks_by_range_chunked_rpc() {
warn!(log, "Sender received a response");
match response {
Response::BlocksByRange(Some(_)) => {
assert_eq!(response, rpc_response.clone());
if messages_received < 5 {
assert_eq!(response, rpc_response_base.clone());
} else {
assert_eq!(response, rpc_response_altair.clone());
}
messages_received += 1;
warn!(log, "Chunk received");
}
Expand Down Expand Up @@ -197,7 +206,14 @@ fn test_blocks_by_range_chunked_rpc() {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
for _ in 1..=messages_to_send {
for i in 0..messages_to_send {
// Send first half of responses as base blocks and
// second half as altair blocks.
let rpc_response = if i < 5 {
rpc_response_base.clone()
} else {
rpc_response_altair.clone()
};
receiver.swarm.behaviour_mut().send_successful_response(
peer_id,
id,
Expand Down Expand Up @@ -481,7 +497,7 @@ fn test_blocks_by_root_chunked_rpc() {
let log_level = Level::Debug;
let enable_logging = false;

let messages_to_send = 3;
let messages_to_send = 10;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
Expand All @@ -497,6 +513,13 @@ fn test_blocks_by_root_chunked_rpc() {
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
]),
});

Expand Down
62 changes: 55 additions & 7 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
EthSpec, ForkContext, ForkName, RelativeEpoch, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, EthSpec, ForkContext, ForkName, RelativeEpoch, Slot, SubnetId,
SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription,
};

mod tests;

/// The interval (in seconds) that various network metrics will update.
const METRIC_UPDATE_INTERVAL: u64 = 1;
/// Number of slots before the fork when we should subscribe to the new fork topics.
const SUBSCRIBE_DELAY_SLOTS: u64 = 2;
/// Delay after a fork where we unsubscribe from pre-fork topics.
const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2;

Expand Down Expand Up @@ -129,6 +131,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
discovery_auto_update: bool,
/// A delay that expires when a new fork takes place.
next_fork_update: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to subscribe to a new fork's topics.
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the subnets once synced.
Expand Down Expand Up @@ -179,6 +183,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {

// keep track of when our fork_id needs to be updated
let next_fork_update = Box::pin(next_fork_delay(&beacon_chain).into());
let next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&beacon_chain).into());
let next_unsubscribe = Box::pin(None.into());

let current_slot = beacon_chain
Expand All @@ -192,6 +197,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));

debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork());

// launch libp2p service
let (network_globals, mut libp2p) = LibP2PService::new(
executor.clone(),
Expand Down Expand Up @@ -254,6 +261,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
upnp_mappings: (None, None),
discovery_auto_update: config.discv5_config.enr_update,
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
Expand All @@ -274,12 +282,23 @@ impl<T: BeaconChainTypes> NetworkService<T> {
/// digests since we should be subscribed to post fork topics before the fork.
pub fn required_gossip_fork_digests(&self) -> Vec<[u8; 4]> {
let fork_context = &self.fork_context;
let spec = &self.beacon_chain.spec;
match fork_context.current_fork() {
ForkName::Base => {
if fork_context.fork_exists(ForkName::Altair) {
fork_context.all_fork_digests()
} else {
vec![fork_context.genesis_context_bytes()]
// If we are SUBSCRIBE_DELAY_SLOTS before the fork slot, subscribe only to Base,
// else subscribe to Base and Altair.
let current_slot = self.beacon_chain.slot().unwrap_or(spec.genesis_slot);
match spec.next_fork_epoch::<T::EthSpec>(current_slot) {
Some((_, fork_epoch)) => {
if current_slot.saturating_add(Slot::new(SUBSCRIBE_DELAY_SLOTS))
>= fork_epoch.start_slot(T::EthSpec::slots_per_epoch())
{
fork_context.all_fork_digests()
} else {
vec![fork_context.genesis_context_bytes()]
}
}
None => vec![fork_context.genesis_context_bytes()],
}
}
ForkName::Altair => vec![fork_context
Expand Down Expand Up @@ -619,6 +638,7 @@ fn spawn_service<T: BeaconChainTypes>(
} => {
// Update prometheus metrics.
metrics::expose_receive_metrics(&message);

match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
Expand Down Expand Up @@ -671,7 +691,7 @@ fn spawn_service<T: BeaconChainTypes>(
if let Some(new_fork_name) = fork_context.from_context_bytes(new_enr_fork_id.fork_digest) {
info!(
service.log,
"Updating enr fork version";
"Transitioned to new fork";
"old_fork" => ?fork_context.current_fork(),
"new_fork" => ?new_fork_name,
);
Expand Down Expand Up @@ -701,6 +721,18 @@ fn spawn_service<T: BeaconChainTypes>(
info!(service.log, "Unsubscribed from old fork topics");
service.next_unsubscribe = Box::pin(None.into());
}
Some(_) = &mut service.next_fork_subscriptions => {
if let Some((fork_name, _)) = service.beacon_chain.duration_to_next_fork() {
let fork_version = service.beacon_chain.spec.fork_version_for_name(fork_name);
let fork_digest = ChainSpec::compute_fork_digest(fork_version, service.beacon_chain.genesis_validators_root);
info!(service.log, "Subscribing to new fork topics");
service.libp2p.swarm.behaviour_mut().subscribe_new_fork_topics(fork_digest);
}
else {
error!(service.log, "Fork subscription scheduled but no fork scheduled");
}
service.next_fork_subscriptions = Box::pin(next_fork_subscriptions_delay(&service.beacon_chain).into());
}
}
metrics::update_bandwidth_metrics(service.libp2p.bandwidth.clone());
}
Expand All @@ -717,6 +749,22 @@ fn next_fork_delay<T: BeaconChainTypes>(
.map(|(_, until_fork)| tokio::time::sleep(until_fork))
}

/// Returns a `Sleep` that triggers `SUBSCRIBE_DELAY_SLOTS` before the next fork.
/// Returns `None` if there are no scheduled forks or we are already past `current_slot + SUBSCRIBE_DELAY_SLOTS > fork_slot`.
fn next_fork_subscriptions_delay<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
) -> Option<tokio::time::Sleep> {
if let Some((_, duration_to_fork)) = beacon_chain.duration_to_next_fork() {
let duration_to_subscription = duration_to_fork.saturating_sub(Duration::from_secs(
beacon_chain.spec.seconds_per_slot * SUBSCRIBE_DELAY_SLOTS,
));
if !duration_to_subscription.is_zero() {
return Some(tokio::time::sleep(duration_to_subscription));
}
}
None
}

impl<T: BeaconChainTypes> Drop for NetworkService<T> {
fn drop(&mut self) {
// network thread is terminating
Expand Down
42 changes: 37 additions & 5 deletions consensus/types/src/chain_spec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::*;
use int_to_bytes::int_to_bytes4;
use serde_derive::{Deserialize, Serialize};
use serde::{Deserializer, Serialize, Serializer};
use serde_derive::Deserialize;
use serde_utils::quoted_u64::MaybeQuoted;
use std::fs::File;
use std::path::Path;
Expand Down Expand Up @@ -467,7 +468,7 @@ impl ChainSpec {
domain_sync_committee_selection_proof: 8,
domain_contribution_and_proof: 9,
altair_fork_version: [0x01, 0x00, 0x00, 0x00],
altair_fork_epoch: Some(Epoch::new(u64::MAX)),
altair_fork_epoch: None,

/*
* Network specific
Expand Down Expand Up @@ -506,7 +507,7 @@ impl ChainSpec {
// Altair
epochs_per_sync_committee_period: Epoch::new(8),
altair_fork_version: [0x01, 0x00, 0x00, 0x01],
altair_fork_epoch: Some(Epoch::new(u64::MAX)),
altair_fork_epoch: None,
// Other
network_id: 2, // lighthouse testnet network id
deposit_chain_id: 5,
Expand Down Expand Up @@ -544,7 +545,9 @@ pub struct Config {

#[serde(with = "serde_utils::bytes_4_hex")]
altair_fork_version: [u8; 4],
altair_fork_epoch: Option<MaybeQuoted<Epoch>>,
#[serde(serialize_with = "serialize_fork_epoch")]
#[serde(deserialize_with = "deserialize_fork_epoch")]
pub altair_fork_epoch: Option<MaybeQuoted<Epoch>>,

#[serde(with = "serde_utils::quoted_u64")]
seconds_per_slot: u64,
Expand Down Expand Up @@ -582,6 +585,35 @@ impl Default for Config {
}
}

/// Util function to serialize a `None` fork epoch value
/// as `Epoch::max_value()`.
fn serialize_fork_epoch<S>(val: &Option<MaybeQuoted<Epoch>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match val {
None => MaybeQuoted {
value: Epoch::max_value(),
}
.serialize(s),
Some(epoch) => epoch.serialize(s),
}
}

/// Util function to deserialize a u64::max() fork epoch as `None`.
fn deserialize_fork_epoch<'de, D>(deserializer: D) -> Result<Option<MaybeQuoted<Epoch>>, D::Error>
where
D: Deserializer<'de>,
{
let decoded: Option<MaybeQuoted<Epoch>> = serde::de::Deserialize::deserialize(deserializer)?;
if let Some(fork_epoch) = decoded {
if fork_epoch.value != Epoch::max_value() {
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
return Ok(Some(fork_epoch));
}
}
Ok(None)
}

impl Config {
/// Maps `self` to an identifier for an `EthSpec` instance.
///
Expand All @@ -606,7 +638,7 @@ impl Config {
altair_fork_version: spec.altair_fork_version,
altair_fork_epoch: spec
.altair_fork_epoch
.map(|slot| MaybeQuoted { value: slot }),
.map(|epoch| MaybeQuoted { value: epoch }),

seconds_per_slot: spec.seconds_per_slot,
seconds_per_eth1_block: spec.seconds_per_eth1_block,
Expand Down
5 changes: 3 additions & 2 deletions consensus/types/src/fork_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ impl ForkContext {
ChainSpec::compute_fork_digest(spec.genesis_fork_version, genesis_validators_root),
)];

// Only add Altair to list of forks if it's enabled (i.e. spec.altair_fork_epoch != None)
// Only add Altair to list of forks if it's enabled
// Note: `altair_fork_epoch == None` implies altair hasn't been activated yet on the config.
if spec.altair_fork_epoch.is_some() {
fork_to_digest.push((
ForkName::Altair,
ChainSpec::compute_fork_digest(spec.altair_fork_version, genesis_validators_root),
))
));
}

let fork_to_digest: HashMap<ForkName, [u8; 4]> = fork_to_digest.into_iter().collect();
Expand Down