Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Subscribe to altair gossip topics 2 slots before fork #2532

Closed
wants to merge 11 commits into from
6 changes: 5 additions & 1 deletion beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use types::ForkName;
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, ChainSpec, EnrForkId, EthSpec, ForkContext,
SignedBeaconBlock, Slot, SubnetId, SyncSubnetId,
Expand Down Expand Up @@ -103,6 +104,8 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
topic: TopicHash,
/// The message itself.
message: PubsubMessage<TSpec>,
/// The fork corresponding to the topic the message was received on.
fork_name: ForkName,
},
/// Inform the network to send a Status to this peer.
StatusPeer(PeerId),
Expand Down Expand Up @@ -810,13 +813,14 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
warn!(self.log, "Failed to report message validation"; "message_id" => %id, "peer_id" => %propagation_source, "error" => ?e);
}
}
Ok(msg) => {
Ok((msg, fork_name)) => {
// Notify the network
self.add_event(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topic: gs_msg.topic,
message: msg,
fork_name,
});
}
}
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,11 @@ mod tests {
type Spec = types::MainnetEthSpec;

fn fork_context() -> ForkContext {
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &Spec::default_spec())
let mut chain_spec = Spec::default_spec();
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
// includes altair in the list of forks
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
ForkContext::new::<Spec>(types::Slot::new(0), Hash256::zero(), &chain_spec)
}

fn base_block() -> SignedBeaconBlock<Spec> {
Expand Down
69 changes: 31 additions & 38 deletions beacon_node/eth2_libp2p/src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,90 +117,83 @@ impl<T: EthSpec> PubsubMessage<T> {
}

/// This decodes `data` into a `PubsubMessage` given a topic.
/// Returns the decoded data along with the fork name corresponding to the topic
/// the message was received on.
/* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will
* need to be modified.
*/
pub fn decode(
topic: &TopicHash,
data: &[u8],
fork_context: &ForkContext,
) -> Result<Self, String> {
) -> Result<(Self, ForkName), String> {
match GossipTopic::decode(topic.as_str()) {
Err(_) => Err(format!("Unknown gossipsub topic: {:?}", topic)),
Ok(gossip_topic) => {
let fork_name = fork_context
.from_context_bytes(gossip_topic.fork_digest)
.ok_or(format!(
"Unknown gossipsub fork digest: {:?}",
gossip_topic.fork_digest
))?;

// All topics are currently expected to be compressed and decompressed with snappy.
// This is done in the `SnappyTransform` struct.
// Therefore compression has already been handled for us by the time we are
// decoding the objects here.

// the ssz decoders
match gossip_topic.kind() {
let msg = match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {
let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::AggregateAndProofAttestation(Box::new(
agg_and_proof,
)))
PubsubMessage::AggregateAndProofAttestation(Box::new(agg_and_proof))
}
GossipKind::Attestation(subnet_id) => {
let attestation =
Attestation::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::Attestation(Box::new((
*subnet_id,
attestation,
))))
PubsubMessage::Attestation(Box::new((*subnet_id, attestation)))
}
GossipKind::BeaconBlock => {
let beacon_block =
match fork_context.from_context_bytes(gossip_topic.fork_digest) {
Some(ForkName::Base) => SignedBeaconBlock::<T>::Base(
SignedBeaconBlockBase::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
Some(ForkName::Altair) => SignedBeaconBlock::<T>::Altair(
SignedBeaconBlockAltair::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
None => {
return Err(format!(
"Unknown gossipsub fork digest: {:?}",
gossip_topic.fork_digest
))
}
};
Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)))
let beacon_block = match fork_name {
ForkName::Base => SignedBeaconBlock::<T>::Base(
SignedBeaconBlockBase::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
ForkName::Altair => SignedBeaconBlock::<T>::Altair(
SignedBeaconBlockAltair::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
};
PubsubMessage::BeaconBlock(Box::new(beacon_block))
}
GossipKind::VoluntaryExit => {
let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)))
PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))
}
GossipKind::ProposerSlashing => {
let proposer_slashing = ProposerSlashing::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing)))
PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))
}
GossipKind::AttesterSlashing => {
let attester_slashing = AttesterSlashing::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing)))
PubsubMessage::AttesterSlashing(Box::new(attester_slashing))
}
GossipKind::SignedContributionAndProof => {
let sync_aggregate = SignedContributionAndProof::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::SignedContributionAndProof(Box::new(
sync_aggregate,
)))
PubsubMessage::SignedContributionAndProof(Box::new(sync_aggregate))
}
GossipKind::SyncCommitteeMessage(subnet_id) => {
let sync_committee = SyncCommitteeMessage::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::SyncCommitteeMessage(Box::new((
*subnet_id,
sync_committee,
))))
PubsubMessage::SyncCommitteeMessage(Box::new((*subnet_id, sync_committee)))
}
}
};
Ok((msg, *fork_name))
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/eth2_libp2p/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::runtime::Runtime;
use types::{ChainSpec, EnrForkId, ForkContext, Hash256, MinimalEthSpec};
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec};

