Skip to content

Commit

Permalink
Reject post fork gossip before fork
Browse files Browse the repository at this point in the history
  • Loading branch information
pawanjay176 committed Aug 26, 2021
1 parent 11b39ab commit c8b33b9
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 40 deletions.
6 changes: 5 additions & 1 deletion beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use types::ForkName;
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, ChainSpec, EnrForkId, EthSpec, ForkContext,
SignedBeaconBlock, Slot, SubnetId, SyncSubnetId,
Expand Down Expand Up @@ -103,6 +104,8 @@ pub enum BehaviourEvent<TSpec: EthSpec> {
topic: TopicHash,
/// The message itself.
message: PubsubMessage<TSpec>,
/// The fork corresponding to the topic the message was received on.
fork_name: ForkName,
},
/// Inform the network to send a Status to this peer.
StatusPeer(PeerId),
Expand Down Expand Up @@ -810,13 +813,14 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<GossipsubEvent> for Behaviour<
warn!(self.log, "Failed to report message validation"; "message_id" => %id, "peer_id" => %propagation_source, "error" => ?e);
}
}
Ok(msg) => {
Ok((msg, fork_name)) => {
// Notify the network
self.add_event(BehaviourEvent::PubsubMessage {
id,
source: propagation_source,
topic: gs_msg.topic,
message: msg,
fork_name,
});
}
}
Expand Down
69 changes: 31 additions & 38 deletions beacon_node/eth2_libp2p/src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,90 +117,83 @@ impl<T: EthSpec> PubsubMessage<T> {
}

/// This decodes `data` into a `PubsubMessage` given a topic.
/// Returns the decoded data along with the fork name corresponding to the topic
/// the message was received on.
/* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will
* need to be modified.
*/
pub fn decode(
topic: &TopicHash,
data: &[u8],
fork_context: &ForkContext,
) -> Result<Self, String> {
) -> Result<(Self, ForkName), String> {
match GossipTopic::decode(topic.as_str()) {
Err(_) => Err(format!("Unknown gossipsub topic: {:?}", topic)),
Ok(gossip_topic) => {
let fork_name = fork_context
.from_context_bytes(gossip_topic.fork_digest)
.ok_or(format!(
"Unknown gossipsub fork digest: {:?}",
gossip_topic.fork_digest
))?;

// All topics are currently expected to be compressed and decompressed with snappy.
// This is done in the `SnappyTransform` struct.
// Therefore compression has already been handled for us by the time we are
// decoding the objects here.

// the ssz decoders
match gossip_topic.kind() {
let msg = match gossip_topic.kind() {
GossipKind::BeaconAggregateAndProof => {
let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::AggregateAndProofAttestation(Box::new(
agg_and_proof,
)))
PubsubMessage::AggregateAndProofAttestation(Box::new(agg_and_proof))
}
GossipKind::Attestation(subnet_id) => {
let attestation =
Attestation::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::Attestation(Box::new((
*subnet_id,
attestation,
))))
PubsubMessage::Attestation(Box::new((*subnet_id, attestation)))
}
GossipKind::BeaconBlock => {
let beacon_block =
match fork_context.from_context_bytes(gossip_topic.fork_digest) {
Some(ForkName::Base) => SignedBeaconBlock::<T>::Base(
SignedBeaconBlockBase::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
Some(ForkName::Altair) => SignedBeaconBlock::<T>::Altair(
SignedBeaconBlockAltair::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
None => {
return Err(format!(
"Unknown gossipsub fork digest: {:?}",
gossip_topic.fork_digest
))
}
};
Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block)))
let beacon_block = match fork_name {
ForkName::Base => SignedBeaconBlock::<T>::Base(
SignedBeaconBlockBase::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
ForkName::Altair => SignedBeaconBlock::<T>::Altair(
SignedBeaconBlockAltair::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
),
};
PubsubMessage::BeaconBlock(Box::new(beacon_block))
}
GossipKind::VoluntaryExit => {
let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit)))
PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))
}
GossipKind::ProposerSlashing => {
let proposer_slashing = ProposerSlashing::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing)))
PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))
}
GossipKind::AttesterSlashing => {
let attester_slashing = AttesterSlashing::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing)))
PubsubMessage::AttesterSlashing(Box::new(attester_slashing))
}
GossipKind::SignedContributionAndProof => {
let sync_aggregate = SignedContributionAndProof::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::SignedContributionAndProof(Box::new(
sync_aggregate,
)))
PubsubMessage::SignedContributionAndProof(Box::new(sync_aggregate))
}
GossipKind::SyncCommitteeMessage(subnet_id) => {
let sync_committee = SyncCommitteeMessage::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::SyncCommitteeMessage(Box::new((
*subnet_id,
sync_committee,
))))
PubsubMessage::SyncCommitteeMessage(Box::new((*subnet_id, sync_committee)))
}
}
};
Ok((msg, *fork_name))
}
}
}
Expand Down
30 changes: 29 additions & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
&beacon_chain.spec,
));

debug!(network_log, "Current fork"; "fork_name" => ?fork_context.current_fork());

// launch libp2p service
let (network_globals, mut libp2p) = LibP2PService::new(
executor.clone(),
Expand Down Expand Up @@ -600,10 +602,36 @@ fn spawn_service<T: BeaconChainTypes>(
id,
source,
message,
..
fork_name,
topic,
} => {
// Update prometheus metrics.
metrics::expose_receive_metrics(&message);

if fork_name != service.fork_context.current_fork() {
if let Some((_, next_fork_duration)) = service.beacon_chain.duration_to_next_fork() {
// This implies that the peer is sending messages on post fork topics
// before the fork. We ignore the message and score down the peer.
if next_fork_duration > beacon_chain::MAXIMUM_GOSSIP_CLOCK_DISPARITY {
debug!(
service.log,
"Peer sent gossip on incorrect fork version topic";
"peer" => %source,
"fork_topic" => %topic,
);
service.libp2p.report_peer(&source, PeerAction::LowToleranceError, ReportSource::Gossipsub);
service
.libp2p
.swarm
.behaviour_mut()
.report_message_validation_result(
&source, id, MessageAcceptance::Ignore,
);
continue;
}
}

}
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
Expand Down

0 comments on commit c8b33b9

Please sign in to comment.