From 3c73707ff5c1635d4ab26dcc39499ab497d217a6 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 15 May 2024 12:33:03 +0100 Subject: [PATCH] feat: add optional topology filter (#2544) Adds a `filter` option to topologies to allow filtering out duplicate notifications. --- packages/interface/src/topology/index.ts | 28 +- packages/libp2p/src/registrar.ts | 15 + packages/libp2p/test/registrar/errors.spec.ts | 54 ++ .../libp2p/test/registrar/protocols.spec.ts | 58 ++ .../libp2p/test/registrar/registrar.spec.ts | 629 +++++++++--------- 5 files changed, 458 insertions(+), 326 deletions(-) create mode 100644 packages/libp2p/test/registrar/errors.spec.ts create mode 100644 packages/libp2p/test/registrar/protocols.spec.ts diff --git a/packages/interface/src/topology/index.ts b/packages/interface/src/topology/index.ts index 76bb64c1ee..1df12a601e 100644 --- a/packages/interface/src/topology/index.ts +++ b/packages/interface/src/topology/index.ts @@ -1,10 +1,36 @@ import type { Connection } from '../connection/index.js' import type { PeerId } from '../peer-id/index.js' +/** + * A topology filter + * + * @see https://libp2p.github.io/js-libp2p/functions/_libp2p_peer_collections.peerFilter-1.html + */ +export interface TopologyFilter { + has (peerId: PeerId): boolean + add (peerId: PeerId): void + remove (peerId: PeerId): void +} + +/** + * A topology is a network overlay that contains a subset of peers in the + * complete network. + * + * It is a way to be notified when peers that support a given protocol connect + * to or disconnect from the current node. + */ export interface Topology { + /** + * An optional filter can prevent duplicate topology notifications for the + * same peer. + */ + filter?: TopologyFilter + /** * If true, invoke `onConnect` for this topology on transient (e.g. short-lived - * and/or data-limited) connections. (default: false) + * and/or data-limited) connections + * + * @default false */ notifyOnTransient?: boolean diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index d6f669d724..0936096c41 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -164,6 +164,11 @@ export class DefaultRegistrar implements Registrar { } for (const topology of topologies.values()) { + if (topology.filter?.has(remotePeer) === false) { + continue + } + + topology.filter?.remove(remotePeer) topology.onDisconnect?.(remotePeer) } } @@ -195,6 +200,11 @@ export class DefaultRegistrar implements Registrar { } for (const topology of topologies.values()) { + if (topology.filter?.has(peer.id) === false) { + continue + } + + topology.filter?.remove(peer.id) topology.onDisconnect?.(peer.id) } } @@ -222,6 +232,11 @@ export class DefaultRegistrar implements Registrar { continue } + if (topology.filter?.has(peerId) === true) { + continue + } + + topology.filter?.add(peerId) topology.onConnect?.(peerId, connection) } } diff --git a/packages/libp2p/test/registrar/errors.spec.ts b/packages/libp2p/test/registrar/errors.spec.ts new file mode 100644 index 0000000000..ff78d2097d --- /dev/null +++ b/packages/libp2p/test/registrar/errors.spec.ts @@ -0,0 +1,54 @@ +/* eslint-env mocha */ + +import { TypedEventEmitter, type ConnectionGater, type PeerId } from '@libp2p/interface' +import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { PersistentPeerStore } from '@libp2p/peer-store' +import { expect } from 'aegir/chai' +import { MemoryDatastore } from 'datastore-core/memory' +import { stubInterface } from 'sinon-ts' +import { defaultComponents } from '../../src/components.js' +import { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import { DefaultRegistrar } from '../../src/registrar.js' +import type { Components } from '../../src/components.js' +import type { Registrar, TransportManager } from '@libp2p/interface-internal' + +describe('registrar errors', () => { + let components: Components + let registrar: Registrar + let peerId: PeerId + + before(async () => { + peerId = await createEd25519PeerId() + const events = new TypedEventEmitter() + components = defaultComponents({ + peerId, + events, + datastore: new MemoryDatastore(), + upgrader: mockUpgrader({ events }), + transportManager: stubInterface(), + connectionGater: stubInterface() + }) + components.peerStore = new PersistentPeerStore(components) + components.connectionManager = new DefaultConnectionManager(components, { + minConnections: 50, + maxConnections: 1000, + inboundUpgradeTimeout: 1000 + }) + registrar = new DefaultRegistrar(components) + }) + + it('should fail to register a protocol if no multicodec is provided', () => { + // @ts-expect-error invalid parameters + return expect(registrar.register()).to.eventually.be.rejected() + }) + + it('should fail to register a protocol if an invalid topology is provided', () => { + const fakeTopology = { + random: 1 + } + + // @ts-expect-error invalid parameters + return expect(registrar.register(fakeTopology)).to.eventually.be.rejected() + }) +}) diff --git a/packages/libp2p/test/registrar/protocols.spec.ts b/packages/libp2p/test/registrar/protocols.spec.ts new file mode 100644 index 0000000000..698cfcdd08 --- /dev/null +++ b/packages/libp2p/test/registrar/protocols.spec.ts @@ -0,0 +1,58 @@ +/* eslint-env mocha */ + +import { yamux } from '@chainsafe/libp2p-yamux' +import { mplex } from '@libp2p/mplex' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { plaintext } from '@libp2p/plaintext' +import { webSockets } from '@libp2p/websockets' +import { expect } from 'aegir/chai' +import pDefer from 'p-defer' +import { createLibp2pNode } from '../../src/libp2p.js' +import type { Components } from '../../src/components.js' +import type { Libp2pNode } from '../../src/libp2p.js' + +describe('registrar protocols', () => { + let libp2p: Libp2pNode + + it('should be able to register and unregister a handler', async () => { + const deferred = pDefer() + + libp2p = await createLibp2pNode({ + peerId: await createEd25519PeerId(), + transports: [ + webSockets() + ], + streamMuxers: [ + yamux(), + mplex() + ], + connectionEncryption: [ + plaintext() + ], + services: { + test: (components: any) => { + deferred.resolve(components) + } + } + }) + + const components = await deferred.promise + + const registrar = components.registrar + + expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1']) + + const echoHandler = (): void => {} + await libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) + expect(registrar.getHandler('/echo/1.0.0')).to.have.property('handler', echoHandler) + expect(registrar.getHandler('/echo/1.0.1')).to.have.property('handler', echoHandler) + + await libp2p.unhandle(['/echo/1.0.0']) + expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0']) + expect(registrar.getHandler('/echo/1.0.1')).to.have.property('handler', echoHandler) + + await expect(libp2p.peerStore.get(libp2p.peerId)).to.eventually.have.deep.property('protocols', [ + '/echo/1.0.1' + ]) + }) +}) diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index a0ca3995f4..3e0671d540 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -1,437 +1,416 @@ /* eslint-env mocha */ -import { yamux } from '@chainsafe/libp2p-yamux' -import { TypedEventEmitter, type TypedEventTarget, type Libp2pEvents, type ConnectionGater, type PeerId, type PeerStore, type Topology } from '@libp2p/interface' +import { TypedEventEmitter } from '@libp2p/interface' import { matchPeerId } from '@libp2p/interface-compliance-tests/matchers' -import { mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks' +import { mockDuplex, mockMultiaddrConnection, mockConnection } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' -import { mplex } from '@libp2p/mplex' +import { peerFilter } from '@libp2p/peer-collections' import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { PersistentPeerStore } from '@libp2p/peer-store' -import { plaintext } from '@libp2p/plaintext' -import { webSockets } from '@libp2p/websockets' import { expect } from 'aegir/chai' -import { MemoryDatastore } from 'datastore-core/memory' import pDefer from 'p-defer' -import { type StubbedInstance, stubInterface } from 'sinon-ts' -import { type Components, defaultComponents } from '../../src/components.js' -import { DefaultConnectionManager } from '../../src/connection-manager/index.js' -import { createLibp2pNode, type Libp2pNode } from '../../src/libp2p.js' +import { stubInterface } from 'sinon-ts' import { DefaultRegistrar } from '../../src/registrar.js' -import type { ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' +import type { TypedEventTarget, Libp2pEvents, PeerId, PeerStore, Topology, Peer } from '@libp2p/interface' +import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' +import type { StubbedInstance } from 'sinon-ts' const protocol = '/test/1.0.0' -describe('registrar', () => { - let components: Components +describe('registrar topologies', () => { let registrar: Registrar let peerId: PeerId - let libp2p: Libp2pNode before(async () => { peerId = await createEd25519PeerId() }) - describe('errors', () => { - beforeEach(() => { - const events = new TypedEventEmitter() - components = defaultComponents({ - peerId, - events, - datastore: new MemoryDatastore(), - upgrader: mockUpgrader({ events }), - transportManager: stubInterface(), - connectionGater: stubInterface() - }) - components.peerStore = new PersistentPeerStore(components) - components.connectionManager = new DefaultConnectionManager(components, { - minConnections: 50, - maxConnections: 1000, - inboundUpgradeTimeout: 1000 - }) - registrar = new DefaultRegistrar(components) - }) + let connectionManager: StubbedInstance + let peerStore: StubbedInstance + let events: TypedEventTarget - it('should fail to register a protocol if no multicodec is provided', () => { - // @ts-expect-error invalid parameters - return expect(registrar.register()).to.eventually.be.rejected() + beforeEach(async () => { + peerId = await createEd25519PeerId() + connectionManager = stubInterface() + peerStore = stubInterface() + events = new TypedEventEmitter() + + registrar = new DefaultRegistrar({ + peerId, + connectionManager, + peerStore, + events, + logger: defaultLogger() }) + }) - it('should fail to register a protocol if an invalid topology is provided', () => { - const fakeTopology = { - random: 1 - } + it('should be able to register a protocol', async () => { + const topology: Topology = { + onConnect: () => { }, + onDisconnect: () => { } + } - // @ts-expect-error invalid parameters - return expect(registrar.register(fakeTopology)).to.eventually.be.rejected() - }) + expect(registrar.getTopologies(protocol)).to.have.lengthOf(0) + + const identifier = await registrar.register(protocol, topology) + + expect(identifier).to.exist() + expect(registrar.getTopologies(protocol)).to.have.lengthOf(1) }) - describe('registration', () => { - let registrar: Registrar - let peerId: PeerId - let connectionManager: StubbedInstance - let peerStore: StubbedInstance - let events: TypedEventTarget - - beforeEach(async () => { - peerId = await createEd25519PeerId() - connectionManager = stubInterface() - peerStore = stubInterface() - events = new TypedEventEmitter() - - registrar = new DefaultRegistrar({ - peerId, - connectionManager, - peerStore, - events, - logger: defaultLogger() - }) - }) + it('should be able to unregister a protocol', async () => { + const topology: Topology = { + onConnect: () => { }, + onDisconnect: () => { } + } - it('should be able to register a protocol', async () => { - const topology: Topology = { - onConnect: () => { }, - onDisconnect: () => { } - } + expect(registrar.getTopologies(protocol)).to.have.lengthOf(0) - expect(registrar.getTopologies(protocol)).to.have.lengthOf(0) + const identifier = await registrar.register(protocol, topology) - const identifier = await registrar.register(protocol, topology) + expect(registrar.getTopologies(protocol)).to.have.lengthOf(1) - expect(identifier).to.exist() - expect(registrar.getTopologies(protocol)).to.have.lengthOf(1) - }) + registrar.unregister(identifier) - it('should be able to unregister a protocol', async () => { - const topology: Topology = { - onConnect: () => { }, - onDisconnect: () => { } - } + expect(registrar.getTopologies(protocol)).to.have.lengthOf(0) + }) - expect(registrar.getTopologies(protocol)).to.have.lengthOf(0) + it('should not error if unregistering unregistered topology handler', () => { + registrar.unregister('bad-identifier') + }) - const identifier = await registrar.register(protocol, topology) + it('should call onConnect handler for connected peers after register', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() - expect(registrar.getTopologies(protocol)).to.have.lengthOf(1) + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - registrar.unregister(identifier) + // return connection from connection manager + connectionManager.getConnections.withArgs(remotePeerId).returns([conn]) - expect(registrar.getTopologies(protocol)).to.have.lengthOf(0) - }) + const topology: Topology = { + onConnect: (peerId, connection) => { + expect(peerId.equals(remotePeerId)).to.be.true() + expect(connection.id).to.eql(conn.id) - it('should not error if unregistering unregistered topology handler', () => { - registrar.unregister('bad-identifier') + onConnectDefer.resolve() + }, + onDisconnect: (peerId) => { + expect(peerId.equals(remotePeerId)).to.be.true() + + onDisconnectDefer.resolve() + } + } + + // Register protocol + await registrar.register(protocol, topology) + + // Peer data is in the peer store + peerStore.get.withArgs(matchPeerId(remotePeerId)).resolves({ + id: remotePeerId, + addresses: [], + protocols: [protocol], + metadata: new Map(), + tags: new Map() }) - it('should call onConnect handler for connected peers after register', async () => { - const onConnectDefer = pDefer() - const onDisconnectDefer = pDefer() + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } + }) + await onConnectDefer.promise - // Setup connections before registrar - const remotePeerId = await createEd25519PeerId() - const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + // remote peer disconnects + await conn.close() + events.safeDispatchEvent('peer:disconnect', { + detail: remotePeerId + }) + await onDisconnectDefer.promise + }) - // return connection from connection manager - connectionManager.getConnections.withArgs(remotePeerId).returns([conn]) + it('should call onConnect handler after register, once a peer is connected and protocols are updated', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() - const topology: Topology = { - onConnect: (peerId, connection) => { - expect(peerId.equals(remotePeerId)).to.be.true() - expect(connection.id).to.eql(conn.id) + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - onConnectDefer.resolve() - }, - onDisconnect: (peerId) => { - expect(peerId.equals(remotePeerId)).to.be.true() + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) - onDisconnectDefer.resolve() - } + const topology: Topology = { + onConnect: () => { + onConnectDefer.resolve() + }, + onDisconnect: () => { + onDisconnectDefer.resolve() } + } - // Register protocol - await registrar.register(protocol, topology) + // Register protocol + await registrar.register(protocol, topology) - // Peer data is in the peer store - peerStore.get.withArgs(matchPeerId(remotePeerId)).resolves({ - id: remotePeerId, - addresses: [], + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, protocols: [protocol], - metadata: new Map(), - tags: new Map() - }) - - // remote peer connects - events.safeDispatchEvent('peer:identify', { - detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn - } - }) - await onConnectDefer.promise + connection: conn + } + }) - // remote peer disconnects - await conn.close() - events.safeDispatchEvent('peer:disconnect', { - detail: remotePeerId - }) - await onDisconnectDefer.promise + // Can get details after identify + peerStore.get.withArgs(matchPeerId(conn.remotePeer)).resolves({ + id: conn.remotePeer, + addresses: [], + protocols: [protocol], + metadata: new Map(), + tags: new Map() }) - it('should call onConnect handler after register, once a peer is connected and protocols are updated', async () => { - const onConnectDefer = pDefer() - const onDisconnectDefer = pDefer() + // we have a connection to this peer + connectionManager.getConnections.withArgs(matchPeerId(conn.remotePeer)).returns([conn]) - // Setup connections before registrar - const remotePeerId = await createEd25519PeerId() - const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + // identify completes + events.safeDispatchEvent('peer:update', { + detail: { + peer: { + id: conn.remotePeer, + protocols: [protocol], + addresses: [], + metadata: new Map() + } + } + }) - // return connection from connection manager - connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + await onConnectDefer.promise - const topology: Topology = { - onConnect: () => { - onConnectDefer.resolve() + // Peer no longer supports the protocol our topology is registered for + events.safeDispatchEvent('peer:update', { + detail: { + peer: { + id: conn.remotePeer, + protocols: [], + addresses: [], + metadata: new Map() }, - onDisconnect: () => { - onDisconnectDefer.resolve() + previous: { + id: conn.remotePeer, + protocols: [protocol], + addresses: [], + metadata: new Map() } } + }) - // Register protocol - await registrar.register(protocol, topology) + await onDisconnectDefer.promise + }) - // remote peer connects - events.safeDispatchEvent('peer:identify', { - detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn - } - }) + it('should not call topology handlers for transient connection', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() - // Can get details after identify - peerStore.get.withArgs(matchPeerId(conn.remotePeer)).resolves({ - id: conn.remotePeer, - addresses: [], - protocols: [protocol], - metadata: new Map(), - tags: new Map() - }) + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // we have a connection to this peer - connectionManager.getConnections.withArgs(matchPeerId(conn.remotePeer)).returns([conn]) + // connection is transient + conn.transient = true - // identify completes - events.safeDispatchEvent('peer:update', { - detail: { - peer: { - id: conn.remotePeer, - protocols: [protocol], - addresses: [], - metadata: new Map() - } - } - }) + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) - await onConnectDefer.promise + const topology: Topology = { + onConnect: () => { + onConnectDefer.reject(new Error('Topolgy onConnect called for transient connection')) + }, + onDisconnect: () => { + onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for transient connection')) + } + } - // Peer no longer supports the protocol our topology is registered for - events.safeDispatchEvent('peer:update', { - detail: { - peer: { - id: conn.remotePeer, - protocols: [], - addresses: [], - metadata: new Map() - }, - previous: { - id: conn.remotePeer, - protocols: [protocol], - addresses: [], - metadata: new Map() - } - } - }) + // register topology for protocol + await registrar.register(protocol, topology) - await onDisconnectDefer.promise + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } }) - it('should not call topology handlers for transient connection', async () => { - const onConnectDefer = pDefer() - const onDisconnectDefer = pDefer() + await expect(Promise.any([ + onConnectDefer.promise, + onDisconnectDefer.promise, + new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 1000) + }) + ])).to.eventually.not.be.rejected() + }) - // Setup connections before registrar - const remotePeerId = await createEd25519PeerId() - const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + it('should call topology onConnect handler for transient connection when explicitly requested', async () => { + const onConnectDefer = pDefer() - // connection is transient - conn.transient = true + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // return connection from connection manager - connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + // connection is transient + conn.transient = true - const topology: Topology = { - onConnect: () => { - onConnectDefer.reject(new Error('Topolgy onConnect called for transient connection')) - }, - onDisconnect: () => { - onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for transient connection')) - } + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + + const topology: Topology = { + notifyOnTransient: true, + onConnect: () => { + onConnectDefer.resolve() } + } - // Register topology for protocol - await registrar.register(protocol, topology) + // register topology for protocol + await registrar.register(protocol, topology) - // remote peer connects - events.safeDispatchEvent('peer:identify', { - detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn - } - }) - - await expect(Promise.any([ - onConnectDefer.promise, - onDisconnectDefer.promise, - new Promise((resolve) => { - setTimeout(() => { - resolve() - }, 1000) - }) - ])).to.eventually.not.be.rejected() + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } }) - it('should call topology onConnect handler for transient connection when explicitly requested', async () => { - const onConnectDefer = pDefer() - - // Setup connections before registrar - const remotePeerId = await createEd25519PeerId() - const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + await expect(onConnectDefer.promise).to.eventually.be.undefined() + }) - // connection is transient - conn.transient = true + it('should call topology handlers for non-transient connection opened after transient connection', async () => { + const onConnectDefer = pDefer() + let callCount = 0 - // return connection from connection manager - connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + const topology: Topology = { + notifyOnTransient: true, + onConnect: () => { + callCount++ - const topology: Topology = { - notifyOnTransient: true, - onConnect: () => { + if (callCount === 2) { onConnectDefer.resolve() } } + } - // Register topology for protocol - await registrar.register(protocol, topology) + // register topology for protocol + await registrar.register(protocol, topology) - // remote peer connects - events.safeDispatchEvent('peer:identify', { - detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn - } - }) + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + transientConnection.transient = true - await expect(onConnectDefer.promise).to.eventually.be.undefined() - }) + const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + nonTransientConnection.transient = false - it('should call topology handlers for non-transient connection opened after transient connection', async () => { - const onConnectDefer = pDefer() - let callCount = 0 + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([ + transientConnection, + nonTransientConnection + ]) - const topology: Topology = { - notifyOnTransient: true, - onConnect: () => { - callCount++ + // remote peer connects over transient connection + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: transientConnection + } + }) - if (callCount === 2) { - onConnectDefer.resolve() - } - } + // remote peer opens non-transient connection + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: nonTransientConnection } + }) - // Register topology for protocol - await registrar.register(protocol, topology) + await expect(onConnectDefer.promise).to.eventually.be.undefined() + }) - // Setup connections before registrar - const remotePeerId = await createEd25519PeerId() - const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - transientConnection.transient = true + it('should use a filter to prevent duplicate onConnect notifications', async () => { + const topology: Topology = stubInterface({ + filter: peerFilter(1024) + }) - const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - nonTransientConnection.transient = false + // register topology for protocol + await registrar.register(protocol, topology) - // return connection from connection manager - connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([ - transientConnection, - nonTransientConnection - ]) + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const connection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // remote peer connects over transient connection + // remote peer runs identify a few times + for (let i = 0; i < 5; i++) { events.safeDispatchEvent('peer:identify', { detail: { peerId: remotePeerId, protocols: [protocol], - connection: transientConnection + connection } }) + } - // remote peer opens non-transient connection - events.safeDispatchEvent('peer:identify', { + // remote peer updates details a few times + for (let i = 0; i < 5; i++) { + events.safeDispatchEvent('peer:update', { detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: nonTransientConnection - } - }) - - await expect(onConnectDefer.promise).to.eventually.be.undefined() - }) - - it('should be able to register and unregister a handler', async () => { - const deferred = pDefer() - - libp2p = await createLibp2pNode({ - peerId: await createEd25519PeerId(), - transports: [ - webSockets() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - test: (components: any) => { - deferred.resolve(components) + peer: { + id: remotePeerId, + protocols: [protocol] + }, + previous: { + protocols: [] } } }) + } - const components = await deferred.promise + // should only have notified once + expect(topology.onConnect).to.have.property('callCount', 1) + }) - const registrar = components.registrar + it('should use a filter to prevent onDisconnect notifications that had no previous onConnect notification', async () => { + const topology: Topology = stubInterface({ + filter: peerFilter(1024) + }) - expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1']) + // register topology for protocol + await registrar.register(protocol, topology) - const echoHandler = (): void => {} - await libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) - expect(registrar.getHandler('/echo/1.0.0')).to.have.property('handler', echoHandler) - expect(registrar.getHandler('/echo/1.0.1')).to.have.property('handler', echoHandler) + // setup connections before registrar + const remotePeerId = await createEd25519PeerId() - await libp2p.unhandle(['/echo/1.0.0']) - expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0']) - expect(registrar.getHandler('/echo/1.0.1')).to.have.property('handler', echoHandler) + // peer exists in peer store with the regsitered protocol + peerStore.get.withArgs(remotePeerId).resolves(stubInterface({ + protocols: [protocol] + })) - await expect(libp2p.peerStore.get(libp2p.peerId)).to.eventually.have.deep.property('protocols', [ - '/echo/1.0.1' - ]) + // the peer disconnects + events.safeDispatchEvent('peer:disconnect', { + detail: remotePeerId }) + + // should not have notified + expect(topology.onConnect).to.have.property('called', false) + expect(topology.onDisconnect).to.have.property('called', false) }) })