Skip to content

Commit

Permalink
feat(kad): configurable bucket size
Browse files Browse the repository at this point in the history
Making bucket size configurable. Currently `K_VALUE` is used by default, and the only way to change the bucket size is to edit the const.

Resolves libp2p#5389

Pull-Request: libp2p#5414.
  • Loading branch information
guillaumemichel authored and TimTinkers committed Sep 14, 2024
1 parent 3b89041 commit 29ac63f
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 52 deletions.
8 changes: 5 additions & 3 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
- Included multiaddresses of found peers alongside peer IDs in `GetClosestPeers` query results.
See [PR 5475](https://github.com/libp2p/rust-libp2p/pull/5475)
- Changed `FIND_NODE` response: now includes a list of closest peers when querying the recipient peer ID. Previously, this request yielded an empty response.
See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270)
- Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451)
See [PR 3230](https://github.com/libp2p/rust-libp2p/pull/3230)
See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270).
- Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451).
See [PR 3230](https://github.com/libp2p/rust-libp2p/pull/3230).
- Use default dial conditions more consistently.
See [PR 4957](https://github.com/libp2p/rust-libp2p/pull/4957)
- QueryClose progress whenever closer in range, instead of having to be the closest.
Expand All @@ -19,6 +19,8 @@
See [PR 5148](https://github.com/libp2p/rust-libp2p/pull/5148).
- Derive `Copy` for `kbucket::key::Key<T>`.
See [PR 5317](https://github.com/libp2p/rust-libp2p/pull/5317).
`KBucket` size can now be modified without changing the `K_VALUE`.
See [PR 5414](https://github.com/libp2p/rust-libp2p/pull/5414).
- Use `web-time` instead of `instant`.
See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347).
<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
26 changes: 22 additions & 4 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod test;

use crate::addresses::Addresses;
use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId};
use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus};
use crate::kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus};
use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig};
use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState};
use crate::record::{
Expand Down Expand Up @@ -172,7 +172,7 @@ pub enum StoreInserts {
/// The configuration is consumed by [`Behaviour::new`].
#[derive(Debug, Clone)]
pub struct Config {
kbucket_pending_timeout: Duration,
kbucket_config: KBucketConfig,
query_config: QueryConfig,
protocol_config: ProtocolConfig,
record_ttl: Option<Duration>,
Expand Down Expand Up @@ -215,7 +215,7 @@ impl Config {
/// Builds a new `Config` with the given protocol name.
pub fn new(protocol_name: StreamProtocol) -> Self {
Config {
kbucket_pending_timeout: Duration::from_secs(60),
kbucket_config: KBucketConfig::default(),
query_config: QueryConfig::default(),
protocol_config: ProtocolConfig::new(protocol_name),
record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
Expand Down Expand Up @@ -424,6 +424,24 @@ impl Config {
self
}

/// Sets the configuration for the k-buckets.
///
/// * Default to K_VALUE.
pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self {
self.kbucket_config.set_bucket_size(size);
self
}

/// Sets the timeout duration after creation of a pending entry after which
/// it becomes eligible for insertion into a full bucket, replacing the
/// least-recently (dis)connected node.
///
/// * Default to `60` s.
pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self {
self.kbucket_config.set_pending_timeout(timeout);
self
}

/// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table.
/// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time".
/// This also allows to wait a little bit for other potential peers to be inserted into the routing table before
Expand Down Expand Up @@ -481,7 +499,7 @@ where
Behaviour {
store,
caching: config.caching,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
kbuckets: KBucketsTable::new(local_key, config.kbucket_config),
kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config,
record_filtering: config.record_filtering,
Expand Down
91 changes: 68 additions & 23 deletions protocols/kad/src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,58 @@ mod key;
pub use bucket::NodeStatus;
pub use entry::*;

use arrayvec::ArrayVec;
use bucket::KBucket;
use std::collections::VecDeque;
use std::num::NonZeroUsize;
use std::time::Duration;
use web_time::Instant;

/// Maximum number of k-buckets.
const NUM_BUCKETS: usize = 256;

/// The configuration for `KBucketsTable`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct KBucketConfig {
/// Maximal number of nodes that a bucket can contain.
bucket_size: usize,
/// Specifies the duration after creation of a [`PendingEntry`] after which
/// it becomes eligible for insertion into a full bucket, replacing the
/// least-recently (dis)connected node.
pending_timeout: Duration,
}

impl Default for KBucketConfig {
fn default() -> Self {
KBucketConfig {
bucket_size: K_VALUE.get(),
pending_timeout: Duration::from_secs(60),
}
}
}

impl KBucketConfig {
/// Modifies the maximal number of nodes that a bucket can contain.
pub(crate) fn set_bucket_size(&mut self, bucket_size: NonZeroUsize) {
self.bucket_size = bucket_size.get();
}

/// Modifies the duration after creation of a [`PendingEntry`] after which
/// it becomes eligible for insertion into a full bucket, replacing the
/// least-recently (dis)connected node.
pub(crate) fn set_pending_timeout(&mut self, pending_timeout: Duration) {
self.pending_timeout = pending_timeout;
}
}

/// A `KBucketsTable` represents a Kademlia routing table.
#[derive(Debug, Clone)]
pub(crate) struct KBucketsTable<TKey, TVal> {
/// The key identifying the local peer that owns the routing table.
local_key: TKey,
/// The buckets comprising the routing table.
buckets: Vec<KBucket<TKey, TVal>>,
/// The maximal number of nodes that a bucket can contain.
bucket_size: usize,
/// The list of evicted entries that have been replaced with pending
/// entries since the last call to [`KBucketsTable::take_applied_pending`].
applied_pending: VecDeque<AppliedPending<TKey, TVal>>,
Expand Down Expand Up @@ -151,17 +187,12 @@ where
TVal: Clone,
{
/// Creates a new, empty Kademlia routing table with entries partitioned
/// into buckets as per the Kademlia protocol.
///
/// The given `pending_timeout` specifies the duration after creation of
/// a [`PendingEntry`] after which it becomes eligible for insertion into
/// a full bucket, replacing the least-recently (dis)connected node.
pub(crate) fn new(local_key: TKey, pending_timeout: Duration) -> Self {
/// into buckets as per the Kademlia protocol using the provided config.
pub(crate) fn new(local_key: TKey, config: KBucketConfig) -> Self {
KBucketsTable {
local_key,
buckets: (0..NUM_BUCKETS)
.map(|_| KBucket::new(pending_timeout))
.collect(),
buckets: (0..NUM_BUCKETS).map(|_| KBucket::new(config)).collect(),
bucket_size: config.bucket_size,
applied_pending: VecDeque::new(),
}
}
Expand Down Expand Up @@ -247,13 +278,16 @@ where
T: AsRef<KeyBytes>,
{
let distance = self.local_key.as_ref().distance(target);
let bucket_size = self.bucket_size;
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: |b: &KBucket<TKey, _>| -> ArrayVec<_, { K_VALUE.get() }> {
b.iter().map(|(n, _)| n.key.clone()).collect()
fmap: move |b: &KBucket<TKey, _>| -> Vec<_> {
let mut vec = Vec::with_capacity(bucket_size);
vec.extend(b.iter().map(|(n, _)| n.key.clone()));
vec
},
}
}
Expand All @@ -269,13 +303,15 @@ where
TVal: Clone,
{
let distance = self.local_key.as_ref().distance(target);
let bucket_size = self.bucket_size;
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_, { K_VALUE.get() }> {
fmap: move |b: &KBucket<_, TVal>| -> Vec<_> {
b.iter()
.take(bucket_size)
.map(|(n, status)| EntryView {
node: n.clone(),
status,
Expand Down Expand Up @@ -324,7 +360,7 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
/// distance of the local key to the target.
buckets_iter: ClosestBucketsIter,
/// The iterator over the entries in the currently traversed bucket.
iter: Option<arrayvec::IntoIter<TOut, { K_VALUE.get() }>>,
iter: Option<std::vec::IntoIter<TOut>>,
/// The projection function / mapping applied on each bucket as
/// it is encountered, producing the next `iter`ator.
fmap: TMap,
Expand Down Expand Up @@ -429,7 +465,7 @@ where
TTarget: AsRef<KeyBytes>,
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
TMap: Fn(&KBucket<TKey, TVal>) -> ArrayVec<TOut, { K_VALUE.get() }>,
TMap: Fn(&KBucket<TKey, TVal>) -> Vec<TOut>,
TOut: AsRef<KeyBytes>,
{
type Item = TOut;
Expand Down Expand Up @@ -535,11 +571,14 @@ mod tests {
fn arbitrary(g: &mut Gen) -> TestTable {
let local_key = Key::from(PeerId::random());
let timeout = Duration::from_secs(g.gen_range(1..360));
let mut table = TestTable::new(local_key.into(), timeout);
let mut config = KBucketConfig::default();
config.set_pending_timeout(timeout);
let bucket_size = config.bucket_size;
let mut table = TestTable::new(local_key.into(), config);
let mut num_total = g.gen_range(0..100);
for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() {
let ix = BucketIndex(i);
let num = g.gen_range(0..usize::min(K_VALUE.get(), num_total) + 1);
let num = g.gen_range(0..usize::min(bucket_size, num_total) + 1);
num_total -= num;
for _ in 0..num {
let distance = ix.rand_distance(&mut rand::thread_rng());
Expand All @@ -560,7 +599,9 @@ mod tests {
fn buckets_are_non_overlapping_and_exhaustive() {
let local_key = Key::from(PeerId::random());
let timeout = Duration::from_secs(0);
let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout);
let mut config = KBucketConfig::default();
config.set_pending_timeout(timeout);
let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), config);

let mut prev_max = U256::from(0);

Expand All @@ -577,7 +618,9 @@ mod tests {
fn bucket_contains_range() {
fn prop(ix: u8) {
let index = BucketIndex(ix as usize);
let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0));
let mut config = KBucketConfig::default();
config.set_pending_timeout(Duration::from_secs(0));
let mut bucket = KBucket::<Key<PeerId>, ()>::new(config);
let bucket_ref = KBucketRef {
index,
bucket: &mut bucket,
Expand Down Expand Up @@ -623,7 +666,7 @@ mod tests {
let local_key = Key::from(PeerId::random());
let other_id = Key::from(PeerId::random());

let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
if let Some(Entry::Absent(entry)) = table.entry(&other_id) {
match entry.insert((), NodeStatus::Connected) {
InsertResult::Inserted => (),
Expand All @@ -641,15 +684,15 @@ mod tests {
#[test]
fn entry_self() {
let local_key = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());

assert!(table.entry(&local_key).is_none())
}

#[test]
fn closest() {
let local_key = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default());
let mut count = 0;
loop {
if count == 100 {
Expand Down Expand Up @@ -684,7 +727,9 @@ mod tests {
#[test]
fn applied_pending() {
let local_key = Key::from(PeerId::random());
let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1));
let mut config = KBucketConfig::default();
config.set_pending_timeout(Duration::from_millis(1));
let mut table = KBucketsTable::<_, ()>::new(local_key, config);
let expected_applied;
let full_bucket_index;
loop {
Expand Down
Loading

0 comments on commit 29ac63f

Please sign in to comment.