From e1b677827240cce5be575f56c07e806aef24297c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 12:26:14 +0300 Subject: [PATCH 01/14] kad/find_node: Extract immutable components to FindNodeConfig Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 86 ++++++++++--------- src/protocol/libp2p/kademlia/query/mod.rs | 38 ++++---- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 04a058a3..969e6bf0 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -32,17 +32,30 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_node"; -/// Context for `FIND_NODE` queries. +/// The configuration needed to instantiate a new [`FindNodeContext`]. #[derive(Debug)] -pub struct FindNodeContext>> { +pub struct FindNodeConfig>> { /// Local peer ID. - local_peer_id: PeerId, + pub local_peer_id: PeerId, + + /// Replication factor. + pub replication_factor: usize, + + /// Parallelism factor. + pub parallelism_factor: usize, /// Query ID. pub query: QueryId, /// Target key. pub target: Key, +} + +/// Context for `FIND_NODE` queries. +#[derive(Debug)] +pub struct FindNodeContext>> { + /// Query immutable config. + pub config: FindNodeConfig, /// Peers from whom the `QueryEngine` is waiting to hear a response. pub pending: HashMap, @@ -58,41 +71,25 @@ pub struct FindNodeContext>> { /// Responses. pub responses: BTreeMap, - - /// Replication factor. - pub replication_factor: usize, - - /// Parallelism factor. - pub parallelism_factor: usize, } impl>> FindNodeContext { /// Create new [`FindNodeContext`]. - pub fn new( - local_peer_id: PeerId, - query: QueryId, - target: Key, - in_peers: VecDeque, - replication_factor: usize, - parallelism_factor: usize, - ) -> Self { + pub fn new(config: FindNodeConfig, in_peers: VecDeque) -> Self { let mut candidates = BTreeMap::new(); for candidate in &in_peers { - let distance = target.distance(&candidate.key); + let distance = config.target.distance(&candidate.key); candidates.insert(distance, candidate.clone()); } Self { - query, - target, + config, + candidates, - local_peer_id, pending: HashMap::new(), queried: HashSet::new(), responses: BTreeMap::new(), - replication_factor, - parallelism_factor, } } @@ -117,14 +114,14 @@ impl>> FindNodeContext { // calculate distance for `peer` from target and insert it if // a) the map doesn't have 20 responses // b) it can replace some other peer that has a higher distance - let distance = self.target.distance(&peer.key); + let distance = self.config.target.distance(&peer.key); // always mark the peer as queried to prevent it getting queried again self.queried.insert(peer.peer); // TODO: could this be written in another way? 
// TODO: only insert nodes from whom a response was received - match self.responses.len() < self.replication_factor { + match self.responses.len() < self.config.replication_factor { true => { self.responses.insert(distance, peer); } @@ -141,11 +138,11 @@ impl>> FindNodeContext { if !self.queried.contains(&candidate.peer) && !self.pending.contains_key(&candidate.peer) { - if self.local_peer_id == candidate.peer { + if self.config.local_peer_id == candidate.peer { continue; } - let distance = self.target.distance(&candidate.key); + let distance = self.config.target.distance(&candidate.key); self.candidates.insert(distance, candidate); } } @@ -154,23 +151,23 @@ impl>> FindNodeContext { /// Get next action for `peer`. pub fn next_peer_action(&mut self, peer: &PeerId) -> Option { self.pending.contains_key(peer).then_some(QueryAction::SendMessage { - query: self.query, + query: self.config.query, peer: *peer, - message: KademliaMessage::find_node(self.target.clone().into_preimage()), + message: KademliaMessage::find_node(self.config.target.clone().into_preimage()), }) } /// Schedule next peer for outbound `FIND_NODE` query. pub fn schedule_next_peer(&mut self) -> QueryAction { - tracing::trace!(target: LOG_TARGET, query = ?self.query, "get next peer"); + tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer"); let (_, candidate) = self.candidates.pop_first().expect("entry to exist"); self.pending.insert(candidate.peer, candidate.clone()); QueryAction::SendMessage { - query: self.query, + query: self.config.query, peer: candidate.peer, - message: KademliaMessage::find_node(self.target.clone().into_preimage()), + message: KademliaMessage::find_node(self.config.target.clone().into_preimage()), } } @@ -179,14 +176,16 @@ impl>> FindNodeContext { pub fn next_action(&mut self) -> Option { // we didn't receive any responses and there are no candidates or pending queries left. 
if self.responses.is_empty() && self.pending.is_empty() && self.candidates.is_empty() { - return Some(QueryAction::QueryFailed { query: self.query }); + return Some(QueryAction::QueryFailed { + query: self.config.query, + }); } // there are still possible peers to query or peers who are being queried - if self.responses.len() < self.replication_factor + if self.responses.len() < self.config.replication_factor && (!self.pending.is_empty() || !self.candidates.is_empty()) { - if self.pending.len() == self.parallelism_factor || self.candidates.is_empty() { + if self.pending.len() == self.config.parallelism_factor || self.candidates.is_empty() { return None; } @@ -195,7 +194,9 @@ impl>> FindNodeContext { // query succeeded with one or more results if self.pending.is_empty() && self.candidates.is_empty() { - return Some(QueryAction::QuerySucceeded { query: self.query }); + return Some(QueryAction::QuerySucceeded { + query: self.config.query, + }); } // check if any candidate has lower distance thant the current worst @@ -203,22 +204,27 @@ impl>> FindNodeContext { // entries if !self.candidates.is_empty() { let first_candidate_distance = self + .config .target .distance(&self.candidates.first_key_value().expect("candidate to exist").1.key); let worst_response_candidate = *self.responses.last_entry().expect("response to exist").key(); if first_candidate_distance < worst_response_candidate - && self.pending.len() < self.parallelism_factor + && self.pending.len() < self.config.parallelism_factor { return Some(self.schedule_next_peer()); } - return Some(QueryAction::QuerySucceeded { query: self.query }); + return Some(QueryAction::QuerySucceeded { + query: self.config.query, + }); } - if self.responses.len() == self.replication_factor { - return Some(QueryAction::QuerySucceeded { query: self.query }); + if self.responses.len() == self.config.replication_factor { + return Some(QueryAction::QuerySucceeded { + query: self.config.query, + }); } tracing::error!( diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index f0287325..f29af805 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -22,7 +22,7 @@ use crate::{ protocol::libp2p::kademlia::{ message::KademliaMessage, query::{ - find_node::FindNodeContext, + find_node::{FindNodeConfig, FindNodeContext}, get_record::{GetRecordConfig, GetRecordContext}, }, record::{Key as RecordKey, Record}, @@ -190,17 +190,19 @@ impl QueryEngine { "start `FIND_NODE` query" ); + let target = Key::from(target); + let config = FindNodeConfig { + local_peer_id: self.local_peer_id, + replication_factor: self.replication_factor, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + self.queries.insert( query_id, QueryType::FindNode { - context: FindNodeContext::new( - self.local_peer_id, - query_id, - Key::from(target), - candidates, - self.replication_factor, - self.parallelism_factor, - ), + context: FindNodeContext::new(config, candidates), }, ); @@ -223,19 +225,19 @@ impl QueryEngine { ); let target = Key::new(record.key.clone()); + let config = FindNodeConfig { + local_peer_id: self.local_peer_id, + replication_factor: self.replication_factor, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; self.queries.insert( query_id, QueryType::PutRecord { record, - context: FindNodeContext::new( - self.local_peer_id, - query_id, - target, - candidates, - self.replication_factor, - self.parallelism_factor, - ), + context: 
FindNodeContext::new(config, candidates), }, ); @@ -386,7 +388,7 @@ impl QueryEngine { match self.queries.remove(&query).expect("query to exist") { QueryType::FindNode { context } => QueryAction::FindNodeQuerySucceeded { query, - target: context.target.into_preimage(), + target: context.config.target.into_preimage(), peers: context.responses.into_values().collect::>(), }, QueryType::PutRecord { record, context } => QueryAction::PutRecordToFoundNodes { From d850c52432adf10119a7e4617ec16773a6f006b4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 12:39:59 +0300 Subject: [PATCH 02/14] kad/find_node: Refactor register_response Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 969e6bf0..39afe74c 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -119,32 +119,39 @@ impl>> FindNodeContext { // always mark the peer as queried to prevent it getting queried again self.queried.insert(peer.peer); - // TODO: could this be written in another way? - // TODO: only insert nodes from whom a response was received - match self.responses.len() < self.config.replication_factor { - true => { - self.responses.insert(distance, peer); - } - false => { - let mut entry = self.responses.last_entry().expect("entry to exist"); + if self.responses.len() < self.config.replication_factor { + self.responses.insert(distance, peer); + } else { + // Update the last entry if the peer is closer. + if let Some(mut entry) = self.responses.last_entry() { if entry.key() > &distance { entry.insert(peer); } } } - // filter already queried peers and extend the set of candidates - for candidate in peers { - if !self.queried.contains(&candidate.peer) - && !self.pending.contains_key(&candidate.peer) - { - if self.config.local_peer_id == candidate.peer { - continue; - } + let to_query_candidate = peers.into_iter().filter_map(|peer| { + // Peer already produced a response. + if self.queried.contains(&peer.peer) { + return None; + } - let distance = self.config.target.distance(&candidate.key); - self.candidates.insert(distance, candidate); + // Peer was queried, awaiting response. + if self.pending.contains_key(&peer.peer) { + return None; } + + // Local node. + if self.config.local_peer_id == peer.peer { + return None; + } + + Some(peer) + }); + + for candidate in to_query_candidate { + let distance = self.config.target.distance(&candidate.key); + self.candidates.insert(distance, candidate); } } From 0cadf30b40e55edaea763877208b97728db8604b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 12:46:47 +0300 Subject: [PATCH 03/14] kad/find_node: Refactor schedule_next_peer Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_node.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 39afe74c..a6fa1a26 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -165,17 +165,18 @@ impl>> FindNodeContext { } /// Schedule next peer for outbound `FIND_NODE` query. 
- pub fn schedule_next_peer(&mut self) -> QueryAction { + fn schedule_next_peer(&mut self) -> Option { tracing::trace!(target: LOG_TARGET, query = ?self.config.query, "get next peer"); - let (_, candidate) = self.candidates.pop_first().expect("entry to exist"); + let (_, candidate) = self.candidates.pop_first()?; + self.pending.insert(candidate.peer, candidate.clone()); - QueryAction::SendMessage { + Some(QueryAction::SendMessage { query: self.config.query, peer: candidate.peer, message: KademliaMessage::find_node(self.config.target.clone().into_preimage()), - } + }) } /// Get next action for a `FIND_NODE` query. @@ -196,7 +197,7 @@ impl>> FindNodeContext { return None; } - return Some(self.schedule_next_peer()); + return self.schedule_next_peer(); } // query succeeded with one or more results @@ -220,7 +221,7 @@ impl>> FindNodeContext { if first_candidate_distance < worst_response_candidate && self.pending.len() < self.config.parallelism_factor { - return Some(self.schedule_next_peer()); + return self.schedule_next_peer(); } return Some(QueryAction::QuerySucceeded { From 9bc1577a940545967cd8e2dbf7c54485c1667da9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 18:29:41 +0300 Subject: [PATCH 04/14] kad/find_node: Refactor next_action Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 96 ++++++++----------- 1 file changed, 42 insertions(+), 54 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index a6fa1a26..24814e4c 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -122,7 +122,7 @@ impl>> FindNodeContext { if self.responses.len() < self.config.replication_factor { self.responses.insert(distance, peer); } else { - // Update the last entry if the peer is closer. + // Update the furthest peer if this response is closer. if let Some(mut entry) = self.responses.last_entry() { if entry.key() > &distance { entry.insert(peer); @@ -179,71 +179,59 @@ impl>> FindNodeContext { }) } + /// Check if the query cannot make any progress. + /// + /// Returns true when there are no pending responses and no candidates to query. + fn is_done(&self) -> bool { + self.pending.is_empty() && self.candidates.is_empty() + } + /// Get next action for a `FIND_NODE` query. - // TODO: refactor this function pub fn next_action(&mut self) -> Option { - // we didn't receive any responses and there are no candidates or pending queries left. - if self.responses.is_empty() && self.pending.is_empty() && self.candidates.is_empty() { - return Some(QueryAction::QueryFailed { - query: self.config.query, - }); + // If we cannot make progress, return the final result. + // A query failed when we are not able to identify one single peer. + if self.is_done() { + return if self.responses.is_empty() { + Some(QueryAction::QueryFailed { + query: self.config.query, + }) + } else { + Some(QueryAction::QuerySucceeded { + query: self.config.query, + }) + }; } - // there are still possible peers to query or peers who are being queried - if self.responses.len() < self.config.replication_factor - && (!self.pending.is_empty() || !self.candidates.is_empty()) - { - if self.pending.len() == self.config.parallelism_factor || self.candidates.is_empty() { - return None; - } - - return self.schedule_next_peer(); + // At this point, we either have pending responses or candidates to query; and we need more + // results. Ensure we do not exceed the parallelism factor. 
+ if self.pending.len() == self.config.parallelism_factor { + return None; } - // query succeeded with one or more results - if self.pending.is_empty() && self.candidates.is_empty() { - return Some(QueryAction::QuerySucceeded { - query: self.config.query, - }); + // Schedule the next peer to fill up the responses. + if self.responses.len() < self.config.replication_factor { + return self.schedule_next_peer(); } - // check if any candidate has lower distance thant the current worst - // `expect()` is ok because both `candidates` and `responses` have been confirmed to contain - // entries - if !self.candidates.is_empty() { - let first_candidate_distance = self - .config - .target - .distance(&self.candidates.first_key_value().expect("candidate to exist").1.key); - let worst_response_candidate = - *self.responses.last_entry().expect("response to exist").key(); - - if first_candidate_distance < worst_response_candidate - && self.pending.len() < self.config.parallelism_factor - { - return self.schedule_next_peer(); + // We can finish the query here, but check if there is a better candidate for the query. + match ( + self.candidates.first_key_value(), + self.responses.last_key_value(), + ) { + (Some((_, candidate_peer)), Some((worst_response_distance, _))) => { + let first_candidate_distance = self.config.target.distance(&candidate_peer.key); + if first_candidate_distance < *worst_response_distance { + return self.schedule_next_peer(); + } } - return Some(QueryAction::QuerySucceeded { - query: self.config.query, - }); - } - - if self.responses.len() == self.config.replication_factor { - return Some(QueryAction::QuerySucceeded { - query: self.config.query, - }); + _ => (), } - tracing::error!( - target: LOG_TARGET, - candidates_len = ?self.candidates.len(), - pending_len = ?self.pending.len(), - responses_len = ?self.responses.len(), - "unhandled state" - ); - - unreachable!(); + // We have found enough responses and there are no better candidates to query. + Some(QueryAction::QuerySucceeded { + query: self.config.query, + }) } } From 9ff2b6b0c4df7b75b36b0720b2d0868485b042e3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 18:48:52 +0300 Subject: [PATCH 05/14] kad/find_node: Cache kad message Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_node.rs | 12 ++++++++++-- src/protocol/libp2p/kademlia/query/get_record.rs | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 24814e4c..ac53faab 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use bytes::Bytes; + use crate::{ protocol::libp2p::kademlia::{ message::KademliaMessage, @@ -57,6 +59,9 @@ pub struct FindNodeContext>> { /// Query immutable config. pub config: FindNodeConfig, + /// Cached Kademlia message to send. + kad_message: Bytes, + /// Peers from whom the `QueryEngine` is waiting to hear a response. 
pub pending: HashMap, @@ -83,8 +88,11 @@ impl>> FindNodeContext { candidates.insert(distance, candidate.clone()); } + let kad_message = KademliaMessage::find_node(config.target.clone().into_preimage()); + Self { config, + kad_message, candidates, pending: HashMap::new(), @@ -160,7 +168,7 @@ impl>> FindNodeContext { self.pending.contains_key(peer).then_some(QueryAction::SendMessage { query: self.config.query, peer: *peer, - message: KademliaMessage::find_node(self.config.target.clone().into_preimage()), + message: self.kad_message.clone(), }) } @@ -175,7 +183,7 @@ impl>> FindNodeContext { Some(QueryAction::SendMessage { query: self.config.query, peer: candidate.peer, - message: KademliaMessage::find_node(self.config.target.clone().into_preimage()), + message: self.kad_message.clone(), }) } diff --git a/src/protocol/libp2p/kademlia/query/get_record.rs b/src/protocol/libp2p/kademlia/query/get_record.rs index fb5004ca..d5b32cf1 100644 --- a/src/protocol/libp2p/kademlia/query/get_record.rs +++ b/src/protocol/libp2p/kademlia/query/get_record.rs @@ -85,7 +85,7 @@ pub struct GetRecordContext { /// Query immutable config. pub config: GetRecordConfig, - /// Cached Kadmelia message to send. + /// Cached Kademlia message to send. kad_message: Bytes, /// Peers from whom the `QueryEngine` is waiting to hear a response. From 70d20f821e3396560164f8dc6f02f45db5bbe10a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 18:53:42 +0300 Subject: [PATCH 06/14] find_node/tests: Check query completes on empty candidates Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index ac53faab..9c8f8dd6 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -243,4 +243,26 @@ impl>> FindNodeContext { } } -// TODO: tests +#[cfg(test)] +mod tests { + use super::*; + + fn default_config() -> FindNodeConfig> { + FindNodeConfig { + local_peer_id: PeerId::random(), + replication_factor: 20, + parallelism_factor: 10, + query: QueryId(0), + target: Key::new(vec![1, 2, 3].into()), + } + } + + #[test] + fn completes_when_no_candidates() { + let config = default_config(); + let mut context = FindNodeContext::new(config, VecDeque::new()); + assert!(context.is_done()); + let event = context.next_action().unwrap(); + assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) }); + } +} From 4c956aa4877fa6e1305450bfa407f00e96da36b8 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 18:58:01 +0300 Subject: [PATCH 07/14] find_node/tests: Check the parallelism is fulfilled Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 9c8f8dd6..1e815baf 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -246,6 +246,7 @@ impl>> FindNodeContext { #[cfg(test)] mod tests { use super::*; + use crate::protocol::libp2p::kademlia::types::ConnectionType; fn default_config() -> FindNodeConfig> { FindNodeConfig { @@ -257,6 +258,15 @@ mod tests { } } + fn peer_to_kad(peer: PeerId) -> KademliaPeer { + KademliaPeer { + peer, + key: Key::from(peer), + addresses: vec![], + connection: 
ConnectionType::Connected, + } + } + #[test] fn completes_when_no_candidates() { let config = default_config(); @@ -265,4 +275,35 @@ mod tests { let event = context.next_action().unwrap(); assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) }); } + + #[test] + fn fulfill_parallelism() { + let config = FindNodeConfig { + parallelism_factor: 3, + ..default_config() + }; + + let in_peers_set = (0..3).map(|_| PeerId::random()).collect::>(); + let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect(); + let mut context = FindNodeContext::new(config, in_peers); + + for num in 0..3 { + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), num + 1); + assert!(context.pending.contains_key(&peer)); + + // Check the peer is the one provided. + assert!(in_peers_set.contains(&peer)); + } + _ => panic!("Unexpected event"), + } + } + + // Fulfilled parallelism. + assert!(context.next_action().is_none()); + } } From 8d776d90aa8c5f6bb39e62d45ddf939d179a5803 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 19:03:19 +0300 Subject: [PATCH 08/14] find_node/tests: Check the query ends on the number of responses Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 1e815baf..c6873d5b 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -306,4 +306,84 @@ mod tests { // Fulfilled parallelism. assert!(context.next_action().is_none()); } + + #[test] + fn completes_when_responses() { + let config = FindNodeConfig { + parallelism_factor: 3, + replication_factor: 3, + ..default_config() + }; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + let in_peers_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect(); + assert_eq!(in_peers_set.len(), 3); + + let in_peers = [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect(); + let mut context = FindNodeContext::new(config, in_peers); + + // Schedule peer queries. + for num in 0..3 { + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), num + 1); + assert!(context.pending.contains_key(&peer)); + + // Check the peer is the one provided. + assert!(in_peers_set.contains(&peer)); + } + _ => panic!("Unexpected event"), + } + } + + // Checks a failed query that was not initiated. + let peer_d = PeerId::random(); + context.register_response_failure(peer_d); + assert_eq!(context.pending.len(), 3); + assert!(context.queried.is_empty()); + + // Provide responses back. + context.register_response(peer_a, vec![]); + assert_eq!(context.pending.len(), 2); + assert_eq!(context.queried.len(), 1); + assert_eq!(context.responses.len(), 1); + + // Provide different response from peer b with peer d as candidate. + context.register_response(peer_b, vec![peer_to_kad(peer_d.clone())]); + assert_eq!(context.pending.len(), 1); + assert_eq!(context.queried.len(), 2); + assert_eq!(context.responses.len(), 2); + assert_eq!(context.candidates.len(), 1); + + // Peer C fails. 
+ context.register_response_failure(peer_c); + assert!(context.pending.is_empty()); + assert_eq!(context.queried.len(), 3); + assert_eq!(context.responses.len(), 2); + + // Drain the last candidate. + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), 1); + assert_eq!(peer, peer_d); + } + _ => panic!("Unexpected event"), + } + + // Peer D responds. + context.register_response(peer_d, vec![]); + + // Produces the result. + let event = context.next_action().unwrap(); + assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); + } } From e0ce49d9132844fd7d1f6504a28da6a32519d18c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 19:16:36 +0300 Subject: [PATCH 09/14] find_node/tests: Produce the closest responses without further candidates Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index c6873d5b..2f2a83dd 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -386,4 +386,50 @@ mod tests { let event = context.next_action().unwrap(); assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); } + + #[test] + fn offers_closest_responses() { + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let target = PeerId::random(); + + let distance_a = Key::from(peer_a).distance(&Key::from(target)); + let distance_b = Key::from(peer_b).distance(&Key::from(target)); + let (closest, furthest) = if distance_a < distance_b { + (peer_a, peer_b) + } else { + (peer_b, peer_a) + }; + + let config = FindNodeConfig { + parallelism_factor: 1, + replication_factor: 1, + target: Key::from(target), + local_peer_id: PeerId::random(), + query: QueryId(0), + }; + + // Scenario where we should return with the number of responses. + let in_peers = vec![peer_to_kad(furthest), peer_to_kad(closest)]; + let mut context = FindNodeContext::new(config, in_peers.into_iter().collect()); + + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), 1); + assert!(context.pending.contains_key(&peer)); + + // The closest should be queried first regardless of the input order. 
+ assert_eq!(closest, peer); + } + _ => panic!("Unexpected event"), + } + + context.register_response(closest, vec![]); + + let event = context.next_action().unwrap(); + assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); + } } From e81a30481575ba13c07bf9f763be23829d56e906 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 19:24:57 +0300 Subject: [PATCH 10/14] find_node/tests: Continue the query on better candidates Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 98 +++++++++++++++---- 1 file changed, 77 insertions(+), 21 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 2f2a83dd..5e2820d3 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -35,7 +35,7 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_node"; /// The configuration needed to instantiate a new [`FindNodeContext`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FindNodeConfig>> { /// Local peer ID. pub local_peer_id: PeerId, @@ -267,6 +267,31 @@ mod tests { } } + fn setup_closest_responses() -> (PeerId, PeerId, FindNodeConfig) { + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let target = PeerId::random(); + + let distance_a = Key::from(peer_a).distance(&Key::from(target)); + let distance_b = Key::from(peer_b).distance(&Key::from(target)); + + let (closest, furthest) = if distance_a < distance_b { + (peer_a, peer_b) + } else { + (peer_b, peer_a) + }; + + let config = FindNodeConfig { + parallelism_factor: 1, + replication_factor: 1, + target: Key::from(target), + local_peer_id: PeerId::random(), + query: QueryId(0), + }; + + (closest, furthest, config) + } + #[test] fn completes_when_no_candidates() { let config = default_config(); @@ -389,29 +414,11 @@ mod tests { #[test] fn offers_closest_responses() { - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - let target = PeerId::random(); - - let distance_a = Key::from(peer_a).distance(&Key::from(target)); - let distance_b = Key::from(peer_b).distance(&Key::from(target)); - let (closest, furthest) = if distance_a < distance_b { - (peer_a, peer_b) - } else { - (peer_b, peer_a) - }; - - let config = FindNodeConfig { - parallelism_factor: 1, - replication_factor: 1, - target: Key::from(target), - local_peer_id: PeerId::random(), - query: QueryId(0), - }; + let (closest, furthest, config) = setup_closest_responses(); // Scenario where we should return with the number of responses. let in_peers = vec![peer_to_kad(furthest), peer_to_kad(closest)]; - let mut context = FindNodeContext::new(config, in_peers.into_iter().collect()); + let mut context = FindNodeContext::new(config.clone(), in_peers.into_iter().collect()); let event = context.next_action().unwrap(); match event { @@ -432,4 +439,53 @@ mod tests { let event = context.next_action().unwrap(); assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); } + + #[test] + fn offers_closest_responses_with_better_candidates() { + let (closest, furthest, config) = setup_closest_responses(); + + println!("Closest: {:?}, Furthest: {:?}", closest, furthest); + + // Scenario where the query is fulfilled however it continues because + // there is a closer peer to query. 
+ let in_peers = vec![peer_to_kad(furthest)]; + let mut context = FindNodeContext::new(config, in_peers.into_iter().collect()); + + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), 1); + assert!(context.pending.contains_key(&peer)); + + // Furthest is the only peer available. + assert_eq!(furthest, peer); + } + _ => panic!("Unexpected event"), + } + + // Furthest node produces a response with the closest node. + // Even if we reach a total of 1 (parallelism factor) replies, we should continue. + context.register_response(furthest, vec![peer_to_kad(closest)]); + + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), 1); + assert!(context.pending.contains_key(&peer)); + + // Furthest provided another peer that is closer. + assert_eq!(closest, peer); + } + _ => panic!("Unexpected event"), + } + + context.register_response(closest, vec![]); + + let event = context.next_action().unwrap(); + assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); + } } From 51ca1aa838d199c4c1f06b66a5f597e1406d60d5 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 May 2024 19:27:33 +0300 Subject: [PATCH 11/14] find_node/tests: Ensure the query awaits for pending better responses Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_node.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 5e2820d3..c5d2d706 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -444,8 +444,6 @@ mod tests { fn offers_closest_responses_with_better_candidates() { let (closest, furthest, config) = setup_closest_responses(); - println!("Closest: {:?}, Furthest: {:?}", closest, furthest); - // Scenario where the query is fulfilled however it continues because // there is a closer peer to query. let in_peers = vec![peer_to_kad(furthest)]; @@ -483,6 +481,11 @@ mod tests { _ => panic!("Unexpected event"), } + // Even if we have the total number of responses, we have at least one + // inflight query which might be closer to the target. + assert!(context.next_action().is_none()); + + // Query finishes when receiving the response back. context.register_response(closest, vec![]); let event = context.next_action().unwrap(); From 587b92e35c4825ce823ad7bb7c3ceba4033816c3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 May 2024 14:05:56 +0300 Subject: [PATCH 12/14] kad/find_node: Keep the best 20 responses and don't update last entry Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_node.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index c5d2d706..fab98d62 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -131,9 +131,18 @@ impl>> FindNodeContext { self.responses.insert(distance, peer); } else { // Update the furthest peer if this response is closer. 
- if let Some(mut entry) = self.responses.last_entry() { - if entry.key() > &distance { - entry.insert(peer); + // Find the furthest distance. + let furthest_distance = + self.responses.last_entry().map(|entry| *entry.key()).unwrap_or(distance); + + // The response received from the peer is closer than the furthest response. + if distance < furthest_distance { + // Update the entries only if the distance is not already present. + if !self.responses.contains_key(&distance) { + self.responses.insert(distance, peer); + + // Remove the furthest entry. + self.responses.pop_last(); } } } From e946c69df9ae147962a6229487bd3705996611ef Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 May 2024 14:44:02 +0300 Subject: [PATCH 13/14] find_node/tests: Check we keep an window of the best peers, not just the last entry Signed-off-by: Alexandru Vasile --- .../libp2p/kademlia/query/find_node.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index fab98d62..f77ce79a 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -500,4 +500,77 @@ mod tests { let event = context.next_action().unwrap(); assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); } + + #[test] + fn keep_k_best_results() { + let mut peers = (0..6).map(|_| PeerId::random()).collect::>(); + let target = Key::from(PeerId::random()); + // Sort the peers by their distance to the target in descending order. + peers.sort_by_key(|peer| std::cmp::Reverse(target.distance(&Key::from(*peer)))); + + let config = FindNodeConfig { + parallelism_factor: 3, + replication_factor: 3, + target, + local_peer_id: PeerId::random(), + query: QueryId(0), + }; + + let in_peers = vec![peers[0], peers[1], peers[2]] + .iter() + .map(|peer| peer_to_kad(*peer)) + .collect(); + let mut context = FindNodeContext::new(config, in_peers); + + // Schedule peer queries. + for num in 0..3 { + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), num + 1); + assert!(context.pending.contains_key(&peer)); + } + _ => panic!("Unexpected event"), + } + } + + // Each peer responds with a better (closer) peer. + context.register_response(peers[0], vec![peer_to_kad(peers[3])]); + context.register_response(peers[1], vec![peer_to_kad(peers[4])]); + context.register_response(peers[2], vec![peer_to_kad(peers[5])]); + + // Must schedule better peers. + for num in 0..3 { + let event = context.next_action().unwrap(); + match event { + QueryAction::SendMessage { query, peer, .. } => { + assert_eq!(query, QueryId(0)); + // Added as pending. + assert_eq!(context.pending.len(), num + 1); + assert!(context.pending.contains_key(&peer)); + } + _ => panic!("Unexpected event"), + } + } + + context.register_response(peers[3], vec![]); + context.register_response(peers[4], vec![]); + context.register_response(peers[5], vec![]); + + // Produces the result. + let event = context.next_action().unwrap(); + assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) }); + + // Because the FindNode query keeps a window of the best K (3 in this case) peers, + // we expect to produce the best K peers. As opposed to having only the last entry + // updated, which would have produced [peer[0], peer[1], peer[5]]. + + // Check the responses. 
+ let responses = context.responses.values().map(|peer| peer.peer).collect::>(); + // Note: peers are returned in order closest to the target, our `peers` input is sorted in + // decreasing order. + assert_eq!(responses, [peers[5], peers[4], peers[3]]); + } } From 19f8c6e7b8e63e0e9b51332fb34e8cd15f8e11b9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 May 2024 13:48:57 +0300 Subject: [PATCH 14/14] find_node: Replace entry with the new peer Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/query/find_node.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index f77ce79a..3556b90b 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -137,11 +137,10 @@ impl>> FindNodeContext { // The response received from the peer is closer than the furthest response. if distance < furthest_distance { - // Update the entries only if the distance is not already present. - if !self.responses.contains_key(&distance) { - self.responses.insert(distance, peer); + self.responses.insert(distance, peer); - // Remove the furthest entry. + // Remove the furthest entry. + if self.responses.len() > self.config.replication_factor { self.responses.pop_last(); } }
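The last three patches converge on a bounded "keep the K closest responses" window inside register_response: responses live in a BTreeMap keyed by distance to the target, and once the map holds more than replication_factor entries the furthest one is dropped with pop_last. The following standalone sketch (not part of the patches) shows the same pattern in isolation; it substitutes a plain u32 for the Kademlia Distance type and string labels for KademliaPeer entries, so names like ResponseWindow and register are illustrative only.

use std::collections::BTreeMap;

/// Sketch: keep only the `replication_factor` closest responses,
/// keyed by distance to the target (modelled here as a plain u32).
struct ResponseWindow {
    replication_factor: usize,
    responses: BTreeMap<u32, &'static str>,
}

impl ResponseWindow {
    fn new(replication_factor: usize) -> Self {
        Self {
            replication_factor,
            responses: BTreeMap::new(),
        }
    }

    /// Mirrors the shape of the final `register_response`: insert the
    /// response, then drop the furthest entry if the window grew past capacity.
    fn register(&mut self, distance: u32, peer: &'static str) {
        self.responses.insert(distance, peer);

        if self.responses.len() > self.replication_factor {
            // `BTreeMap` iterates in key order, so the last entry is the
            // furthest peer seen so far.
            self.responses.pop_last();
        }
    }
}

fn main() {
    let mut window = ResponseWindow::new(3);

    for (distance, peer) in [(40, "a"), (10, "b"), (30, "c"), (20, "d")] {
        window.register(distance, peer);
    }

    // Only the three closest peers remain: distances 10, 20 and 30; the
    // furthest response (40) was evicted, matching the behaviour the
    // keep_k_best_results test asserts on the real context.
    assert_eq!(
        window.responses.keys().copied().collect::<Vec<_>>(),
        vec![10, 20, 30]
    );
    println!("{:?}", window.responses);
}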