From d46ac6c3d3b89fe1773c53776947d646c53ebdb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 9 Jul 2024 06:37:19 +0100 Subject: [PATCH] Implement gossipsub IDONTWANT (#5422) * move gossipsub into a separate crate * Merge branch 'unstable' of github.com:sigp/lighthouse into separate-gossipsub * update rpc.proto and generate rust bindings * gossipsub: implement IDONTWANT messages * address review * move GossipPromises out of PeerScore * impl PeerKind::is_gossipsub that returns true if peer speaks any version of gossipsub * address review 2 * Merge branch 'separate-gossipsub' of github.com:sigp/lighthouse into impl-gossipsub-idontwant * Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant * add metrics * add tests * make 1.2 beta before spec is merged * Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant * cargo clippy * Collect decoded IDONTWANT messages * Use the beta tag in most places to simplify the transition * Fix failed test by using fresh message-ids * Gossipsub v1.2-beta * Merge latest unstable * Cargo update * Merge pull request #5 from ackintosh/impl-gossipsub-idontwant-ackintosh-fix-test Fix `test_ignore_too_many_messages_in_ihave` test * Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant * update CHANGELOG.md * remove beta for 1.2 IDONTWANT spec has been merged * Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant * Merge branch 'impl-gossipsub-idontwant' of github.com:jxs/lighthouse into impl-gossipsub-idontwant * Merge branch 'unstable' of github.com:sigp/lighthouse into impl-gossipsub-idontwant * improve comments wording * Merge branch 'impl-gossipsub-idontwant' of github.com:jxs/lighthouse into impl-gossipsub-idontwant --- Cargo.lock | 16 +- Cargo.toml | 1 + .../lighthouse_network/gossipsub/CHANGELOG.md | 3 + .../lighthouse_network/gossipsub/Cargo.toml | 1 + .../gossipsub/src/behaviour.rs | 169 
+++++++++++--- .../gossipsub/src/behaviour/tests.rs | 215 +++++++++++++++++- .../gossipsub/src/generated/gossipsub/pb.rs | 36 +++ .../gossipsub/src/generated/rpc.proto | 5 + .../gossipsub/src/gossip_promises.rs | 8 + .../gossipsub/src/metrics.rs | 35 +++ .../gossipsub/src/protocol.rs | 29 ++- .../lighthouse_network/gossipsub/src/types.rs | 62 +++++ .../lighthouse_network/src/service/utils.rs | 1 + 13 files changed, 533 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de325243c50..317b30d960d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2069,7 +2069,7 @@ dependencies = [ "enr", "fnv", "futures", - "hashlink", + "hashlink 0.8.4", "hex", "hkdf", "lazy_static", @@ -3357,6 +3357,7 @@ dependencies = [ "futures-ticker", "futures-timer", "getrandom", + "hashlink 0.9.0", "hex_fmt", "libp2p", "prometheus-client", @@ -3472,6 +3473,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hashlink" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692eaaf7f7607518dd3cef090f1474b61edc5301d8012f09579920df68b725ee" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "headers" version = "0.3.9" @@ -6956,7 +6966,7 @@ dependencies = [ "bitflags 1.3.2", "fallible-iterator", "fallible-streaming-iterator", - "hashlink", + "hashlink 0.8.4", "libsqlite3-sys", "smallvec", ] @@ -9773,7 +9783,7 @@ checksum = "498f4d102a79ea1c9d4dd27573c0fc96ad74c023e8da38484e47883076da25fb" dependencies = [ "arraydeque", "encoding_rs", - "hashlink", + "hashlink 0.8.4", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index eedc47470e2..b3532dda35e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ fnv = "1" fs2 = "0.4" futures = "0.3" hex = "0.4" +hashlink = "0.9.0" hyper = "1" itertools = "0.10" lazy_static = "1" diff --git a/beacon_node/lighthouse_network/gossipsub/CHANGELOG.md b/beacon_node/lighthouse_network/gossipsub/CHANGELOG.md index 448e224cb6b..7ec10af741e 100644 --- 
a/beacon_node/lighthouse_network/gossipsub/CHANGELOG.md +++ b/beacon_node/lighthouse_network/gossipsub/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.5 Sigma Prime fork +- Implement IDONTWANT messages as per [spec](https://github.com/libp2p/specs/pull/548). + See [PR 5422](https://github.com/sigp/lighthouse/pull/5422) + - Attempt to publish to at least mesh_n peers when publishing a message when flood publish is disabled. See [PR 5357](https://github.com/sigp/lighthouse/pull/5357). - Drop `Publish` and `Forward` gossipsub stale messages when polling ConnectionHandler. diff --git a/beacon_node/lighthouse_network/gossipsub/Cargo.toml b/beacon_node/lighthouse_network/gossipsub/Cargo.toml index d8fa445e63f..56c42d29920 100644 --- a/beacon_node/lighthouse_network/gossipsub/Cargo.toml +++ b/beacon_node/lighthouse_network/gossipsub/Cargo.toml @@ -24,6 +24,7 @@ futures = "0.3.30" futures-ticker = "0.0.3" futures-timer = "3.0.2" getrandom = "0.2.12" +hashlink.workspace = true hex_fmt = "0.3.0" libp2p = { version = "0.53", default-features = false } quick-protobuf = "0.8" diff --git a/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs b/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs index ccebb4e267f..0a3b7a9f529 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs @@ -31,6 +31,7 @@ use std::{ use futures::StreamExt; use futures_ticker::Ticker; +use hashlink::LinkedHashMap; use prometheus_client::registry::Registry; use rand::{seq::SliceRandom, thread_rng}; @@ -45,6 +46,8 @@ use libp2p::swarm::{ }; use web_time::{Instant, SystemTime}; +use crate::types::IDontWant; + use super::gossip_promises::GossipPromises; use super::handler::{Handler, HandlerEvent, HandlerIn}; use super::mcache::MessageCache; @@ -73,6 +76,12 @@ use std::{cmp::Ordering::Equal, fmt::Debug}; #[cfg(test)] mod tests; +/// IDONTWANT cache capacity. 
+const IDONTWANT_CAP: usize = 10_000; + +/// IDONTWANT timeout before removal. +const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0); + /// Determines if published messages should be signed or not. /// /// Without signing, a number of privacy preserving modes can be selected. @@ -304,9 +313,8 @@ pub struct Behaviour { /// discovery and not by PX). outbound_peers: HashSet, - /// Stores optional peer score data together with thresholds, decay interval and gossip - /// promises. - peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>, + /// Stores optional peer score data together with thresholds and decay interval. + peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker)>, /// Counts the number of `IHAVE` received from each peer since the last heartbeat. count_received_ihave: HashMap, @@ -331,6 +339,9 @@ pub struct Behaviour { /// Tracks the numbers of failed messages per peer-id. failed_messages: HashMap, + + /// Tracks recently sent `IWANT` messages and checks if peers respond to them. 
+ gossip_promises: GossipPromises, } impl Behaviour @@ -467,6 +478,7 @@ where subscription_filter, data_transform, failed_messages: Default::default(), + gossip_promises: Default::default(), }) } } @@ -919,7 +931,7 @@ where let interval = Ticker::new(params.decay_interval); let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback); - self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default())); + self.peer_score = Some((peer_score, threshold, interval)); Ok(()) } @@ -1187,7 +1199,7 @@ where } fn score_below_threshold_from_scores( - peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker, GossipPromises)>, + peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker)>, peer_id: &PeerId, threshold: impl Fn(&PeerScoreThresholds) -> f64, ) -> (bool, f64) { @@ -1248,10 +1260,7 @@ where return false; } - self.peer_score - .as_ref() - .map(|(_, _, _, promises)| !promises.contains(id)) - .unwrap_or(true) + !self.gossip_promises.contains(id) }; for (topic, ids) in ihave_msgs { @@ -1298,13 +1307,11 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { - gossip_promises.add_promise( - *peer_id, - &iwant_ids_vec, - Instant::now() + self.config.iwant_followup_time(), - ); - } + self.gossip_promises.add_promise( + *peer_id, + &iwant_ids_vec, + Instant::now() + self.config.iwant_followup_time(), + ); if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { tracing::trace!( @@ -1369,6 +1376,11 @@ where "IWANT: Peer has asked for message too many times; ignoring request" ); } else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { + if peer.dont_send.get(&id).is_some() { + tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message"); + continue; + } + tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); if peer .sender @@ -1706,14 +1718,15 @@ where peer=%propagation_source, "Rejecting 
message from blacklisted peer" ); - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + self.gossip_promises + .reject_message(msg_id, &RejectReason::BlackListedPeer); + if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.reject_message( propagation_source, msg_id, &raw_message.topic, RejectReason::BlackListedPeer, ); - gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer); } return false; } @@ -1795,6 +1808,9 @@ where // Calculate the message id on the transformed data. let msg_id = self.config.message_id(&message); + // Broadcast IDONTWANT messages. + self.send_idontwant(&raw_message, &msg_id, propagation_source); + // Check the validity of the message // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. @@ -1820,11 +1836,12 @@ where metrics.msg_recvd(&message.topic); } - // Tells score that message arrived (but is maybe not fully validated yet). // Consider the message as delivered for gossip promises. - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + self.gossip_promises.message_delivered(&msg_id); + + // Tells score that message arrived (but is maybe not fully validated yet). 
+ if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.validate_message(propagation_source, &msg_id, &message.topic); - gossip_promises.message_delivered(&msg_id); } // Add the message to our memcache @@ -1871,7 +1888,7 @@ where raw_message: &RawMessage, reject_reason: RejectReason, ) { - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { + if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(metrics) = self.metrics.as_mut() { metrics.register_invalid_message(&raw_message.topic); } @@ -1886,7 +1903,8 @@ where reject_reason, ); - gossip_promises.reject_message(&message_id, &reject_reason); + self.gossip_promises + .reject_message(&message_id, &reject_reason); } else { // The message is invalid, we reject it ignoring any gossip promises. If a peer is // advertising this message via an IHAVE and it's invalid it will be double @@ -1959,7 +1977,7 @@ where } // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) - && matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub) + && peer.kind.is_gossipsub() && !Self::score_below_threshold_from_scores( &self.peer_score, propagation_source, @@ -2066,8 +2084,8 @@ where /// Applies penalties to peers that did not respond to our IWANT requests. 
fn apply_iwant_penalties(&mut self) { - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - for (peer, count) in gossip_promises.get_broken_promises() { + if let Some((peer_score, ..)) = &mut self.peer_score { + for (peer, count) in self.gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); if let Some(metrics) = self.metrics.as_mut() { metrics.register_score_penalty(Penalty::BrokenPromise); @@ -2288,7 +2306,7 @@ where && peers.len() > 1 && self.peer_score.is_some() { - if let Some((_, thresholds, _, _)) = &self.peer_score { + if let Some((_, thresholds, _)) = &self.peer_score { // Opportunistic grafting works as follows: we check the median score of peers // in the mesh; if this score is below the opportunisticGraftThreshold, we // select a few peers at random with score over the median. @@ -2381,7 +2399,7 @@ where for (topic_hash, peers) in self.fanout.iter_mut() { let mut to_remove_peers = Vec::new(); let publish_threshold = match &self.peer_score { - Some((_, thresholds, _, _)) => thresholds.publish_threshold, + Some((_, thresholds, _)) => thresholds.publish_threshold, _ => 0.0, }; for peer_id in peers.iter() { @@ -2474,6 +2492,17 @@ where } self.failed_messages.shrink_to_fit(); + // Flush stale IDONTWANTs. + for peer in self.connected_peers.values_mut() { + while let Some((_front, instant)) = peer.dont_send.front() { + if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() { + break; + } else { + peer.dont_send.pop_front(); + } + } + } + tracing::debug!("Completed Heartbeat"); if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); @@ -2655,6 +2684,59 @@ where } } + /// Helper function which sends an IDONTWANT message to mesh\[topic\] peers. 
+ fn send_idontwant( + &mut self, + message: &RawMessage, + msg_id: &MessageId, + propagation_source: &PeerId, + ) { + let Some(mesh_peers) = self.mesh.get(&message.topic) else { + return; + }; + + let iwant_peers = self.gossip_promises.peers_for_message(msg_id); + + let recipient_peers = mesh_peers + .iter() + .chain(iwant_peers.iter()) + .filter(|peer_id| { + *peer_id != propagation_source && Some(*peer_id) != message.source.as_ref() + }); + + for peer_id in recipient_peers { + let Some(peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!(peer = %peer_id, + "Could not IDONTWANT, peer doesn't exist in connected peer list"); + continue; + }; + + // Only gossipsub 1.2 peers support IDONTWANT. + if peer.kind != PeerKind::Gossipsubv1_2_beta { + continue; + } + + if peer + .sender + .idontwant(IDontWant { + message_ids: vec![msg_id.clone()], + }) + .is_err() + { + tracing::warn!(peer=%peer_id, "Send Queue full. Could not send IDONTWANT"); + + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.failed_message_slow_peer(peer_id); + } + // Increment failed message count + self.failed_messages + .entry(*peer_id) + .or_default() + .non_priority += 1; + } + } + } + /// Helper function which forwards a message to mesh\[topic\] peers. /// /// Returns true if at least one peer was messaged. 
@@ -2708,6 +2790,11 @@ where if !recipient_peers.is_empty() { for peer_id in recipient_peers.iter() { if let Some(peer) = self.connected_peers.get_mut(peer_id) { + if peer.dont_send.get(msg_id).is_some() { + tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message"); + continue; + } + tracing::debug!(%peer_id, message=%msg_id, "Sending message to peer"); if peer .sender @@ -3057,6 +3144,7 @@ where connections: vec![], sender: RpcSender::new(self.config.connection_handler_queue_len()), topics: Default::default(), + dont_send: LinkedHashMap::new(), }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3087,6 +3175,7 @@ where connections: vec![], sender: RpcSender::new(self.config.connection_handler_queue_len()), topics: Default::default(), + dont_send: LinkedHashMap::new(), }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3136,7 +3225,7 @@ where } HandlerEvent::MessageDropped(rpc) => { // Account for this in the scoring logic - if let Some((peer_score, _, _, _)) = &mut self.peer_score { + if let Some((peer_score, _, _)) = &mut self.peer_score { peer_score.failed_message_slow_peer(&propagation_source); } @@ -3245,6 +3334,24 @@ where peers, backoff, }) => prune_msgs.push((topic_hash, peers, backoff)), + ControlAction::IDontWant(IDontWant { message_ids }) => { + let Some(peer) = self.connected_peers.get_mut(&propagation_source) + else { + tracing::error!(peer = %propagation_source, + "Could not handle IDONTWANT, peer doesn't exist in connected peer list"); + continue; + }; + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_idontwant(message_ids.len()); + } + for message_id in message_ids { + peer.dont_send.insert(message_id, Instant::now()); + // Don't exceed capacity. 
+ if peer.dont_send.len() > IDONTWANT_CAP { + peer.dont_send.pop_front(); + } + } + } } } if !ihave_msgs.is_empty() { @@ -3270,7 +3377,7 @@ where } // update scores - if let Some((peer_score, _, interval, _)) = &mut self.peer_score { + if let Some((peer_score, _, interval)) = &mut self.peer_score { while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) { peer_score.refresh_scores(); } @@ -3395,7 +3502,7 @@ fn get_random_peers_dynamic( .iter() .filter(|(_, p)| p.topics.contains(topic_hash)) .filter(|(peer_id, _)| f(peer_id)) - .filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1) + .filter(|(_, p)| p.kind.is_gossipsub()) .map(|(peer_id, _)| *peer_id) .collect::>(); diff --git a/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs b/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs index 2af0199ec93..a378198be33 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs @@ -31,13 +31,7 @@ use std::net::Ipv4Addr; use std::thread::sleep; #[derive(Default, Debug)] -struct InjectNodes -// TODO: remove trait bound Default when this issue is fixed: -// https://github.com/colin-kiegel/rust-derive-builder/issues/93 -where - D: DataTransform + Default + Clone + Send + 'static, - F: TopicSubscriptionFilter + Clone + Default + Send + 'static, -{ +struct InjectNodes { peer_no: usize, topics: Vec, to_subscribe: bool, @@ -47,6 +41,7 @@ where scoring: Option<(PeerScoreParams, PeerScoreThresholds)>, data_transform: D, subscription_filter: F, + peer_kind: Option, } impl InjectNodes @@ -94,7 +89,7 @@ where let empty = vec![]; for i in 0..self.peer_no { - let (peer, receiver) = add_peer( + let (peer, receiver) = add_peer_with_addr_and_kind( &mut gs, if self.to_subscribe { &topic_hashes @@ -103,6 +98,8 @@ where }, i < self.outbound, i < self.explicit, + Multiaddr::empty(), + self.peer_kind.clone().or(Some(PeerKind::Gossipsubv1_1)), ); 
peers.push(peer); receivers.insert(peer, receiver); @@ -151,6 +148,11 @@ where self.subscription_filter = subscription_filter; self } + + fn peer_kind(mut self, peer_kind: PeerKind) -> Self { + self.peer_kind = Some(peer_kind); + self + } } fn inject_nodes() -> InjectNodes @@ -235,6 +237,7 @@ where kind: kind.clone().unwrap_or(PeerKind::Floodsub), connections: vec![connection_id], topics: Default::default(), + dont_send: LinkedHashMap::new(), sender, }, ); @@ -620,6 +623,7 @@ fn test_join() { kind: PeerKind::Floodsub, connections: vec![connection_id], topics: Default::default(), + dont_send: LinkedHashMap::new(), sender, }, ); @@ -1015,6 +1019,7 @@ fn test_get_random_peers() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: RpcSender::new(gs.config.connection_handler_queue_len()), + dont_send: LinkedHashMap::new(), }, ); } @@ -4580,9 +4585,9 @@ fn test_ignore_too_many_messages_in_ihave() { let (peer, receiver) = add_peer(&mut gs, &topics, false, false); receivers.insert(peer, receiver); - //peer has 20 messages + //peer has 30 messages let mut seq = 0; - let message_ids: Vec<_> = (0..20) + let message_ids: Vec<_> = (0..30) .map(|_| random_message(&mut seq, &topics)) .map(|msg| gs.data_transform.inbound_transform(msg).unwrap()) .map(|msg| config.message_id(&msg)) @@ -4624,7 +4629,7 @@ fn test_ignore_too_many_messages_in_ihave() { gs.heartbeat(); gs.handle_ihave( &peer, - vec![(topics[0].clone(), message_ids[10..20].to_vec())], + vec![(topics[0].clone(), message_ids[20..30].to_vec())], ); //we sent 10 iwant messages ids via a IWANT rpc. @@ -5236,3 +5241,191 @@ fn test_graft_without_subscribe() { // We unsubscribe from the topic. let _ = gs.unsubscribe(&Topic::new(topic)); } + +/// Test that a node sends IDONTWANT messages to the mesh peers +/// that run Gossipsub v1.2. 
+#[test] +fn sends_idontwant() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(5) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2_beta) + .create_network(); + + let local_id = PeerId::random(); + + let message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &local_id); + assert_eq!( + receivers + .into_iter() + .fold(0, |mut idontwants, (peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() { + assert_ne!(peer_id, peers[1]); + idontwants += 1; + } + } + idontwants + }), + 3, + "IDONTWANT was not sent" + ); +} + +/// Test that a node doesn't send IDONTWANT messages to the mesh peers +/// that don't run Gossipsub v1.2. 
+#[test] +fn doesnt_send_idontwant() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(5) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_1) + .create_network(); + + let local_id = PeerId::random(); + + let message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &local_id); + assert_eq!( + receivers + .into_iter() + .fold(0, |mut idontwants, (peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if matches!(non_priority.try_recv(), Ok(RpcOut::IDontWant(_)) if peer_id != peers[1]) { + idontwants += 1; + } + } + idontwants + }), + 0, + "IDONTWANT were sent" + ); +} + +/// Test that a node doesn't forward a message to the mesh peers +/// that sent IDONTWANT. 
+#[test] +fn doesnt_forward_idontwant() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(4) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2_beta) + .create_network(); + + let local_id = PeerId::random(); + + let raw_message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + let message = gs + .data_transform + .inbound_transform(raw_message.clone()) + .unwrap(); + let message_id = gs.config.message_id(&message); + let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); + peer.dont_send.insert(message_id, Instant::now()); + + gs.handle_received_message(raw_message.clone(), &local_id); + assert_eq!( + receivers.into_iter().fold(0, |mut fwds, (peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::Forward { .. }) = non_priority.try_recv() { + assert_ne!(peer_id, peers[2]); + fwds += 1; + } + } + fwds + }), + 2, + "Message should be forwarded only to peers that did not send IDONTWANT" + ); +} + +/// Test that a node parses an +/// IDONTWANT message to the respective peer. 
+#[test] +fn parses_idontwant() { + let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1() + .peer_no(2) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2_beta) + .create_network(); + + let message_id = MessageId::new(&[0, 1, 2, 3]); + let rpc = Rpc { + messages: vec![], + subscriptions: vec![], + control_msgs: vec![ControlAction::IDontWant(IDontWant { + message_ids: vec![message_id.clone()], + })], + }; + gs.on_connection_handler_event( + peers[1], + ConnectionId::new_unchecked(0), + HandlerEvent::Message { + rpc, + invalid_messages: vec![], + }, + ); + let peer = gs.connected_peers.get_mut(&peers[1]).unwrap(); + assert!(peer.dont_send.get(&message_id).is_some()); +} + +/// Test that a node clears stale IDONTWANT messages. +#[test] +fn clear_stale_idontwant() { + let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1() + .peer_no(4) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2_beta) + .create_network(); + + let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); + peer.dont_send + .insert(MessageId::new(&[1, 2, 3, 4]), Instant::now()); + std::thread::sleep(Duration::from_secs(3)); + gs.heartbeat(); + let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); + assert!(peer.dont_send.is_empty()); +} diff --git a/beacon_node/lighthouse_network/gossipsub/src/generated/gossipsub/pb.rs b/beacon_node/lighthouse_network/gossipsub/src/generated/gossipsub/pb.rs index 9a074fd61fc..24ac80d2755 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/generated/gossipsub/pb.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/generated/gossipsub/pb.rs @@ -154,6 +154,7 @@ pub struct ControlMessage { pub iwant: Vec, pub graft: Vec, pub prune: Vec, + pub idontwant: Vec, } impl<'a> MessageRead<'a> for ControlMessage { @@ -165,6 +166,7 @@ impl<'a> MessageRead<'a> 
for ControlMessage { Ok(18) => msg.iwant.push(r.read_message::(bytes)?), Ok(26) => msg.graft.push(r.read_message::(bytes)?), Ok(34) => msg.prune.push(r.read_message::(bytes)?), + Ok(42) => msg.idontwant.push(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -180,6 +182,7 @@ impl MessageWrite for ControlMessage { + self.iwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() } fn write_message(&self, w: &mut Writer) -> Result<()> { @@ -187,6 +190,7 @@ impl MessageWrite for ControlMessage { for s in &self.iwant { w.write_with_tag(18, |w| w.write_message(s))?; } for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; } for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; } + for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; } Ok(()) } } @@ -331,6 +335,38 @@ impl MessageWrite for ControlPrune { } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct ControlIDontWant { + pub message_ids: Vec>, +} + +impl<'a> MessageRead<'a> for ControlIDontWant { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.message_ids.push(r.read_bytes(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for ControlIDontWant { + fn get_size(&self) -> usize { + 0 + + self.message_ids.iter().map(|s| 1 + sizeof_len((s).len())).sum::() + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + for s in &self.message_ids { w.write_with_tag(10, |w| w.write_bytes(&**s))?; } + Ok(()) + } +} + #[allow(clippy::derive_partial_eq_without_eq)] 
#[derive(Debug, Default, PartialEq, Clone)] pub struct PeerInfo { diff --git a/beacon_node/lighthouse_network/gossipsub/src/generated/rpc.proto b/beacon_node/lighthouse_network/gossipsub/src/generated/rpc.proto index 2ce12f3f37f..e3b5888d2c0 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/generated/rpc.proto +++ b/beacon_node/lighthouse_network/gossipsub/src/generated/rpc.proto @@ -28,6 +28,7 @@ message ControlMessage { repeated ControlIWant iwant = 2; repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; + repeated ControlIDontWant idontwant = 5; } message ControlIHave { @@ -49,6 +50,10 @@ message ControlPrune { optional uint64 backoff = 3; // gossipsub v1.1 backoff time (in seconds) } +message ControlIDontWant { + repeated bytes message_ids = 1; +} + message PeerInfo { optional bytes peer_id = 1; optional bytes signed_peer_record = 2; diff --git a/beacon_node/lighthouse_network/gossipsub/src/gossip_promises.rs b/beacon_node/lighthouse_network/gossipsub/src/gossip_promises.rs index 2bfb20595a8..3f72709245f 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/gossip_promises.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/gossip_promises.rs @@ -41,6 +41,14 @@ impl GossipPromises { self.promises.contains_key(message) } + /// Get the peers we sent an IWANT to for the given message id. + pub(crate) fn peers_for_message(&self, message_id: &MessageId) -> Vec<PeerId> { + self.promises + .get(message_id) + .map(|peers| peers.keys().copied().collect()) + .unwrap_or_default() + } + /// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting. 
pub(crate) fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) { for message_id in messages { diff --git a/beacon_node/lighthouse_network/gossipsub/src/metrics.rs b/beacon_node/lighthouse_network/gossipsub/src/metrics.rs index 91bcd5f54bc..7e1cdac18ba 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/metrics.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/metrics.rs @@ -179,6 +179,12 @@ pub(crate) struct Metrics { /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, + /// The number of times we have received an IDONTWANT control message. + idontwant_msgs: Counter, + + /// The total number of message ids we have received across all IDONTWANT control messages. + idontwant_msgs_ids: Counter, + /// The size of the priority queue. priority_queue_size: Histogram, /// The size of the non-priority queue. @@ -311,6 +317,27 @@ impl Metrics { "topic_iwant_msgs", "Number of times we have decided an IWANT is required for this topic" ); + + let idontwant_msgs = { + let metric = Counter::default(); + registry.register( + "idontwant_msgs", + "The number of times we have received an IDONTWANT control message", + metric.clone(), + ); + metric + }; + + let idontwant_msgs_ids = { + let metric = Counter::default(); + registry.register( + "idontwant_msgs_ids", + "The number of msg_id's we have received in every IDONTWANT control message.", + metric.clone(), + ); + metric + }; + let memcache_misses = { let metric = Counter::default(); registry.register( @@ -362,6 +389,8 @@ impl Metrics { heartbeat_duration, memcache_misses, topic_iwant_msgs, + idontwant_msgs, + idontwant_msgs_ids, priority_queue_size, non_priority_queue_size, } @@ -560,6 +589,12 @@ impl Metrics { } } + /// Register receiving an IDONTWANT control message carrying `msgs` message ids. + pub(crate) fn register_idontwant(&mut self, msgs: usize) { + self.idontwant_msgs.inc(); + self.idontwant_msgs_ids.inc_by(msgs as u64); + } + /// Observes a heartbeat duration. 
pub(crate) fn observe_heartbeat_duration(&mut self, millis: u64) { self.heartbeat_duration.observe(millis as f64); diff --git a/beacon_node/lighthouse_network/gossipsub/src/protocol.rs b/beacon_node/lighthouse_network/gossipsub/src/protocol.rs index ba84ae0aa7a..5611ae32c91 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/protocol.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/protocol.rs @@ -23,8 +23,8 @@ use super::handler::HandlerEvent; use super::rpc_proto::proto; use super::topic::TopicHash; use super::types::{ - ControlAction, Graft, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc, - Subscription, SubscriptionAction, + ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, + RawMessage, Rpc, Subscription, SubscriptionAction, }; use super::ValidationError; use asynchronous_codec::{Decoder, Encoder, Framed}; @@ -40,6 +40,10 @@ use void::Void; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; +pub(crate) const GOSSIPSUB_1_2_0_BETA_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.2.0"), + kind: PeerKind::Gossipsubv1_2_beta, +}; pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.1.0"), kind: PeerKind::Gossipsubv1_1, @@ -69,7 +73,11 @@ impl Default for ProtocolConfig { Self { max_transmit_size: 65536, validation_mode: ValidationMode::Strict, - protocol_ids: vec![GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL], + protocol_ids: vec![ + GOSSIPSUB_1_2_0_BETA_PROTOCOL, + GOSSIPSUB_1_1_0_PROTOCOL, + GOSSIPSUB_1_0_0_PROTOCOL, + ], } } } @@ -476,10 +484,25 @@ impl Decoder for GossipsubCodec { })); } + let idontwant_msgs: Vec = rpc_control + .idontwant + .into_iter() + .map(|idontwant| { + ControlAction::IDontWant(IDontWant { + message_ids: idontwant + .message_ids + .into_iter() + .map(MessageId::from) + .collect::>(), + }) + }) + .collect(); + control_msgs.extend(ihave_msgs); 
control_msgs.extend(iwant_msgs); control_msgs.extend(graft_msgs); control_msgs.extend(prune_msgs); + control_msgs.extend(idontwant_msgs); } Ok(Some(HandlerEvent::Message { diff --git a/beacon_node/lighthouse_network/gossipsub/src/types.rs b/beacon_node/lighthouse_network/gossipsub/src/types.rs index 84bdfb786f9..8df307d470b 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/types.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/types.rs @@ -25,6 +25,7 @@ use async_channel::{Receiver, Sender}; use futures::stream::Peekable; use futures::{Future, Stream, StreamExt}; use futures_timer::Delay; +use hashlink::LinkedHashMap; use libp2p::identity::PeerId; use libp2p::swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; @@ -34,6 +35,7 @@ use std::fmt::Debug; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Instant; use std::{fmt, pin::Pin}; use web_time::Duration; @@ -121,11 +123,16 @@ pub(crate) struct PeerConnections { pub(crate) sender: RpcSender, /// Subscribed topics. pub(crate) topics: BTreeSet, + /// Don't send messages. + pub(crate) dont_send: LinkedHashMap, } /// Describes the types of peers that can exist in the gossipsub context. #[derive(Debug, Clone, PartialEq, Hash, EncodeLabelValue, Eq)] +#[allow(non_camel_case_types)] pub enum PeerKind { + /// A gossipsub 1.2 peer. + Gossipsubv1_2_beta, /// A gossipsub 1.1 peer. Gossipsubv1_1, /// A gossipsub 1.0 peer. @@ -136,6 +143,16 @@ pub enum PeerKind { NotSupported, } +impl PeerKind { + /// Returns true if peer speaks any gossipsub version. + pub(crate) fn is_gossipsub(&self) -> bool { + matches!( + self, + Self::Gossipsubv1_2_beta | Self::Gossipsubv1_1 | Self::Gossipsub + ) + } +} + /// A message received by the gossipsub system and stored locally in caches.. 
#[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct RawMessage { @@ -257,6 +274,8 @@ pub enum ControlAction { Graft(Graft), /// The node has been removed from the mesh - Prune control message. Prune(Prune), + /// The node requests us to not forward message ids (peer_id + sequence_number) - IDontWant control message. + IDontWant(IDontWant), } /// Node broadcasts known messages per topic - IHave control message. @@ -293,6 +312,13 @@ pub struct Prune { pub(crate) backoff: Option, } +/// The node requests us to not forward message ids - IDontWant control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IDontWant { + /// A list of known message ids. + pub(crate) message_ids: Vec, +} + /// A Gossipsub RPC message sent. #[derive(Debug)] pub enum RpcOut { @@ -314,6 +340,8 @@ pub enum RpcOut { IHave(IHave), /// Send a IWant control message. IWant(IWant), + /// Send a IDontWant control message. + IDontWant(IDontWant), } impl RpcOut { @@ -374,6 +402,7 @@ impl From for proto::RPC { iwant: vec![], graft: vec![], prune: vec![], + idontwant: vec![], }), }, RpcOut::IWant(IWant { message_ids }) => proto::RPC { @@ -386,6 +415,7 @@ impl From for proto::RPC { }], graft: vec![], prune: vec![], + idontwant: vec![], }), }, RpcOut::Graft(Graft { topic_hash }) => proto::RPC { @@ -398,6 +428,7 @@ impl From for proto::RPC { topic_id: Some(topic_hash.into_string()), }], prune: vec![], + idontwant: vec![], }), }, RpcOut::Prune(Prune { @@ -424,9 +455,23 @@ impl From for proto::RPC { .collect(), backoff, }], + idontwant: vec![], }), } } + RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![proto::ControlIDontWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }], + }), + }, } } } @@ -485,6 +530,7 @@ impl From for proto::RPC { iwant: Vec::new(), graft:
Vec::new(), prune: Vec::new(), + idontwant: Vec::new(), }; let empty_control_msg = rpc.control_msgs.is_empty(); @@ -533,6 +579,12 @@ impl From for proto::RPC { }; control.prune.push(rpc_prune); } + ControlAction::IDontWant(IDontWant { message_ids }) => { + let rpc_idontwant = proto::ControlIDontWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }; + control.idontwant.push(rpc_idontwant); + } } } @@ -571,6 +623,7 @@ impl PeerKind { Self::Floodsub => "Floodsub", Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", + Self::Gossipsubv1_2_beta => "Gossipsub v1.2-beta", } } } @@ -657,6 +710,15 @@ impl RpcSender { .map_err(|err| err.into_inner()) } + /// Send a `RpcOut::IDontWant` message to the `RpcReceiver` + /// this is low priority, if the queue is full an Err is returned. + #[allow(clippy::result_large_err)] + pub(crate) fn idontwant(&mut self, idontwant: IDontWant) -> Result<(), RpcOut> { + self.non_priority_sender + .try_send(RpcOut::IDontWant(idontwant)) + .map_err(|err| err.into_inner()) + } + /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` /// this is high priority. pub(crate) fn subscribe(&mut self, topic: TopicHash) { diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index c6dbee1d2ed..80187efc103 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -5,6 +5,7 @@ use crate::types::{ }; use crate::{GossipTopic, NetworkConfig}; use futures::future::Either; +use gossipsub; use libp2p::core::{multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed}; use libp2p::identity::{secp256k1, Keypair}; use libp2p::{core, noise, yamux, PeerId, Transport};