-
Notifications
You must be signed in to change notification settings - Fork 974
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kad: make bucket size configurable #5389
Comments
I do like the idea of having this configurable as 20 may not always be suited for every use-case (though I do understand its reasoning for being that by default).
We could have this apart of |
Another thing I would like to point out is that the fields in |
Theres also I can imagine we could do something like the following (incomplete and untested patch and diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs
index 302433a8d..a861821c0 100644
--- a/protocols/kad/src/behaviour.rs
+++ b/protocols/kad/src/behaviour.rs
@@ -175,6 +175,7 @@ pub enum StoreInserts {
#[derive(Debug, Clone)]
pub struct Config {
kbucket_pending_timeout: Duration,
+ k_value: NonZeroUsize,
query_config: QueryConfig,
protocol_config: ProtocolConfig,
record_ttl: Option<Duration>,
@@ -218,6 +219,7 @@ impl Config {
pub fn new(protocol_name: StreamProtocol) -> Self {
Config {
kbucket_pending_timeout: Duration::from_secs(60),
+ k_value: K_VALUE,
query_config: QueryConfig::default(),
protocol_config: ProtocolConfig::new(protocol_name),
record_ttl: Some(Duration::from_secs(48 * 60 * 60)),
@@ -483,7 +485,7 @@ where
Behaviour {
store,
caching: config.caching,
- kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
+ kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout, config.k_value),
kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config,
record_filtering: config.record_filtering,
diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs
index 7ed10f7f8..84a3f6690 100644
--- a/protocols/kad/src/kbucket.rs
+++ b/protocols/kad/src/kbucket.rs
@@ -78,6 +78,7 @@ pub use entry::*;
use arrayvec::ArrayVec;
use bucket::KBucket;
use std::collections::VecDeque;
+use std::num::NonZeroUsize;
use std::time::Duration;
use web_time::Instant;
@@ -156,11 +157,11 @@ where
/// The given `pending_timeout` specifies the duration after creation of
/// a [`PendingEntry`] after which it becomes eligible for insertion into
/// a full bucket, replacing the least-recently (dis)connected node.
- pub(crate) fn new(local_key: TKey, pending_timeout: Duration) -> Self {
+ pub(crate) fn new(local_key: TKey, pending_timeout: Duration, k_value: NonZeroUsize) -> Self {
KBucketsTable {
local_key,
buckets: (0..NUM_BUCKETS)
- .map(|_| KBucket::new(pending_timeout))
+ .map(|_| KBucket::new(pending_timeout, k_value))
.collect(),
applied_pending: VecDeque::new(),
}
@@ -535,7 +536,7 @@ mod tests {
fn arbitrary(g: &mut Gen) -> TestTable {
let local_key = Key::from(PeerId::random());
let timeout = Duration::from_secs(g.gen_range(1..360));
- let mut table = TestTable::new(local_key.into(), timeout);
+ let mut table = TestTable::new(local_key.into(), timeout, K_VALUE);
let mut num_total = g.gen_range(0..100);
for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() {
let ix = BucketIndex(i);
@@ -560,7 +561,7 @@ mod tests {
fn buckets_are_non_overlapping_and_exhaustive() {
let local_key = Key::from(PeerId::random());
let timeout = Duration::from_secs(0);
- let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout);
+ let mut table = KBucketsTable::<KeyBytes, ()>::new(local_key.into(), timeout, K_VALUE);
let mut prev_max = U256::from(0);
@@ -577,7 +578,7 @@ mod tests {
fn bucket_contains_range() {
fn prop(ix: u8) {
let index = BucketIndex(ix as usize);
- let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0));
+ let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(0), K_VALUE);
let bucket_ref = KBucketRef {
index,
bucket: &mut bucket,
@@ -623,7 +624,7 @@ mod tests {
let local_key = Key::from(PeerId::random());
let other_id = Key::from(PeerId::random());
- let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
+ let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5), K_VALUE);
if let Some(Entry::Absent(entry)) = table.entry(&other_id) {
match entry.insert((), NodeStatus::Connected) {
InsertResult::Inserted => (),
@@ -641,7 +642,7 @@ mod tests {
#[test]
fn entry_self() {
let local_key = Key::from(PeerId::random());
- let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
+ let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5), K_VALUE);
assert!(table.entry(&local_key).is_none())
}
@@ -649,7 +650,7 @@ mod tests {
#[test]
fn closest() {
let local_key = Key::from(PeerId::random());
- let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5));
+ let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5), K_VALUE);
let mut count = 0;
loop {
if count == 100 {
@@ -684,7 +685,7 @@ mod tests {
#[test]
fn applied_pending() {
let local_key = Key::from(PeerId::random());
- let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1));
+ let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1), K_VALUE);
let expected_applied;
let full_bucket_index;
loop {
diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs
index 1bd4389eb..d5e10e2e7 100644
--- a/protocols/kad/src/kbucket/bucket.rs
+++ b/protocols/kad/src/kbucket/bucket.rs
@@ -25,6 +25,8 @@
//! > buckets in a `KBucketsTable` and hence is enforced by the public API
//! > of the `KBucketsTable` and in particular the public `Entry` API.
+use std::num::NonZeroUsize;
+
use super::*;
pub(crate) use crate::K_VALUE;
/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
@@ -96,7 +98,9 @@ pub(crate) struct Position(usize);
#[derive(Debug, Clone)]
pub(crate) struct KBucket<TKey, TVal> {
/// The nodes contained in the bucket.
- nodes: ArrayVec<Node<TKey, TVal>, { K_VALUE.get() }>,
+ nodes: Vec<Node<TKey, TVal>>,
+
+ k_value: NonZeroUsize,
/// The position (index) in `nodes` that marks the first connected node.
///
@@ -162,9 +166,10 @@ where
TVal: Clone,
{
/// Creates a new `KBucket` with the given timeout for pending entries.
- pub(crate) fn new(pending_timeout: Duration) -> Self {
+ pub(crate) fn new(pending_timeout: Duration, k_value: NonZeroUsize) -> Self {
KBucket {
- nodes: ArrayVec::new(),
+ nodes: Vec::with_capacity(k_value.get()),
+ k_value,
first_connected_pos: None,
pending: None,
pending_timeout,
@@ -205,7 +210,7 @@ where
pub(crate) fn apply_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
if let Some(pending) = self.pending.take() {
if pending.replace <= Instant::now() {
- if self.nodes.is_full() {
+ if self.nodes.len() == self.k_value.get() {
if self.status(Position(0)) == NodeStatus::Connected {
// The bucket is full with connected nodes. Drop the pending node.
return None;
@@ -316,7 +321,7 @@ where
) -> InsertResult<TKey> {
match status {
NodeStatus::Connected => {
- if self.nodes.is_full() {
+ if self.nodes.len() == self.k_value.get() {
if self.first_connected_pos == Some(0) || self.pending.is_some() {
return InsertResult::Full;
} else {
@@ -336,7 +341,7 @@ where
InsertResult::Inserted
}
NodeStatus::Disconnected => {
- if self.nodes.is_full() {
+ if self.nodes.len() == self.k_value.get() {
return InsertResult::Full;
}
if let Some(ref mut p) = self.first_connected_pos {
@@ -435,7 +440,7 @@ mod tests {
impl Arbitrary for KBucket<Key<PeerId>, ()> {
fn arbitrary(g: &mut Gen) -> KBucket<Key<PeerId>, ()> {
let timeout = Duration::from_secs(g.gen_range(1..g.size()) as u64);
- let mut bucket = KBucket::<Key<PeerId>, ()>::new(timeout);
+ let mut bucket = KBucket::<Key<PeerId>, ()>::new(timeout, K_VALUE);
let num_nodes = g.gen_range(1..K_VALUE.get() + 1);
for _ in 0..num_nodes {
let key = Key::from(PeerId::random());
@@ -480,7 +485,7 @@ mod tests {
#[test]
fn ordering() {
fn prop(status: Vec<NodeStatus>) -> bool {
- let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1));
+ let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1), K_VALUE);
// The expected lists of connected and disconnected nodes.
let mut connected = VecDeque::new();
@@ -522,7 +527,7 @@ mod tests {
#[test]
fn full_bucket() {
- let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1));
+ let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1), K_VALUE);
// Fill the bucket with disconnected nodes.
fill_bucket(&mut bucket, NodeStatus::Disconnected);
@@ -590,7 +595,7 @@ mod tests {
#[test]
fn full_bucket_discard_pending() {
- let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1));
+ let mut bucket = KBucket::<Key<PeerId>, ()>::new(Duration::from_secs(1), K_VALUE);
fill_bucket(&mut bucket, NodeStatus::Disconnected);
let (first, _) = bucket.iter().next().unwrap();
let first_disconnected = first.clone(); thoughts @guillaumemichel ? EDIT: |
@dariusc93 I think it makes sense to switch to pub(crate) fn new(local_key: TKey, pending_timeout: Duration, k_value: NonZeroUsize) -> Self { We could even define a proper I was thinking of making the bucket size independent of the |
@guillaumemichel Implementing an external 'peer address cache' behaviour is a fine solution for me as well. But in order for it to work, the addresses discovered by kad lookup need to be somehow shared. As far as I can see, the addresses stored in |
As discussed on |
Making bucket size configurable. Currently `K_VALUE` is used by default, and the only way to change the bucket size is to edit the const. Resolves libp2p#5389 Pull-Request: libp2p#5414.
Description
The bucket size should be customizable to fit the needs of
kad
's users. Currently the bucket size is bound to the staticK_VALUE
which is a constant (20
by default).Motivation
The bucket size is a system parameter that must be adjusted to a network's need.
20
is the default value used in the IPFS DHT, butdiscv5
(not using libp2p) has a bucket size of16
for instance. The bucket size should be configurable.Current Implementation
The current bucket size if defined to be the
K_VALUE
constant also used for other purposes, and the value must be manually modified to change the bucket size, which is far from ideal.This will likely be a breaking change since the
KBucketsTable::new
should probably be modified to accept the bucket size as additional parameter or in aConfig
.Are you planning to do it yourself in a pull request ?
Yes
The text was updated successfully, but these errors were encountered: