Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: invoke onProgress callback with events during routing #1975

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"progress-events": "^1.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3"
},
Expand Down
16 changes: 11 additions & 5 deletions packages/interface/src/content-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AbortOptions } from '../index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
* Any object that implements this Symbol as a property should return a
Expand All @@ -23,7 +24,12 @@ import type { CID } from 'multiformats/cid'
*/
export const contentRouting = Symbol.for('@libp2p/content-routing')

export interface ContentRouting {
export interface ContentRouting<
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* The implementation of this method should ensure that network peers know the
* caller can provide content that corresponds to the passed CID.
Expand All @@ -35,7 +41,7 @@ export interface ContentRouting {
* await contentRouting.provide(cid)
* ```
*/
provide(cid: CID, options?: AbortOptions): Promise<void>
provide(cid: CID, options?: AbortOptions & ProgressOptions<ProvideProgressEvents>): Promise<void>

/**
* Find the providers of the passed CID.
Expand All @@ -49,7 +55,7 @@ export interface ContentRouting {
* }
* ```
*/
findProviders(cid: CID, options?: AbortOptions): AsyncIterable<PeerInfo>
findProviders(cid: CID, options?: AbortOptions & ProgressOptions<FindProvidersProgressEvents>): AsyncIterable<PeerInfo>

/**
* Puts a value corresponding to the passed key in a way that can later be
Expand All @@ -65,7 +71,7 @@ export interface ContentRouting {
* await contentRouting.put(key, value)
* ```
*/
put(key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise<void>
put(key: Uint8Array, value: Uint8Array, options?: AbortOptions & ProgressOptions<PutProgressEvents>): Promise<void>

/**
* Retrieves a value from the network corresponding to the passed key.
Expand All @@ -79,5 +85,5 @@ export interface ContentRouting {
* const value = await contentRouting.get(key)
* ```
*/
get(key: Uint8Array, options?: AbortOptions): Promise<Uint8Array>
get(key: Uint8Array, options?: AbortOptions & ProgressOptions<GetProgressEvents>): Promise<Uint8Array>
}
55 changes: 48 additions & 7 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import type { StreamHandler, StreamHandlerOptions } from './stream-handler/index
import type { Topology } from './topology/index.js'
import type { Listener } from './transport/index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressEvent } from 'progress-events'

/**
* Used by the connection manager to sort addresses into order before dialling
Expand Down Expand Up @@ -113,7 +114,15 @@ export interface IdentifyResult {
* Event names are `noun:verb` so the first part is the name of the object
* being acted on and the second is the action.
*/
export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
export interface Libp2pEvents<
Services extends ServiceMap = ServiceMap,
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* This event is dispatched when a new network peer is discovered.
*
Expand Down Expand Up @@ -240,7 +249,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
* })
* ```
*/
'start': CustomEvent<Libp2p<T>>
'start': CustomEvent<Libp2p<
Copy link
Member

@maschad maschad Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty verbose, should we have this as a seperate interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a massive fan of how this is implemented so I'm certainly open to suggestions of how to make it simpler.

The idea is to derive the types of content/peer routing progress events you'll get from the config so they can be type safe. I not sure it's reliably possible, and it leads to these sorts of generics contortions - we may be better off just making it untyped.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree there, I don't see any type safety benefits as is, I think we are better off with

export interface Libp2pEvents<
  Services extends ServiceMap = ServiceMap,
  ProgressEvents extends ProgressEvent = ProgressEvent
> {
  'start': CustomEvent<Libp2p<Services, ProgressEvents>>;
  'stop': CustomEvent<Libp2p<Services, ProgressEvents>>;
}

given the DHTContentRouting class already specifies the ProgessEvent types in it's methods.

Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>>

/**
* This event notifies listeners that the node has stopped
Expand All @@ -251,7 +268,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
* })
* ```
*/
'stop': CustomEvent<Libp2p<T>>
'stop': CustomEvent<Libp2p<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>>
}

