Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: tag kad-close peers #375

Merged
merged 1 commit into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.1",
"@libp2p/peer-collections": "^2.2.0",
"@libp2p/peer-id": "^1.1.15",
"@libp2p/record": "^2.0.2",
"@libp2p/topology": "^3.0.0",
Expand Down Expand Up @@ -200,6 +201,7 @@
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-retry": "^5.0.0",
"p-wait-for": "^5.0.0",
"protons": "^5.1.0",
"sinon": "^14.0.0",
"ts-sinon": "^2.0.2",
Expand Down
84 changes: 77 additions & 7 deletions src/routing-table/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Logger } from '@libp2p/logger'
import { Components, Initializable } from '@libp2p/components'
import { PeerSet } from '@libp2p/peer-collections'

export const KAD_CLOSE_TAG_NAME = 'kad-close'
export const KAD_CLOSE_TAG_VALUE = 50
export const KBUCKET_SIZE = 20
export const PING_TIMEOUT = 10000
export const PING_CONCURRENCY = 10

export interface KBucketPeer {
id: Uint8Array
Expand All @@ -22,10 +29,20 @@ export interface KBucket {
right: KBucket
}

interface KBucketTreeEvents {
'ping': (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void
'added': (contact: KBucketPeer) => void
'removed': (contact: KBucketPeer) => void
}

export interface KBucketTree {
root: KBucket
localNodeId: Uint8Array
on: (event: 'ping', callback: (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void) => void

on: <U extends keyof KBucketTreeEvents>(
event: U, listener: KBucketTreeEvents[U]
) => this

closest: (key: Uint8Array, count: number) => KBucketPeer[]
closestPeer: (key: Uint8Array) => KBucketPeer
remove: (key: Uint8Array) => void
Expand All @@ -45,6 +62,8 @@ export interface RoutingTableInit {
kBucketSize?: number
pingTimeout?: number
pingConcurrency?: number
tagName?: string
tagValue?: number
}

/**
Expand All @@ -63,17 +82,21 @@ export class RoutingTable implements Startable, Initializable {
private readonly pingConcurrency: number
private running: boolean
private readonly protocol: string
private readonly tagName: string
private readonly tagValue: number

constructor (init: RoutingTableInit) {
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol } = init
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init

this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`)
this.kBucketSize = kBucketSize ?? 20
this.pingTimeout = pingTimeout ?? 10000
this.pingConcurrency = pingConcurrency ?? 10
this.kBucketSize = kBucketSize ?? KBUCKET_SIZE
this.pingTimeout = pingTimeout ?? PING_TIMEOUT
this.pingConcurrency = pingConcurrency ?? PING_CONCURRENCY
this.lan = lan
this.running = false
this.protocol = protocol
this.tagName = tagName ?? KAD_CLOSE_TAG_NAME
this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE

const updatePingQueueSizeMetric = () => {
this.components.getMetrics()?.updateComponentMetric({
Expand Down Expand Up @@ -108,13 +131,18 @@ export class RoutingTable implements Startable, Initializable {
async start () {
this.running = true

const kBuck = new KBuck({
const kBuck: KBucketTree = new KBuck({
localNodeId: await utils.convertPeerId(this.components.getPeerId()),
numberOfNodesPerKBucket: this.kBucketSize,
numberOfNodesToPing: 1
})
kBuck.on('ping', this._onPing)
this.kb = kBuck

// test whether to evict peers
kBuck.on('ping', this._onPing)

// tag kad-close peers
this._tagPeers(kBuck)
}

async stop () {
Expand All @@ -123,6 +151,48 @@ export class RoutingTable implements Startable, Initializable {
this.kb = undefined
}

/**
* Keep track of our k-closest peers and tag them in the peer store as such
* - this will lower the chances that connections to them get closed when
* we reach connection limits
*/
_tagPeers (kBuck: KBucketTree) {
let kClosest = new PeerSet()

const updatePeerTags = utils.debounce(() => {
const newClosest = new PeerSet(
kBuck.closest(kBuck.localNodeId, KBUCKET_SIZE).map(contact => contact.peer)
)
const addedPeers = newClosest.difference(kClosest)
const removedPeers = kClosest.difference(newClosest)

Promise.resolve()
.then(async () => {
for (const peer of addedPeers) {
await this.components.getPeerStore().tagPeer(peer, this.tagName, {
value: this.tagValue
})
}

for (const peer of removedPeers) {
await this.components.getPeerStore().unTagPeer(peer, this.tagName)
}
})
.catch(err => {
this.log.error('Could not update peer tags', err)
})

kClosest = newClosest
})

kBuck.on('added', () => {
updatePeerTags()
})
kBuck.on('removed', () => {
updatePeerTags()
})
}

/**
* Called on the `ping` event from `k-bucket` when a bucket is full
* and cannot split.
Expand Down
9 changes: 9 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,12 @@ export function createPutRecord (key: Uint8Array, value: Uint8Array) {

return rec.serialize()
}

export function debounce (callback: () => void, wait: number = 100) {
let timeout: ReturnType<typeof setTimeout>

return () => {
clearTimeout(timeout)
timeout = setTimeout(() => callback(), wait)
}
}
100 changes: 98 additions & 2 deletions test/routing-table.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
import { expect } from 'aegir/chai'
import random from 'lodash.random'
import sinon from 'sinon'
import { RoutingTable } from '../src/routing-table/index.js'
import { KAD_CLOSE_TAG_NAME, KAD_CLOSE_TAG_VALUE, KBUCKET_SIZE, RoutingTable } from '../src/routing-table/index.js'
import * as kadUtils from '../src/utils.js'
import { createPeerId, createPeerIds } from './utils/create-peer-id.js'
import { PROTOCOL_DHT } from '../src/constants.js'
import { peerIdFromString } from '@libp2p/peer-id'
import { Components } from '@libp2p/components'
import { mockConnectionManager } from '@libp2p/interface-mocks'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { sortClosestPeers } from './utils/sort-closest-peers.js'
import pWaitFor from 'p-wait-for'
import { pipe } from 'it-pipe'
import all from 'it-all'
import { PeerSet } from '@libp2p/peer-collections'

describe('Routing Table', () => {
let table: RoutingTable
Expand All @@ -20,7 +28,9 @@ describe('Routing Table', () => {

components = new Components({
peerId: await createPeerId(),
connectionManager: mockConnectionManager()
connectionManager: mockConnectionManager(),
datastore: new MemoryDatastore(),
peerStore: new PersistentPeerStore()
})

table = new RoutingTable({
Expand Down Expand Up @@ -207,4 +217,90 @@ describe('Routing Table', () => {
// evicted the old peer
expect(table.kb.get(oldPeer.id)).to.be.null()
})

it('tags newly found kad-close peers', async () => {
const remotePeer = await createEd25519PeerId()
const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer')

await table.add(remotePeer)

expect(tagPeerSpy.callCount).to.equal(0, 'did not debounce call to peerStore.tagPeer')

await pWaitFor(() => {
return tagPeerSpy.callCount === 1
})

expect(tagPeerSpy.callCount).to.equal(1, 'did not tag kad-close peer')
expect(tagPeerSpy.getCall(0).args[0].toString()).to.equal(remotePeer.toString())
expect(tagPeerSpy.getCall(0).args[1]).to.equal(KAD_CLOSE_TAG_NAME)
expect(tagPeerSpy.getCall(0).args[2]).to.have.property('value', KAD_CLOSE_TAG_VALUE)
})

it('removes tags from kad-close peers when closer peers are found', async () => {
async function getTaggedPeers (): Promise<PeerSet> {
return new PeerSet(await pipe(
await components.getPeerStore().all(),
async function * (source) {
for await (const peer of source) {
const tags = await components.getPeerStore().getTags(peer.id)
const kadCloseTags = tags.filter(tag => tag.name === KAD_CLOSE_TAG_NAME)

if (kadCloseTags.length > 0) {
yield peer.id
}
}
},
async (source) => await all(source)
))
}

const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer')
const unTagPeerSpy = sinon.spy(components.getPeerStore(), 'unTagPeer')
const localNodeId = await kadUtils.convertPeerId(components.getPeerId())
const sortedPeerList = await sortClosestPeers(
await Promise.all(
new Array(KBUCKET_SIZE + 1).fill(0).map(async () => await createEd25519PeerId())
),
localNodeId
)

// sort list furthest -> closest
sortedPeerList.reverse()

// fill the table up to the first kbucket size
for (let i = 0; i < KBUCKET_SIZE; i++) {
await table.add(sortedPeerList[i])
}

// should have all added contacts in the root kbucket
expect(table.kb?.count()).to.equal(KBUCKET_SIZE, 'did not fill kbuckets')
expect(table.kb?.root.contacts).to.have.lengthOf(KBUCKET_SIZE, 'split root kbucket when we should not have')
expect(table.kb?.root.left).to.be.null('split root kbucket when we should not have')
expect(table.kb?.root.right).to.be.null('split root kbucket when we should not have')

await pWaitFor(() => {
return tagPeerSpy.callCount === KBUCKET_SIZE
})

// make sure we tagged all of the peers as kad-close
const taggedPeers = await getTaggedPeers()
expect(taggedPeers.difference(new PeerSet(sortedPeerList.slice(0, sortedPeerList.length - 1)))).to.have.property('size', 0)
tagPeerSpy.resetHistory()

// add a node that is closer than any added so far
await table.add(sortedPeerList[sortedPeerList.length - 1])

expect(table.kb?.count()).to.equal(KBUCKET_SIZE + 1, 'did not fill kbuckets')
expect(table.kb?.root.left).to.not.be.null('did not split root kbucket when we should have')
expect(table.kb?.root.right).to.not.be.null('did not split root kbucket when we should have')

// wait for tag new peer and untag old peer
await pWaitFor(() => {
return tagPeerSpy.callCount === 1 && unTagPeerSpy.callCount === 1
})

// should have updated list of tagged peers
const finalTaggedPeers = await getTaggedPeers()
expect(finalTaggedPeers.difference(new PeerSet(sortedPeerList.slice(1)))).to.have.property('size', 0)
})
})