Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: tag kad-close peers (#375)
Browse files Browse the repository at this point in the history
When peers are added or removed from the kbuckets, if the `kBucketSize`
closest peers change, update their peer store tags to ensure we don't
close connections to them when the connection manager reaches it's
max connections limit.
  • Loading branch information
achingbrain authored Oct 4, 2022
1 parent 16583be commit df15a83
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 9 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-retry": "^5.0.0",
"p-wait-for": "^5.0.0",
"protons": "^5.1.0",
"sinon": "^14.0.0",
"ts-sinon": "^2.0.2",
Expand Down
84 changes: 77 additions & 7 deletions src/routing-table/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Logger } from '@libp2p/logger'
import { Components, Initializable } from '@libp2p/components'
import { PeerSet } from '@libp2p/peer-collections'

export const KAD_CLOSE_TAG_NAME = 'kad-close'
export const KAD_CLOSE_TAG_VALUE = 50
export const KBUCKET_SIZE = 20
export const PING_TIMEOUT = 10000
export const PING_CONCURRENCY = 10

export interface KBucketPeer {
id: Uint8Array
Expand All @@ -22,10 +29,20 @@ export interface KBucket {
right: KBucket
}

interface KBucketTreeEvents {
'ping': (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void
'added': (contact: KBucketPeer) => void
'removed': (contact: KBucketPeer) => void
}

export interface KBucketTree {
root: KBucket
localNodeId: Uint8Array
on: (event: 'ping', callback: (oldContacts: KBucketPeer[], newContact: KBucketPeer) => void) => void

on: <U extends keyof KBucketTreeEvents>(
event: U, listener: KBucketTreeEvents[U]
) => this

closest: (key: Uint8Array, count: number) => KBucketPeer[]
closestPeer: (key: Uint8Array) => KBucketPeer
remove: (key: Uint8Array) => void
Expand All @@ -45,6 +62,8 @@ export interface RoutingTableInit {
kBucketSize?: number
pingTimeout?: number
pingConcurrency?: number
tagName?: string
tagValue?: number
}

/**
Expand All @@ -63,17 +82,21 @@ export class RoutingTable implements Startable, Initializable {
private readonly pingConcurrency: number
private running: boolean
private readonly protocol: string
private readonly tagName: string
private readonly tagValue: number

constructor (init: RoutingTableInit) {
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol } = init
const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init

this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:routing-table`)
this.kBucketSize = kBucketSize ?? 20
this.pingTimeout = pingTimeout ?? 10000
this.pingConcurrency = pingConcurrency ?? 10
this.kBucketSize = kBucketSize ?? KBUCKET_SIZE
this.pingTimeout = pingTimeout ?? PING_TIMEOUT
this.pingConcurrency = pingConcurrency ?? PING_CONCURRENCY
this.lan = lan
this.running = false
this.protocol = protocol
this.tagName = tagName ?? KAD_CLOSE_TAG_NAME
this.tagValue = tagValue ?? KAD_CLOSE_TAG_VALUE

const updatePingQueueSizeMetric = () => {
this.components.getMetrics()?.updateComponentMetric({
Expand Down Expand Up @@ -108,13 +131,18 @@ export class RoutingTable implements Startable, Initializable {
async start () {
this.running = true

const kBuck = new KBuck({
const kBuck: KBucketTree = new KBuck({
localNodeId: await utils.convertPeerId(this.components.getPeerId()),
numberOfNodesPerKBucket: this.kBucketSize,
numberOfNodesToPing: 1
})
kBuck.on('ping', this._onPing)
this.kb = kBuck

// test whether to evict peers
kBuck.on('ping', this._onPing)

// tag kad-close peers
this._tagPeers(kBuck)
}

async stop () {
Expand All @@ -123,6 +151,48 @@ export class RoutingTable implements Startable, Initializable {
this.kb = undefined
}

/**
* Keep track of our k-closest peers and tag them in the peer store as such
* - this will lower the chances that connections to them get closed when
* we reach connection limits
*/
_tagPeers (kBuck: KBucketTree) {
let kClosest = new PeerSet()

const updatePeerTags = utils.debounce(() => {
const newClosest = new PeerSet(
kBuck.closest(kBuck.localNodeId, KBUCKET_SIZE).map(contact => contact.peer)
)
const addedPeers = newClosest.difference(kClosest)
const removedPeers = kClosest.difference(newClosest)

Promise.resolve()
.then(async () => {
for (const peer of addedPeers) {
await this.components.getPeerStore().tagPeer(peer, this.tagName, {
value: this.tagValue
})
}

for (const peer of removedPeers) {
await this.components.getPeerStore().unTagPeer(peer, this.tagName)
}
})
.catch(err => {
this.log.error('Could not update peer tags', err)
})

kClosest = newClosest
})

kBuck.on('added', () => {
updatePeerTags()
})
kBuck.on('removed', () => {
updatePeerTags()
})
}

/**
* Called on the `ping` event from `k-bucket` when a bucket is full
* and cannot split.
Expand Down
9 changes: 9 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,12 @@ export function createPutRecord (key: Uint8Array, value: Uint8Array) {

return rec.serialize()
}

export function debounce (callback: () => void, wait: number = 100) {
let timeout: ReturnType<typeof setTimeout>

return () => {
clearTimeout(timeout)
timeout = setTimeout(() => callback(), wait)
}
}
100 changes: 98 additions & 2 deletions test/routing-table.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
import { expect } from 'aegir/chai'
import random from 'lodash.random'
import sinon from 'sinon'
import { RoutingTable } from '../src/routing-table/index.js'
import { KAD_CLOSE_TAG_NAME, KAD_CLOSE_TAG_VALUE, KBUCKET_SIZE, RoutingTable } from '../src/routing-table/index.js'
import * as kadUtils from '../src/utils.js'
import { createPeerId, createPeerIds } from './utils/create-peer-id.js'
import { PROTOCOL_DHT } from '../src/constants.js'
import { peerIdFromString } from '@libp2p/peer-id'
import { Components } from '@libp2p/components'
import { mockConnectionManager } from '@libp2p/interface-mocks'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { MemoryDatastore } from 'datastore-core'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { sortClosestPeers } from './utils/sort-closest-peers.js'
import pWaitFor from 'p-wait-for'
import { pipe } from 'it-pipe'
import all from 'it-all'
import { PeerSet } from '@libp2p/peer-collections'

describe('Routing Table', () => {
let table: RoutingTable
Expand All @@ -20,7 +28,9 @@ describe('Routing Table', () => {

components = new Components({
peerId: await createPeerId(),
connectionManager: mockConnectionManager()
connectionManager: mockConnectionManager(),
datastore: new MemoryDatastore(),
peerStore: new PersistentPeerStore()
})

table = new RoutingTable({
Expand Down Expand Up @@ -207,4 +217,90 @@ describe('Routing Table', () => {
// evicted the old peer
expect(table.kb.get(oldPeer.id)).to.be.null()
})

it('tags newly found kad-close peers', async () => {
const remotePeer = await createEd25519PeerId()
const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer')

await table.add(remotePeer)

expect(tagPeerSpy.callCount).to.equal(0, 'did not debounce call to peerStore.tagPeer')

await pWaitFor(() => {
return tagPeerSpy.callCount === 1
})

expect(tagPeerSpy.callCount).to.equal(1, 'did not tag kad-close peer')
expect(tagPeerSpy.getCall(0).args[0].toString()).to.equal(remotePeer.toString())
expect(tagPeerSpy.getCall(0).args[1]).to.equal(KAD_CLOSE_TAG_NAME)
expect(tagPeerSpy.getCall(0).args[2]).to.have.property('value', KAD_CLOSE_TAG_VALUE)
})

it('removes tags from kad-close peers when closer peers are found', async () => {
async function getTaggedPeers (): Promise<PeerSet> {
return new PeerSet(await pipe(
await components.getPeerStore().all(),
async function * (source) {
for await (const peer of source) {
const tags = await components.getPeerStore().getTags(peer.id)
const kadCloseTags = tags.filter(tag => tag.name === KAD_CLOSE_TAG_NAME)

if (kadCloseTags.length > 0) {
yield peer.id
}
}
},
async (source) => await all(source)
))
}

const tagPeerSpy = sinon.spy(components.getPeerStore(), 'tagPeer')
const unTagPeerSpy = sinon.spy(components.getPeerStore(), 'unTagPeer')
const localNodeId = await kadUtils.convertPeerId(components.getPeerId())
const sortedPeerList = await sortClosestPeers(
await Promise.all(
new Array(KBUCKET_SIZE + 1).fill(0).map(async () => await createEd25519PeerId())
),
localNodeId
)

// sort list furthest -> closest
sortedPeerList.reverse()

// fill the table up to the first kbucket size
for (let i = 0; i < KBUCKET_SIZE; i++) {
await table.add(sortedPeerList[i])
}

// should have all added contacts in the root kbucket
expect(table.kb?.count()).to.equal(KBUCKET_SIZE, 'did not fill kbuckets')
expect(table.kb?.root.contacts).to.have.lengthOf(KBUCKET_SIZE, 'split root kbucket when we should not have')
expect(table.kb?.root.left).to.be.null('split root kbucket when we should not have')
expect(table.kb?.root.right).to.be.null('split root kbucket when we should not have')

await pWaitFor(() => {
return tagPeerSpy.callCount === KBUCKET_SIZE
})

// make sure we tagged all of the peers as kad-close
const taggedPeers = await getTaggedPeers()
expect(taggedPeers.difference(new PeerSet(sortedPeerList.slice(0, sortedPeerList.length - 1)))).to.have.property('size', 0)
tagPeerSpy.resetHistory()

// add a node that is closer than any added so far
await table.add(sortedPeerList[sortedPeerList.length - 1])

expect(table.kb?.count()).to.equal(KBUCKET_SIZE + 1, 'did not fill kbuckets')
expect(table.kb?.root.left).to.not.be.null('did not split root kbucket when we should have')
expect(table.kb?.root.right).to.not.be.null('did not split root kbucket when we should have')

// wait for tag new peer and untag old peer
await pWaitFor(() => {
return tagPeerSpy.callCount === 1 && unTagPeerSpy.callCount === 1
})

// should have updated list of tagged peers
const finalTaggedPeers = await getTaggedPeers()
expect(finalTaggedPeers.difference(new PeerSet(sortedPeerList.slice(1)))).to.have.property('size', 0)
})
})

0 comments on commit df15a83

Please sign in to comment.