type E = MinimalEthSpec;
use tempfile::Builder as TempBuilder;

/// Returns a dummy fork context
fn fork_context() -> ForkContext {
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &ChainSpec::minimal())
let mut chain_spec = E::default_spec();
// Set fork_epoch to `Some` to ensure that the `ForkContext` object
// includes altair in the list of forks
chain_spec.altair_fork_epoch = Some(types::Epoch::new(42));
ForkContext::new::<E>(types::Slot::new(0), Hash256::zero(), &chain_spec)
}

pub struct Libp2pInstance(LibP2PService<E>, exit_future::Signal);
Expand Down
37 changes: 30 additions & 7 deletions beacon_node/eth2_libp2p/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,16 @@ fn test_blocks_by_range_chunked_rpc() {
step: 0,
});

// BlocksByRange Response
let spec = E::default_spec();
let empty_block = BeaconBlock::empty(&spec);
let empty_signed = SignedBeaconBlock::from_block(empty_block, Signature::empty());
let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed)));

// BlocksByRange Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_base = Response::BlocksByRange(Some(Box::new(signed_full_block)));

let full_block = BeaconBlock::Altair(BeaconBlockAltair::<E>::full(&spec));
let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty());
let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block)));

// keep count of the number of messages received
let mut messages_received = 0;
Expand All @@ -167,7 +172,11 @@ fn test_blocks_by_range_chunked_rpc() {
warn!(log, "Sender received a response");
match response {
Response::BlocksByRange(Some(_)) => {
assert_eq!(response, rpc_response.clone());
if messages_received < 5 {
assert_eq!(response, rpc_response_base.clone());
} else {
assert_eq!(response, rpc_response_altair.clone());
}
messages_received += 1;
warn!(log, "Chunk received");
}
Expand Down Expand Up @@ -197,7 +206,14 @@ fn test_blocks_by_range_chunked_rpc() {
if request == rpc_request {
// send the response
warn!(log, "Receiver got request");
for _ in 1..=messages_to_send {
for i in 0..messages_to_send {
// Send first half of responses as base blocks and
// second half as altair blocks.
let rpc_response = if i < 5 {
rpc_response_base.clone()
} else {
rpc_response_altair.clone()
};
receiver.swarm.behaviour_mut().send_successful_response(
peer_id,
id,
Expand Down Expand Up @@ -481,7 +497,7 @@ fn test_blocks_by_root_chunked_rpc() {
let log_level = Level::Debug;
let enable_logging = false;

let messages_to_send = 3;
let messages_to_send = 10;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();
Expand All @@ -497,6 +513,13 @@ fn test_blocks_by_root_chunked_rpc() {
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
]),
});

Expand Down
31 changes: 30 additions & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));

debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork());

// launch libp2p service
let (network_globals, mut libp2p) = LibP2PService::new(
executor.clone(),
Expand Down Expand Up @@ -600,10 +602,37 @@ fn spawn_service<T: BeaconChainTypes>(
id,
source,
message,
..
fork_name,
topic,
} => {
// Update prometheus metrics.
metrics::expose_receive_metrics(&message);

if fork_name != service.fork_context.current_fork() {
if let Some((_, next_fork_duration)) = service.beacon_chain.duration_to_next_fork() {
// This implies that the peer is sending messages on post fork topics
// before the fork. We ignore the message and score down the peer.
if next_fork_duration > beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY {
debug!(
service.log,
"Peer sent gossip on incorrect fork version topic";
"peer" => %source,
"fork_topic" => %topic,
);
// Penalize with MidTolerance to account for slight clock skews beyond `MAXIMUM_GOSSIP_CLOCK_DISPARITY`
service.libp2p.report_peer(&source, PeerAction::MidToleranceError, ReportSource::Gossipsub);
service
.libp2p
.swarm
.behaviour_mut()
.report_message_validation_result(
&source, id, MessageAcceptance::Ignore,
);
continue;
}
}

}
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
Expand Down
42 changes: 37 additions & 5 deletions consensus/types/src/chain_spec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::*;
use int_to_bytes::int_to_bytes4;
use serde_derive::{Deserialize, Serialize};
use serde::{Deserializer, Serialize, Serializer};
use serde_derive::Deserialize;
use serde_utils::quoted_u64::MaybeQuoted;
use std::fs::File;
use std::path::Path;
Expand Down Expand Up @@ -467,7 +468,7 @@ impl ChainSpec {
domain_sync_committee_selection_proof: 8,
domain_contribution_and_proof: 9,
altair_fork_version: [0x01, 0x00, 0x00, 0x00],
altair_fork_epoch: Some(Epoch::new(u64::MAX)),
altair_fork_epoch: None,

/*
* Network specific
Expand Down Expand Up @@ -506,7 +507,7 @@ impl ChainSpec {
// Altair
epochs_per_sync_committee_period: Epoch::new(8),
altair_fork_version: [0x01, 0x00, 0x00, 0x01],
altair_fork_epoch: Some(Epoch::new(u64::MAX)),
altair_fork_epoch: None,
// Other
network_id: 2, // lighthouse testnet network id
deposit_chain_id: 5,
Expand Down Expand Up @@ -544,7 +545,9 @@ pub struct Config {

#[serde(with = "serde_utils::bytes_4_hex")]
altair_fork_version: [u8; 4],
altair_fork_epoch: Option<MaybeQuoted<Epoch>>,
#[serde(serialize_with = "serialize_fork_epoch")]
#[serde(deserialize_with = "deserialize_fork_epoch")]
pub altair_fork_epoch: Option<MaybeQuoted<Epoch>>,

#[serde(with = "serde_utils::quoted_u64")]
seconds_per_slot: u64,
Expand Down Expand Up @@ -582,6 +585,35 @@ impl Default for Config {
}
}

/// Util function to serialize a `None` fork epoch value
/// as `Epoch::max_value()`.
fn serialize_fork_epoch<S>(val: &Option<MaybeQuoted<Epoch>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match val {
None => MaybeQuoted {
value: Epoch::max_value(),
}
.serialize(s),
Some(epoch) => epoch.serialize(s),
}
}

/// Util function to deserialize a u64::max() fork epoch as `None`.
fn deserialize_fork_epoch<'de, D>(deserializer: D) -> Result<Option<MaybeQuoted<Epoch>>, D::Error>
where
D: Deserializer<'de>,
{
let decoded: Option<MaybeQuoted<Epoch>> = serde::de::Deserialize::deserialize(deserializer)?;
if let Some(fork_epoch) = decoded {
if fork_epoch.value != Epoch::max_value() {
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
return Ok(Some(fork_epoch));
}
}
Ok(None)
}

impl Config {
/// Maps `self` to an identifier for an `EthSpec` instance.
///
Expand All @@ -606,7 +638,7 @@ impl Config {
altair_fork_version: spec.altair_fork_version,
altair_fork_epoch: spec
.altair_fork_epoch
.map(|slot| MaybeQuoted { value: slot }),
.map(|epoch| MaybeQuoted { value: epoch }),

seconds_per_slot: spec.seconds_per_slot,
seconds_per_eth1_block: spec.seconds_per_eth1_block,
Expand Down
Loading