From 61aa0ceb0cd7d7d089a217eddebe9de2f68ce1d5 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 2 Nov 2023 13:05:49 +0000 Subject: [PATCH] fix: invoke onProgress callback with DHT queries during routing Allow passing an `onProgress` callback to the peer/content routers that can receive DHT query events. Refs #1574 --- packages/interface/package.json | 1 + .../interface/src/content-routing/index.ts | 16 +++-- packages/interface/src/index.ts | 55 ++++++++++++++--- packages/interface/src/peer-routing/index.ts | 10 ++- packages/kad-dht/src/dual-kad-dht.ts | 61 +++++++++++++++---- packages/libp2p/package.json | 1 + packages/libp2p/src/libp2p.ts | 39 +++++++++--- .../content-routing/content-routing.node.ts | 32 ++++++++++ 8 files changed, 182 insertions(+), 33 deletions(-) diff --git a/packages/interface/package.json b/packages/interface/package.json index 5c57784506..7a853cfea1 100644 --- a/packages/interface/package.json +++ b/packages/interface/package.json @@ -167,6 +167,7 @@ "it-stream-types": "^2.0.1", "multiformats": "^12.0.1", "p-defer": "^4.0.0", + "progress-events": "^1.0.0", "race-signal": "^1.0.0", "uint8arraylist": "^2.4.3" }, diff --git a/packages/interface/src/content-routing/index.ts b/packages/interface/src/content-routing/index.ts index 0d38d58143..0cfa498605 100644 --- a/packages/interface/src/content-routing/index.ts +++ b/packages/interface/src/content-routing/index.ts @@ -1,6 +1,7 @@ import type { AbortOptions } from '../index.js' import type { PeerInfo } from '../peer-info/index.js' import type { CID } from 'multiformats/cid' +import type { ProgressEvent, ProgressOptions } from 'progress-events' /** * Any object that implements this Symbol as a property should return a @@ -23,7 +24,12 @@ import type { CID } from 'multiformats/cid' */ export const contentRouting = Symbol.for('@libp2p/content-routing') -export interface ContentRouting { +export interface ContentRouting< + ProvideProgressEvents extends ProgressEvent = ProgressEvent, + FindProvidersProgressEvents extends ProgressEvent = ProgressEvent, + PutProgressEvents extends ProgressEvent = ProgressEvent, + GetProgressEvents extends ProgressEvent = ProgressEvent +> { /** * The implementation of this method should ensure that network peers know the * caller can provide content that corresponds to the passed CID. @@ -35,7 +41,7 @@ export interface ContentRouting { * await contentRouting.provide(cid) * ``` */ - provide(cid: CID, options?: AbortOptions): Promise + provide(cid: CID, options?: AbortOptions & ProgressOptions): Promise /** * Find the providers of the passed CID. @@ -49,7 +55,7 @@ export interface ContentRouting { * } * ``` */ - findProviders(cid: CID, options?: AbortOptions): AsyncIterable + findProviders(cid: CID, options?: AbortOptions & ProgressOptions): AsyncIterable /** * Puts a value corresponding to the passed key in a way that can later be @@ -65,7 +71,7 @@ export interface ContentRouting { * await contentRouting.put(key, value) * ``` */ - put(key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise + put(key: Uint8Array, value: Uint8Array, options?: AbortOptions & ProgressOptions): Promise /** * Retrieves a value from the network corresponding to the passed key. @@ -79,5 +85,5 @@ export interface ContentRouting { * const value = await contentRouting.get(key) * ``` */ - get(key: Uint8Array, options?: AbortOptions): Promise + get(key: Uint8Array, options?: AbortOptions & ProgressOptions): Promise } diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 5b1463e4c2..20a27b3e12 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -28,6 +28,7 @@ import type { StreamHandler, StreamHandlerOptions } from './stream-handler/index import type { Topology } from './topology/index.js' import type { Listener } from './transport/index.js' import type { Multiaddr } from '@multiformats/multiaddr' +import type { ProgressEvent } from 'progress-events' /** * Used by the connection manager to sort addresses into order before dialling @@ -113,7 +114,15 @@ export interface IdentifyResult { * Event names are `noun:verb` so the first part is the name of the object * being acted on and the second is the action. */ -export interface Libp2pEvents { +export interface Libp2pEvents< + Services extends ServiceMap = ServiceMap, + FindPeerProgressEvents extends ProgressEvent = ProgressEvent, + GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent, + ProvideProgressEvents extends ProgressEvent = ProgressEvent, + FindProvidersProgressEvents extends ProgressEvent = ProgressEvent, + PutProgressEvents extends ProgressEvent = ProgressEvent, + GetProgressEvents extends ProgressEvent = ProgressEvent +> { /** * This event is dispatched when a new network peer is discovered. * @@ -240,7 +249,15 @@ export interface Libp2pEvents { * }) * ``` */ - 'start': CustomEvent> + 'start': CustomEvent> /** * This event notifies listeners that the node has stopped @@ -251,7 +268,15 @@ export interface Libp2pEvents { * }) * ``` */ - 'stop': CustomEvent> + 'stop': CustomEvent> } /** @@ -308,7 +333,23 @@ export interface PendingDial { /** * Libp2p nodes implement this interface. */ -export interface Libp2p extends Startable, TypedEventTarget> { +export interface Libp2p< + Services extends ServiceMap = ServiceMap, + FindPeerProgressEvents extends ProgressEvent = ProgressEvent, + GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent, + ProvideProgressEvents extends ProgressEvent = ProgressEvent, + FindProvidersProgressEvents extends ProgressEvent = ProgressEvent, + PutProgressEvents extends ProgressEvent = ProgressEvent, + GetProgressEvents extends ProgressEvent = ProgressEvent +> extends Startable, TypedEventTarget> { /** * The PeerId is a unique identifier for a node on the network. * @@ -359,7 +400,7 @@ export interface Libp2p extends Startable, Ty * } * ``` */ - peerRouting: PeerRouting + peerRouting: PeerRouting /** * The content routing subsystem allows the user to find providers for content, @@ -375,7 +416,7 @@ export interface Libp2p extends Startable, Ty * } * ``` */ - contentRouting: ContentRouting + contentRouting: ContentRouting /** * The keychain contains the keys used by the current node, and can create new @@ -602,7 +643,7 @@ export interface Libp2p extends Startable, Ty /** * A set of user defined services */ - services: T + services: Services } /** diff --git a/packages/interface/src/peer-routing/index.ts b/packages/interface/src/peer-routing/index.ts index dcaa68eaa2..abe1710f09 100644 --- a/packages/interface/src/peer-routing/index.ts +++ b/packages/interface/src/peer-routing/index.ts @@ -1,6 +1,7 @@ import type { AbortOptions } from '../index.js' import type { PeerId } from '../peer-id/index.js' import type { PeerInfo } from '../peer-info/index.js' +import type { ProgressEvent, ProgressOptions } from 'progress-events' /** * Any object that implements this Symbol as a property should return a @@ -23,7 +24,10 @@ import type { PeerInfo } from '../peer-info/index.js' */ export const peerRouting = Symbol.for('@libp2p/peer-routing') -export interface PeerRouting { +export interface PeerRouting< + FindPeerProgressEvents extends ProgressEvent = ProgressEvent, + GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent +> { /** * Searches the network for peer info corresponding to the passed peer id. * @@ -34,7 +38,7 @@ export interface PeerRouting { * const peer = await peerRouting.findPeer(peerId, options) * ``` */ - findPeer(peerId: PeerId, options?: AbortOptions): Promise + findPeer(peerId: PeerId, options?: AbortOptions & ProgressOptions): Promise /** * Search the network for peers that are closer to the passed key. Peer @@ -49,5 +53,5 @@ export interface PeerRouting { * } * ``` */ - getClosestPeers(key: Uint8Array, options?: AbortOptions): AsyncIterable + getClosestPeers(key: Uint8Array, options?: AbortOptions & ProgressOptions): AsyncIterable } diff --git a/packages/kad-dht/src/dual-kad-dht.ts b/packages/kad-dht/src/dual-kad-dht.ts index ef06f739ea..5427e1fd99 100644 --- a/packages/kad-dht/src/dual-kad-dht.ts +++ b/packages/kad-dht/src/dual-kad-dht.ts @@ -4,9 +4,9 @@ import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events' import { type PeerDiscovery, peerDiscovery, type PeerDiscoveryEvents } from '@libp2p/interface/peer-discovery' import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing' import { logger } from '@libp2p/logger' -import drain from 'it-drain' import merge from 'it-merge' import isPrivate from 'private-ip' +import { CustomProgressEvent } from 'progress-events' import { DefaultKadDHT } from './kad-dht.js' import { queryErrorEvent } from './query/events.js' import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js' @@ -14,37 +14,63 @@ import type { PeerId } from '@libp2p/interface/peer-id' import type { PeerInfo } from '@libp2p/interface/peer-info' import type { Multiaddr } from '@multiformats/multiaddr' import type { CID } from 'multiformats/cid' +import type { ProgressEvent, ProgressOptions } from 'progress-events' + +export type ProvideProgressEvents = + ProgressEvent<'libp2p:content-routing:provide:dht:event', QueryEvent> + +export type FindProvidersProgressEvents = + ProgressEvent<'libp2p:content-routing:find-providers:dht:event', QueryEvent> + +export type PutProgressEvents = + ProgressEvent<'libp2p:content-routing:put:dht:event', QueryEvent> + +export type GetProgressEvents = + ProgressEvent<'libp2p:content-routing:get:dht:event', QueryEvent> const log = logger('libp2p:kad-dht') /** * Wrapper class to convert events into returned values */ -class DHTContentRouting implements ContentRouting { +class DHTContentRouting implements ContentRouting< +ProvideProgressEvents, +FindProvidersProgressEvents, +PutProgressEvents, +GetProgressEvents +> { private readonly dht: KadDHT constructor (dht: KadDHT) { this.dht = dht } - async provide (cid: CID, options: QueryOptions = {}): Promise { - await drain(this.dht.provide(cid, options)) + async provide (cid: CID, options: QueryOptions & ProgressOptions = {}): Promise { + for await (const event of this.dht.provide(cid, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:provide:dht:event', event)) + } } - async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator { + async * findProviders (cid: CID, options: QueryOptions & ProgressOptions = {}): AsyncGenerator { for await (const event of this.dht.findProviders(cid, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:find-providers:dht:event', event)) + if (event.name === 'PROVIDER') { yield * event.providers } } } - async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise { - await drain(this.dht.put(key, value, options)) + async put (key: Uint8Array, value: Uint8Array, options: QueryOptions & ProgressOptions = {}): Promise { + for await (const event of this.dht.put(key, value, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:put:dht:event', event)) + } } - async get (key: Uint8Array, options?: QueryOptions): Promise { + async get (key: Uint8Array, options: QueryOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.get(key, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:get:dht:event', event)) + if (event.name === 'VALUE') { return event.value } @@ -54,18 +80,29 @@ class DHTContentRouting implements ContentRouting { } } +export type FindPeerProgressEvents = + ProgressEvent<'libp2p:peer-routing:find-peer:dht:event', QueryEvent> + +export type GetClosestPeersProgressEvents = + ProgressEvent<'libp2p:peer-routing:get-closest-peers:dht:event', QueryEvent> + /** * Wrapper class to convert events into returned values */ -class DHTPeerRouting implements PeerRouting { +class DHTPeerRouting implements PeerRouting< +FindPeerProgressEvents, +GetClosestPeersProgressEvents +> { private readonly dht: KadDHT constructor (dht: KadDHT) { this.dht = dht } - async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise { + async findPeer (peerId: PeerId, options: QueryOptions & ProgressOptions = {}): Promise { for await (const event of this.dht.findPeer(peerId, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:find-peer:dht:event', event)) + if (event.name === 'FINAL_PEER') { return event.peer } @@ -74,8 +111,10 @@ class DHTPeerRouting implements PeerRouting { throw new CodeError('Not found', 'ERR_NOT_FOUND') } - async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable { + async * getClosestPeers (key: Uint8Array, options: QueryOptions & ProgressOptions = {}): AsyncIterable { for await (const event of this.dht.getClosestPeers(key, options)) { + options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:get-closest-peers:dht:event', event)) + if (event.name === 'FINAL_PEER') { yield event.peer } diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index 3a574256ae..75cb005f6b 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -159,6 +159,7 @@ "p-queue": "^7.3.4", "p-retry": "^6.0.0", "private-ip": "^3.0.0", + "progress-events": "^1.0.0", "protons-runtime": "^5.0.0", "rate-limiter-flexible": "^3.0.0", "uint8arraylist": "^2.4.3", diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index 5708aa59e3..158c05a37f 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -38,22 +38,31 @@ import type { PeerStore } from '@libp2p/interface/peer-store' import type { Topology } from '@libp2p/interface/topology' import type { StreamHandler, StreamHandlerOptions } from '@libp2p/interface-internal/registrar' import type { Datastore } from 'interface-datastore' +import type { ProgressEvent } from 'progress-events' const log = logger('libp2p') -export class Libp2pNode> extends TypedEventEmitter implements Libp2p { +export class Libp2pNode< + Services extends ServiceMap = Record, + FindPeerProgressEvents extends ProgressEvent = ProgressEvent, + GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent, + ProvideProgressEvents extends ProgressEvent = ProgressEvent, + FindProvidersProgressEvents extends ProgressEvent = ProgressEvent, + PutProgressEvents extends ProgressEvent = ProgressEvent, + GetProgressEvents extends ProgressEvent = ProgressEvent +> extends TypedEventEmitter implements Libp2p { public peerId: PeerId public peerStore: PeerStore - public contentRouting: ContentRouting - public peerRouting: PeerRouting + public contentRouting: ContentRouting + public peerRouting: PeerRouting public keychain: KeyChain public metrics?: Metrics - public services: T + public services: Services public components: Components #started: boolean - constructor (init: Libp2pInit) { + constructor (init: Libp2pInit) { super() // event bus - components can listen to this emitter to be notified of system events @@ -174,7 +183,7 @@ export class Libp2pNode> extends continue } - this.services[name as keyof T] = service + this.services[name as keyof Services] = service this.configureComponent(name, service) if (service[contentRouting] != null) { @@ -407,7 +416,23 @@ export class Libp2pNode> extends * Returns a new Libp2pNode instance - this exposes more of the internals than the * libp2p interface and is useful for testing and debugging. */ -export async function createLibp2pNode > (options: Libp2pOptions): Promise> { +export async function createLibp2pNode < + Services extends ServiceMap = Record, + FindPeerProgressEvents extends ProgressEvent = ProgressEvent, + GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent, + ProvideProgressEvents extends ProgressEvent = ProgressEvent, + FindProvidersProgressEvents extends ProgressEvent = ProgressEvent, + PutProgressEvents extends ProgressEvent = ProgressEvent, + GetProgressEvents extends ProgressEvent = ProgressEvent +> (options: Libp2pOptions): Promise> { if (options.peerId == null) { const datastore = options.datastore as Datastore | undefined diff --git a/packages/libp2p/test/content-routing/content-routing.node.ts b/packages/libp2p/test/content-routing/content-routing.node.ts index 66967667ea..8af738e313 100644 --- a/packages/libp2p/test/content-routing/content-routing.node.ts +++ b/packages/libp2p/test/content-routing/content-routing.node.ts @@ -115,6 +115,38 @@ describe('content-routing', () => { return deferred.promise }) + + it('should call progress handler', async () => { + const deferred = pDefer() + + if (nodes[0].services.dht == null) { + throw new Error('DHT was not configured') + } + + sinon.stub(nodes[0].services.dht, 'findProviders').callsFake(async function * () { + yield { + from: nodes[0].peerId, + type: EventTypes.PROVIDER, + name: 'PROVIDER', + providers: [{ + id: nodes[0].peerId, + multiaddrs: [], + protocols: [] + }] + } + deferred.resolve() + }) + + const onProgress = sinon.stub() + + await drain(nodes[0].contentRouting.findProviders(CID.parse('QmU621oD8AhHw6t25vVyfYKmL9VV3PTgc52FngEhTGACFB'), { + onProgress + })) + + await deferred.promise + + expect(onProgress.called).to.be.true() + }) }) describe('via delegate router', () => {