From 46313a876783da7c036f79daa69a558bd8f1a245 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 26 May 2023 15:42:16 +0100 Subject: [PATCH] fix: use events to delay before self-query (#478) Instead of debouncing and using timeouts to wait for DHT peers before running the initial self-query, instead get the routing table to emit events when peers are added or removed - if the table is empty when we run the self-query, wait for the `peer:add` event before continuing. Improves startup time. Adds tests for the query-self component. --- package.json | 1 + src/kad-dht.ts | 10 ++- src/query-self.ts | 148 ++++++++++++++++--------------------- src/routing-table/index.ts | 18 ++++- test/query-self.spec.ts | 128 ++++++++++++++++++++++++++++++++ test/routing-table.spec.ts | 23 ++++++ 6 files changed, 236 insertions(+), 92 deletions(-) create mode 100644 test/query-self.spec.ts diff --git a/package.json b/package.json index 8026b5fb..b27d1634 100644 --- a/package.json +++ b/package.json @@ -181,6 +181,7 @@ "it-take": "^3.0.1", "multiformats": "^11.0.0", "p-defer": "^4.0.0", + "p-event": "^5.0.1", "p-queue": "^7.3.4", "private-ip": "^3.0.0", "progress-events": "^1.0.0", diff --git a/src/kad-dht.ts b/src/kad-dht.ts index 8e887404..b0e40154 100644 --- a/src/kad-dht.ts +++ b/src/kad-dht.ts @@ -285,10 +285,11 @@ export class DefaultKadDHT extends EventEmitter implements this.queryManager.start(), this.network.start(), this.routingTable.start(), - this.topologyListener.start(), - this.querySelf.start() + this.topologyListener.start() ]) + this.querySelf.start() + await this.routingTableRefresh.start() } @@ -299,14 +300,15 @@ export class DefaultKadDHT extends EventEmitter implements async stop (): Promise { this.running = false + this.querySelf.stop() + await Promise.all([ this.providers.stop(), this.queryManager.stop(), this.network.stop(), this.routingTable.stop(), this.routingTableRefresh.stop(), - this.topologyListener.stop(), - this.querySelf.stop() + this.topologyListener.stop() ]) } diff --git a/src/query-self.ts b/src/query-self.ts index e4f4439e..5bdef869 100644 --- a/src/query-self.ts +++ b/src/query-self.ts @@ -4,10 +4,12 @@ import { anySignal } from 'any-signal' import length from 'it-length' import { pipe } from 'it-pipe' import take from 'it-take' +import pDefer from 'p-defer' +import { pEvent } from 'p-event' import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K, QUERY_SELF_INITIAL_INTERVAL } from './constants.js' -import type { KadDHTComponents } from './index.js' import type { PeerRouting } from './peer-routing/index.js' import type { RoutingTable } from './routing-table/index.js' +import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' import type { DeferredPromise } from 'p-defer' @@ -22,18 +24,8 @@ export interface QuerySelfInit { initialQuerySelfHasRun: DeferredPromise } -function debounce (func: () => void, wait: number): () => void { - let timeout: ReturnType | undefined - - return function () { - const later = function (): void { - timeout = undefined - func() - } - - clearTimeout(timeout) - timeout = setTimeout(later, wait) - } +export interface QuerySelfComponents { + peerId: PeerId } /** @@ -41,7 +33,7 @@ function debounce (func: () => void, wait: number): () => void { */ export class QuerySelf implements Startable { private readonly log: Logger - private readonly components: KadDHTComponents + private readonly components: QuerySelfComponents private readonly peerRouting: PeerRouting private readonly routingTable: RoutingTable private readonly count: number @@ -49,17 +41,16 @@ export class QuerySelf implements Startable { private readonly initialInterval: number private readonly queryTimeout: number private started: boolean - private running: boolean private timeoutId?: NodeJS.Timer private controller?: AbortController private initialQuerySelfHasRun?: DeferredPromise + private querySelfPromise?: DeferredPromise - constructor (components: KadDHTComponents, init: QuerySelfInit) { + constructor (components: QuerySelfComponents, init: QuerySelfInit) { const { peerRouting, lan, count, interval, queryTimeout, routingTable } = init this.components = components this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:query-self`) - this.running = false this.started = false this.peerRouting = peerRouting this.routingTable = routingTable @@ -68,25 +59,28 @@ export class QuerySelf implements Startable { this.initialInterval = init.initialInterval ?? QUERY_SELF_INITIAL_INTERVAL this.queryTimeout = queryTimeout ?? QUERY_SELF_TIMEOUT this.initialQuerySelfHasRun = init.initialQuerySelfHasRun - - this.querySelf = debounce(this.querySelf.bind(this), 100) } isStarted (): boolean { return this.started } - async start (): Promise { + start (): void { if (this.started) { return } this.started = true clearTimeout(this.timeoutId) - this.timeoutId = setTimeout(this.querySelf.bind(this), this.initialInterval) + this.timeoutId = setTimeout(() => { + this.querySelf() + .catch(err => { + this.log.error('error running self-query', err) + }) + }, this.initialInterval) } - async stop (): Promise { + stop (): void { this.started = false if (this.timeoutId != null) { @@ -98,84 +92,68 @@ export class QuerySelf implements Startable { } } - querySelf (): void { + async querySelf (): Promise { if (!this.started) { this.log('skip self-query because we are not started') return } - if (this.running) { - this.log('skip self-query because we are already running, will run again in %dms', this.interval) - return + if (this.querySelfPromise != null) { + this.log('joining existing self query') + return this.querySelfPromise.promise } - if (this.routingTable.size === 0) { - let nextInterval = this.interval + this.querySelfPromise = pDefer() - if (this.initialQuerySelfHasRun != null) { - // if we've not yet run the first self query, shorten the interval until we try again - nextInterval = this.initialInterval - } - - this.log('skip self-query because routing table is empty, will run again in %dms', nextInterval) - clearTimeout(this.timeoutId) - this.timeoutId = setTimeout(this.querySelf.bind(this), nextInterval) - return + if (this.routingTable.size === 0) { + // wait to discover at least one DHT peer + await pEvent(this.routingTable, 'peer:add') } - this.running = true + if (this.started) { + this.controller = new AbortController() + const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)]) - Promise.resolve() - .then(async () => { - if (!this.started) { - this.log('not running self-query - node stopped before query started') - return + // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged + try { + if (setMaxListeners != null) { + setMaxListeners(Infinity, signal) } + } catch {} // fails on node < 15.4 - this.controller = new AbortController() - const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)]) - - // this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged - try { - if (setMaxListeners != null) { - setMaxListeners(Infinity, signal) - } - } catch {} // fails on node < 15.4 - - try { - this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout) - - const found = await pipe( - this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { - signal, - isSelfQuery: true - }), - (source) => take(source, this.count), - async (source) => length(source) - ) - - this.log('self-query ran successfully - found %d peers', found) - - if (this.initialQuerySelfHasRun != null) { - this.initialQuerySelfHasRun.resolve() - this.initialQuerySelfHasRun = undefined - } - } catch (err: any) { - this.log.error('self-query error', err) - } finally { - signal.clear() - } - }).catch(err => { - this.log('self-query error', err) - }).finally(() => { - this.running = false + try { + this.log('run self-query, look for %d peers timing out after %dms', this.count, this.queryTimeout) + + const found = await pipe( + this.peerRouting.getClosestPeers(this.components.peerId.toBytes(), { + signal, + isSelfQuery: true + }), + (source) => take(source, this.count), + async (source) => length(source) + ) - clearTimeout(this.timeoutId) + this.log('self-query ran successfully - found %d peers', found) - if (this.started) { - this.log('running self-query again in %dms', this.interval) - this.timeoutId = setTimeout(this.querySelf.bind(this), this.interval) + if (this.initialQuerySelfHasRun != null) { + this.initialQuerySelfHasRun.resolve() + this.initialQuerySelfHasRun = undefined } - }) + } catch (err: any) { + this.log.error('self-query error', err) + } finally { + signal.clear() + } + } + + this.querySelfPromise.resolve() + this.querySelfPromise = undefined + + this.timeoutId = setTimeout(() => { + this.querySelf() + .catch(err => { + this.log.error('error running self-query', err) + }) + }, this.interval) } } diff --git a/src/routing-table/index.ts b/src/routing-table/index.ts index 29ac8714..a3f2afb8 100644 --- a/src/routing-table/index.ts +++ b/src/routing-table/index.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from '@libp2p/interfaces/events' import { logger } from '@libp2p/logger' import { PeerSet } from '@libp2p/peer-collections' import Queue from 'p-queue' @@ -33,11 +34,16 @@ export interface RoutingTableComponents { metrics?: Metrics } +export interface RoutingTableEvents { + 'peer:add': CustomEvent + 'peer:remove': CustomEvent +} + /** * A wrapper around `k-bucket`, to provide easy store and * retrieval for peers. */ -export class RoutingTable implements Startable { +export class RoutingTable extends EventEmitter implements Startable { public kBucketSize: number public kb?: KBucket public pingQueue: Queue @@ -58,6 +64,8 @@ export class RoutingTable implements Startable { } constructor (components: RoutingTableComponents, init: RoutingTableInit) { + super() + const { kBucketSize, pingTimeout, lan, pingConcurrency, protocol, tagName, tagValue } = init this.components = components @@ -160,11 +168,15 @@ export class RoutingTable implements Startable { kClosest = newClosest }) - kBuck.addEventListener('added', () => { + kBuck.addEventListener('added', (evt) => { updatePeerTags() + + this.safeDispatchEvent('peer:add', { detail: evt.detail.peer }) }) - kBuck.addEventListener('removed', () => { + kBuck.addEventListener('removed', (evt) => { updatePeerTags() + + this.safeDispatchEvent('peer:remove', { detail: evt.detail.peer }) }) } diff --git a/test/query-self.spec.ts b/test/query-self.spec.ts new file mode 100644 index 00000000..22204198 --- /dev/null +++ b/test/query-self.spec.ts @@ -0,0 +1,128 @@ +/* eslint-env mocha */ + +import { CustomEvent } from '@libp2p/interfaces/events' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import pDefer from 'p-defer' +import { stubInterface, type StubbedInstance } from 'ts-sinon' +import { finalPeerEvent } from '../src/query/events.js' +import { QuerySelf } from '../src/query-self.js' +import type { PeerRouting } from '../src/peer-routing/index.js' +import type { RoutingTable } from '../src/routing-table/index.js' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { DeferredPromise } from 'p-defer' + +describe('Query Self', () => { + let peerId: PeerId + let querySelf: QuerySelf + let peerRouting: StubbedInstance + let routingTable: StubbedInstance + let initialQuerySelfHasRun: DeferredPromise + + beforeEach(async () => { + peerId = await createEd25519PeerId() + initialQuerySelfHasRun = pDefer() + routingTable = stubInterface() + peerRouting = stubInterface() + + const components = { + peerId + } + + const init = { + lan: false, + peerRouting, + routingTable, + initialQuerySelfHasRun + } + + querySelf = new QuerySelf(components, init) + }) + + afterEach(() => { + if (querySelf != null) { + querySelf.stop() + } + }) + + it('should not run if not started', async () => { + await querySelf.querySelf() + + expect(peerRouting.getClosestPeers).to.have.property('callCount', 0) + }) + + it('should wait for routing table peers before running first query', async () => { + querySelf.start() + + // @ts-expect-error read-only property + routingTable.size = 0 + + const querySelfPromise = querySelf.querySelf() + const remotePeer = await createEd25519PeerId() + + let initialQuerySelfHasRunResolved = false + + void initialQuerySelfHasRun.promise.then(() => { + initialQuerySelfHasRunResolved = true + }) + + // should have registered a peer:add listener + // @ts-expect-error ts-sinon makes every property access a function and p-event checks this one first + expect(routingTable.on).to.have.property('callCount', 2) + // @ts-expect-error ts-sinon makes every property access a function and p-event checks this one first + expect(routingTable.on.getCall(0)).to.have.nested.property('args[0]', 'peer:add') + + // self query results + peerRouting.getClosestPeers.withArgs(peerId.toBytes()).returns(async function * () { + yield finalPeerEvent({ + from: remotePeer, + peer: { + id: remotePeer, + multiaddrs: [], + protocols: [] + } + }) + }()) + + // @ts-expect-error args[1] type could be an object + routingTable.on.getCall(0).args[1](new CustomEvent('peer:add', { detail: remotePeer })) + + // self-query should complete + await querySelfPromise + + // should have resolved initial query self promise + expect(initialQuerySelfHasRunResolved).to.be.true() + }) + + it('should join an existing query promise and not run twise', async () => { + querySelf.start() + + // @ts-expect-error read-only property + routingTable.size = 0 + + const querySelfPromise1 = querySelf.querySelf() + const querySelfPromise2 = querySelf.querySelf() + const remotePeer = await createEd25519PeerId() + + // self query results + peerRouting.getClosestPeers.withArgs(peerId.toBytes()).returns(async function * () { + yield finalPeerEvent({ + from: remotePeer, + peer: { + id: remotePeer, + multiaddrs: [], + protocols: [] + } + }) + }()) + + // @ts-expect-error args[1] type could be an object + routingTable.on.getCall(0).args[1](new CustomEvent('peer:add', { detail: remotePeer })) + + // both self-query promises should resolve + await Promise.all([querySelfPromise1, querySelfPromise2]) + + // should only have made one query + expect(peerRouting.getClosestPeers).to.have.property('callCount', 1) + }) +}) diff --git a/test/routing-table.spec.ts b/test/routing-table.spec.ts index ac940458..5fafc297 100644 --- a/test/routing-table.spec.ts +++ b/test/routing-table.spec.ts @@ -11,6 +11,7 @@ import { MemoryDatastore } from 'datastore-core' import all from 'it-all' import { pipe } from 'it-pipe' import random from 'lodash.random' +import { pEvent } from 'p-event' import pWaitFor from 'p-wait-for' import sinon from 'sinon' import { stubInterface } from 'ts-sinon' @@ -21,6 +22,7 @@ import { createPeerId, createPeerIds } from './utils/create-peer-id.js' import { sortClosestPeers } from './utils/sort-closest-peers.js' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { Libp2pEvents } from '@libp2p/interface-libp2p' +import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerStore } from '@libp2p/interface-peer-store' import type { Registrar } from '@libp2p/interface-registrar' @@ -80,6 +82,16 @@ describe('Routing Table', () => { ) }) + it('emits peer:add event', async () => { + const id = await createEd25519PeerId() + const eventPromise = pEvent<'peer:add', CustomEvent>(table, 'peer:add') + + await table.add(id) + + const event = await eventPromise + expect(event.detail.toString()).to.equal(id.toString()) + }) + it('remove', async function () { this.timeout(20 * 1000) @@ -94,6 +106,17 @@ describe('Routing Table', () => { expect(table.size).to.be.eql(9) }) + it('emits peer:remove event', async () => { + const id = await createEd25519PeerId() + const eventPromise = pEvent<'peer:remove', CustomEvent>(table, 'peer:remove') + + await table.add(id) + await table.remove(id) + + const event = await eventPromise + expect(event.detail.toString()).to.equal(id.toString()) + }) + it('closestPeer', async function () { this.timeout(10 * 1000)