/**
Expand Down Expand Up @@ -308,7 +333,23 @@ export interface PendingDial {
/**
* Libp2p nodes implement this interface.
*/
export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, TypedEventTarget<Libp2pEvents<T>> {
export interface Libp2p<
Services extends ServiceMap = ServiceMap,
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> extends Startable, TypedEventTarget<Libp2pEvents<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>> {
/**
* The PeerId is a unique identifier for a node on the network.
*
Expand Down Expand Up @@ -359,7 +400,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
* }
* ```
*/
peerRouting: PeerRouting
peerRouting: PeerRouting<FindPeerProgressEvents, GetClosestPeersProgressEvents>

/**
* The content routing subsystem allows the user to find providers for content,
Expand All @@ -375,7 +416,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
* }
* ```
*/
contentRouting: ContentRouting
contentRouting: ContentRouting<ProvideProgressEvents, FindProvidersProgressEvents, GetProgressEvents, PutProgressEvents>

/**
* The keychain contains the keys used by the current node, and can create new
Expand Down Expand Up @@ -602,7 +643,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
/**
* A set of user defined services
*/
services: T
services: Services
}

/**
Expand Down
10 changes: 7 additions & 3 deletions packages/interface/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AbortOptions } from '../index.js'
import type { PeerId } from '../peer-id/index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
* Any object that implements this Symbol as a property should return a
Expand All @@ -23,7 +24,10 @@ import type { PeerInfo } from '../peer-info/index.js'
*/
export const peerRouting = Symbol.for('@libp2p/peer-routing')

export interface PeerRouting {
export interface PeerRouting<
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* Searches the network for peer info corresponding to the passed peer id.
*
Expand All @@ -34,7 +38,7 @@ export interface PeerRouting {
* const peer = await peerRouting.findPeer(peerId, options)
* ```
*/
findPeer(peerId: PeerId, options?: AbortOptions): Promise<PeerInfo>
findPeer(peerId: PeerId, options?: AbortOptions & ProgressOptions<FindPeerProgressEvents>): Promise<PeerInfo>

/**
* Search the network for peers that are closer to the passed key. Peer
Expand All @@ -49,5 +53,5 @@ export interface PeerRouting {
* }
* ```
*/
getClosestPeers(key: Uint8Array, options?: AbortOptions): AsyncIterable<PeerInfo>
getClosestPeers(key: Uint8Array, options?: AbortOptions & ProgressOptions<GetClosestPeersProgressEvents>): AsyncIterable<PeerInfo>
}
61 changes: 50 additions & 11 deletions packages/kad-dht/src/dual-kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,73 @@
import { type PeerDiscovery, peerDiscovery, type PeerDiscoveryEvents } from '@libp2p/interface/peer-discovery'
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import merge from 'it-merge'
import isPrivate from 'private-ip'
import { CustomProgressEvent } from 'progress-events'
import { DefaultKadDHT } from './kad-dht.js'
import { queryErrorEvent } from './query/events.js'
import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type ProvideProgressEvents =
ProgressEvent<'libp2p:content-routing:provide:dht:event', QueryEvent>

export type FindProvidersProgressEvents =
ProgressEvent<'libp2p:content-routing:find-providers:dht:event', QueryEvent>

export type PutProgressEvents =
ProgressEvent<'libp2p:content-routing:put:dht:event', QueryEvent>

export type GetProgressEvents =
ProgressEvent<'libp2p:content-routing:get:dht:event', QueryEvent>

const log = logger('libp2p:kad-dht')

/**
* Wrapper class to convert events into returned values
*/
class DHTContentRouting implements ContentRouting {
class DHTContentRouting implements ContentRouting<
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
> {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
async provide (cid: CID, options: QueryOptions & ProgressOptions<ProvideProgressEvents> = {}): Promise<void> {
for await (const event of this.dht.provide(cid, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:provide:dht:event', event))
}

Check warning on line 51 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L49-L51

Added lines #L49 - L51 were not covered by tests
}

async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
async * findProviders (cid: CID, options: QueryOptions & ProgressOptions<FindProvidersProgressEvents> = {}): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of this.dht.findProviders(cid, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:find-providers:dht:event', event))

Check warning on line 57 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L56-L57

Added lines #L56 - L57 were not covered by tests
if (event.name === 'PROVIDER') {
yield * event.providers
}
}
}

async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
await drain(this.dht.put(key, value, options))
async put (key: Uint8Array, value: Uint8Array, options: QueryOptions & ProgressOptions<PutProgressEvents> = {}): Promise<void> {
for await (const event of this.dht.put(key, value, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:put:dht:event', event))
}

Check warning on line 67 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L65-L67

Added lines #L65 - L67 were not covered by tests
}

async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
async get (key: Uint8Array, options: QueryOptions & ProgressOptions<GetProgressEvents> = {}): Promise<Uint8Array> {
for await (const event of this.dht.get(key, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:get:dht:event', event))

Check warning on line 73 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L72-L73

Added lines #L72 - L73 were not covered by tests
if (event.name === 'VALUE') {
return event.value
}
Expand All @@ -54,18 +80,29 @@
}
}

export type FindPeerProgressEvents =
ProgressEvent<'libp2p:peer-routing:find-peer:dht:event', QueryEvent>

export type GetClosestPeersProgressEvents =
ProgressEvent<'libp2p:peer-routing:get-closest-peers:dht:event', QueryEvent>

/**
* Wrapper class to convert events into returned values
*/
class DHTPeerRouting implements PeerRouting {
class DHTPeerRouting implements PeerRouting<
FindPeerProgressEvents,
GetClosestPeersProgressEvents
> {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
async findPeer (peerId: PeerId, options: QueryOptions & ProgressOptions<FindPeerProgressEvents> = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:find-peer:dht:event', event))

Check warning on line 105 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L104-L105

Added lines #L104 - L105 were not covered by tests
if (event.name === 'FINAL_PEER') {
return event.peer
}
Expand All @@ -74,8 +111,10 @@
throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
async * getClosestPeers (key: Uint8Array, options: QueryOptions & ProgressOptions<GetClosestPeersProgressEvents> = {}): AsyncIterable<PeerInfo> {
for await (const event of this.dht.getClosestPeers(key, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:get-closest-peers:dht:event', event))

Check warning on line 117 in packages/kad-dht/src/dual-kad-dht.ts

View check run for this annotation

Codecov / codecov/patch

packages/kad-dht/src/dual-kad-dht.ts#L116-L117

Added lines #L116 - L117 were not covered by tests
if (event.name === 'FINAL_PEER') {
yield event.peer
}
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"p-queue": "^7.3.4",
"p-retry": "^6.0.0",
"private-ip": "^3.0.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"rate-limiter-flexible": "^3.0.0",
"uint8arraylist": "^2.4.3",
Expand Down
Loading
Loading