diff --git a/package.json b/package.json index 488104c2..02d27046 100644 --- a/package.json +++ b/package.json @@ -201,6 +201,7 @@ "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", "p-retry": "^5.0.0", + "p-wait-for": "^5.0.0", "protons": "^5.1.0", "sinon": "^14.0.0", "ts-sinon": "^2.0.2", diff --git a/src/routing-table/index.ts b/src/routing-table/index.ts index bd3ade4b..b34adc0c 100644 --- a/src/routing-table/index.ts +++ b/src/routing-table/index.ts @@ -8,6 +8,13 @@ import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { Logger } from '@libp2p/logger' import { Components, Initializable } from '@libp2p/components' +import { PeerSet } from '@libp2p/peer-collections' + +export const KAD_CLOSE_TAG_NAME = 'kad-close' +export const KAD_CLOSE_TAG_VALUE = 50 +export const KBUCKET_SIZE = 20 +export const PING_TIMEOUT = 10000 +export const PING_CONCURRENCY = 10 export interface KBucketPeer { id: Uint8Array @@ -22,10 +29,20 @@ export interface KBucket { right: KBucket } +interface KBucketTreeEvents { + 'ping': (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void + 'added': (contact: KBucketPeer) => void + 'removed': (contact: KBucketPeer) => void +} + export interface KBucketTree { root: KBucket localNodeId: Uint8Array - on: (event: 'ping', callback: (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void) => void + + on: ( + event: U, listener: KBucketTreeEvents[U] + ) => this + closest: (key: Uint8Array, count: number) => KBucketPeer[] closestPeer: (key: Uint8Array) => KBucketPeer remove: (key: Uint8Array) => void @@ -45,6 +62,8 @@ export interface RoutingTableInit { kBucketSize?: number pingTimeout?: number pingConcurrency?: number + tagName?: string + tagValue?: number } /** @@ -63,17 +82,21 @@ export class RoutingTable implements Startable, Initializable { private readonly pingConcurrency: number private running: boolean private readonly protocol: string + private readonly tagName: string + private readonly tagValue: number constructor (init: RoutingTableInit) { - const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol } = init + const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`) - this.kBucketSize = kBucketSize ?? 20 - this.pingTimeout = pingTimeout ?? 10000 - this.pingConcurrency = pingConcurrency ?? 10 + this.kBucketSize = kBucketSize ?? KBUCKET_SIZE + this.pingTimeout = pingTimeout ?? PING_TIMEOUT + this.pingConcurrency = pingConcurrency ?? PING_CONCURRENCY this.lan = lan this.running = false this.protocol = protocol + this.tagName = tagName ?? KAD_CLOSE_TAG_NAME + this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE const updatePingQueueSizeMetric = () => { this.components.getMetrics()?.updateComponentMetric({ @@ -108,13 +131,18 @@ export class RoutingTable implements Startable, Initializable { async start () { this.running = true - const kBuck = new KBuck({ + const kBuck: KBucketTree = new KBuck({ localNodeId: await utils.convertPeerId(this.components.getPeerId()), numberOfNodesPerKBucket: this.kBucketSize, numberOfNodesToPing: 1 }) - kBuck.on('ping', this._onPing) this.kb = kBuck + + // test whether to evict peers + kBuck.on('ping', this._onPing) + + // tag kad-close peers + this._tagPeers(kBuck) } async stop () { @@ -123,6 +151,48 @@ export class RoutingTable implements Startable, Initializable { this.kb = undefined } + /** + * Keep track of our k-closest peers and tag them in the peer store as such + * - this will lower the chances that connections to them get closed when + * we reach connection limits + */ + _tagPeers (kBuck: KBucketTree) { + let kClosest = new PeerSet() + + const updatePeerTags = utils.debounce(() => { + const newClosest = new PeerSet( + kBuck.closest(kBuck.localNodeId, KBUCKET_SIZE).map(contact => contact.peer) + ) + const addedPeers = newClosest.difference(kClosest) + const removedPeers = kClosest.difference(newClosest) + + Promise.resolve() + .then(async () => { + for (const peer of addedPeers) { + await this.components.getPeerStore().tagPeer(peer, this.tagName, { + value: this.tagValue + }) + } + + for (const peer of removedPeers) { + await this.components.getPeerStore().unTagPeer(peer, this.tagName) + } + }) + .catch(err => { + this.log.error('Could not update peer tags', err) + }) + + kClosest = newClosest + }) + + kBuck.on('added', () => { + updatePeerTags() + }) + kBuck.on('removed', () => { + updatePeerTags() + }) + } + /** * Called on the `ping` event from `k-bucket` when a bucket is full * and cannot split. diff --git a/src/utils.ts b/src/utils.ts index 63b0a558..403721fb 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -112,3 +112,12 @@ export function createPutRecord (key: Uint8Array, value: Uint8Array) { return rec.serialize() } + +export function debounce (callback: () => void, wait: number = 100) { + let timeout: ReturnType + + return () => { + clearTimeout(timeout) + timeout = setTimeout(() => callback(), wait) + } +} diff --git a/test/routing-table.spec.ts b/test/routing-table.spec.ts index f0eb857a..55e926eb 100644 --- a/test/routing-table.spec.ts +++ b/test/routing-table.spec.ts @@ -3,13 +3,21 @@ import { expect } from 'aegir/chai' import random from 'lodash.random' import sinon from 'sinon' -import { RoutingTable } from '../src/routing-table/index.js' +import { KAD_CLOSE_TAG_NAME, KAD_CLOSE_TAG_VALUE, KBUCKET_SIZE, RoutingTable } from '../src/routing-table/index.js' import * as kadUtils from '../src/utils.js' import { createPeerId, createPeerIds } from './utils/create-peer-id.js' import { PROTOCOL_DHT } from '../src/constants.js' import { peerIdFromString } from '@libp2p/peer-id' import { Components } from '@libp2p/components' import { mockConnectionManager } from '@libp2p/interface-mocks' +import { PersistentPeerStore } from '@libp2p/peer-store' +import { MemoryDatastore } from 'datastore-core' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { sortClosestPeers } from './utils/sort-closest-peers.js' +import pWaitFor from 'p-wait-for' +import { pipe } from 'it-pipe' +import all from 'it-all' +import { PeerSet } from '@libp2p/peer-collections' describe('Routing Table', () => { let table: RoutingTable @@ -20,7 +28,9 @@ describe('Routing Table', () => { components = new Components({ peerId: await createPeerId(), - connectionManager: mockConnectionManager() + connectionManager: mockConnectionManager(), + datastore: new MemoryDatastore(), + peerStore: new PersistentPeerStore() }) table = new RoutingTable({ @@ -207,4 +217,90 @@ describe('Routing Table', () => { // evicted the old peer expect(table.kb.get(oldPeer.id)).to.be.null() }) + + it('tags newly found kad-close peers', async () => { + const remotePeer = await createEd25519PeerId() + const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer') + + await table.add(remotePeer) + + expect(tagPeerSpy.callCount).to.equal(0, 'did not debounce call to peerStore.tagPeer') + + await pWaitFor(() => { + return tagPeerSpy.callCount === 1 + }) + + expect(tagPeerSpy.callCount).to.equal(1, 'did not tag kad-close peer') + expect(tagPeerSpy.getCall(0).args[0].toString()).to.equal(remotePeer.toString()) + expect(tagPeerSpy.getCall(0).args[1]).to.equal(KAD_CLOSE_TAG_NAME) + expect(tagPeerSpy.getCall(0).args[2]).to.have.property('value', KAD_CLOSE_TAG_VALUE) + }) + + it('removes tags from kad-close peers when closer peers are found', async () => { + async function getTaggedPeers (): Promise { + return new PeerSet(await pipe( + await components.getPeerStore().all(), + async function * (source) { + for await (const peer of source) { + const tags = await components.getPeerStore().getTags(peer.id) + const kadCloseTags = tags.filter(tag => tag.name === KAD_CLOSE_TAG_NAME) + + if (kadCloseTags.length > 0) { + yield peer.id + } + } + }, + async (source) => await all(source) + )) + } + + const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer') + const unTagPeerSpy = sinon.spy(components.getPeerStore(), 'unTagPeer') + const localNodeId = await kadUtils.convertPeerId(components.getPeerId()) + const sortedPeerList = await sortClosestPeers( + await Promise.all( + new Array(KBUCKET_SIZE + 1).fill(0).map(async () => await createEd25519PeerId()) + ), + localNodeId + ) + + // sort list furthest -> closest + sortedPeerList.reverse() + + // fill the table up to the first kbucket size + for (let i = 0; i < KBUCKET_SIZE; i++) { + await table.add(sortedPeerList[i]) + } + + // should have all added contacts in the root kbucket + expect(table.kb?.count()).to.equal(KBUCKET_SIZE, 'did not fill kbuckets') + expect(table.kb?.root.contacts).to.have.lengthOf(KBUCKET_SIZE, 'split root kbucket when we should not have') + expect(table.kb?.root.left).to.be.null('split root kbucket when we should not have') + expect(table.kb?.root.right).to.be.null('split root kbucket when we should not have') + + await pWaitFor(() => { + return tagPeerSpy.callCount === KBUCKET_SIZE + }) + + // make sure we tagged all of the peers as kad-close + const taggedPeers = await getTaggedPeers() + expect(taggedPeers.difference(new PeerSet(sortedPeerList.slice(0, sortedPeerList.length - 1)))).to.have.property('size', 0) + tagPeerSpy.resetHistory() + + // add a node that is closer than any added so far + await table.add(sortedPeerList[sortedPeerList.length - 1]) + + expect(table.kb?.count()).to.equal(KBUCKET_SIZE + 1, 'did not fill kbuckets') + expect(table.kb?.root.left).to.not.be.null('did not split root kbucket when we should have') + expect(table.kb?.root.right).to.not.be.null('did not split root kbucket when we should have') + + // wait for tag new peer and untag old peer + await pWaitFor(() => { + return tagPeerSpy.callCount === 1 && unTagPeerSpy.callCount === 1 + }) + + // should have updated list of tagged peers + const finalTaggedPeers = await getTaggedPeers() + expect(finalTaggedPeers.difference(new PeerSet(sortedPeerList.slice(1)))).to.have.property('size', 0) + }) })