Skip to content

Commit

Permalink
feat: add optional topology filter (#2544)
Browse files Browse the repository at this point in the history
Adds a `filter` option to topologies to allow filtering out duplicate
notifications.
  • Loading branch information
achingbrain authored May 15, 2024
1 parent b0554a5 commit 3c73707
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 326 deletions.
28 changes: 27 additions & 1 deletion packages/interface/src/topology/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,36 @@
import type { Connection } from '../connection/index.js'
import type { PeerId } from '../peer-id/index.js'

/**
* A topology filter
*
* @see https://libp2p.github.io/js-libp2p/functions/_libp2p_peer_collections.peerFilter-1.html
*/
export interface TopologyFilter {
has (peerId: PeerId): boolean
add (peerId: PeerId): void
remove (peerId: PeerId): void
}

/**
* A topology is a network overlay that contains a subset of peers in the
* complete network.
*
* It is a way to be notified when peers that support a given protocol connect
* to or disconnect from the current node.
*/
export interface Topology {
/**
* An optional filter can prevent duplicate topology notifications for the
* same peer.
*/
filter?: TopologyFilter

/**
* If true, invoke `onConnect` for this topology on transient (e.g. short-lived
* and/or data-limited) connections. (default: false)
* and/or data-limited) connections
*
* @default false
*/
notifyOnTransient?: boolean

Expand Down
15 changes: 15 additions & 0 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ export class DefaultRegistrar implements Registrar {
}

for (const topology of topologies.values()) {
if (topology.filter?.has(remotePeer) === false) {
continue
}

topology.filter?.remove(remotePeer)
topology.onDisconnect?.(remotePeer)
}
}
Expand Down Expand Up @@ -195,6 +200,11 @@ export class DefaultRegistrar implements Registrar {
}

for (const topology of topologies.values()) {
if (topology.filter?.has(peer.id) === false) {
continue
}

topology.filter?.remove(peer.id)
topology.onDisconnect?.(peer.id)
}
}
Expand Down Expand Up @@ -222,6 +232,11 @@ export class DefaultRegistrar implements Registrar {
continue
}

if (topology.filter?.has(peerId) === true) {
continue
}

topology.filter?.add(peerId)
topology.onConnect?.(peerId, connection)
}
}
Expand Down
54 changes: 54 additions & 0 deletions packages/libp2p/test/registrar/errors.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* eslint-env mocha */

import { TypedEventEmitter, type ConnectionGater, type PeerId } from '@libp2p/interface'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { PersistentPeerStore } from '@libp2p/peer-store'
import { expect } from 'aegir/chai'
import { MemoryDatastore } from 'datastore-core/memory'
import { stubInterface } from 'sinon-ts'
import { defaultComponents } from '../../src/components.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultRegistrar } from '../../src/registrar.js'
import type { Components } from '../../src/components.js'
import type { Registrar, TransportManager } from '@libp2p/interface-internal'

describe('registrar errors', () => {
let components: Components
let registrar: Registrar
let peerId: PeerId

before(async () => {
peerId = await createEd25519PeerId()
const events = new TypedEventEmitter()
components = defaultComponents({
peerId,
events,
datastore: new MemoryDatastore(),
upgrader: mockUpgrader({ events }),
transportManager: stubInterface<TransportManager>(),
connectionGater: stubInterface<ConnectionGater>()
})
components.peerStore = new PersistentPeerStore(components)
components.connectionManager = new DefaultConnectionManager(components, {
minConnections: 50,
maxConnections: 1000,
inboundUpgradeTimeout: 1000
})
registrar = new DefaultRegistrar(components)
})

it('should fail to register a protocol if no multicodec is provided', () => {
// @ts-expect-error invalid parameters
return expect(registrar.register()).to.eventually.be.rejected()
})

it('should fail to register a protocol if an invalid topology is provided', () => {
const fakeTopology = {
random: 1
}

// @ts-expect-error invalid parameters
return expect(registrar.register(fakeTopology)).to.eventually.be.rejected()
})
})
58 changes: 58 additions & 0 deletions packages/libp2p/test/registrar/protocols.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/* eslint-env mocha */

import { yamux } from '@chainsafe/libp2p-yamux'
import { mplex } from '@libp2p/mplex'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { plaintext } from '@libp2p/plaintext'
import { webSockets } from '@libp2p/websockets'
import { expect } from 'aegir/chai'
import pDefer from 'p-defer'
import { createLibp2pNode } from '../../src/libp2p.js'
import type { Components } from '../../src/components.js'
import type { Libp2pNode } from '../../src/libp2p.js'

describe('registrar protocols', () => {
let libp2p: Libp2pNode

it('should be able to register and unregister a handler', async () => {
const deferred = pDefer<Components>()

libp2p = await createLibp2pNode({
peerId: await createEd25519PeerId(),
transports: [
webSockets()
],
streamMuxers: [
yamux(),
mplex()
],
connectionEncryption: [
plaintext()
],
services: {
test: (components: any) => {
deferred.resolve(components)
}
}
})

const components = await deferred.promise

const registrar = components.registrar

expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1'])

const echoHandler = (): void => {}
await libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler)
expect(registrar.getHandler('/echo/1.0.0')).to.have.property('handler', echoHandler)
expect(registrar.getHandler('/echo/1.0.1')).to.have.property('handler', echoHandler)

await libp2p.unhandle(['/echo/1.0.0'])
expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0'])
expect(registrar.getHandler('/echo/1.0.1')).to.have.property('handler', echoHandler)

await expect(libp2p.peerStore.get(libp2p.peerId)).to.eventually.have.deep.property('protocols', [
'/echo/1.0.1'
])
})
})
Loading

0 comments on commit 3c73707

Please sign in to comment.