From 67a63763fcf7d4aa92c2da0bc6d029698031d9cf Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 25 Aug 2020 13:44:16 +0200 Subject: [PATCH] chore: update pubsub (#108) BREAKING CHANGE: using libp2p-interface pubsub with new API --- .aegir.js | 66 +++++ package.json | 26 +- src/index.js | 324 +++---------------------- test/2-nodes.spec.js | 330 ------------------------- test/compliance.spec.js | 36 +++ test/emit-self.spec.js | 68 ------ test/fixtures/peers.js | 26 ++ test/fixtures/relay.js | 13 + test/floodsub.spec.js | 107 ++++++++ test/multiple-nodes.spec.js | 469 ------------------------------------ test/pubsub.spec.js | 142 ----------- test/utils/create-peer.js | 125 ++++++++++ test/utils/index.js | 55 ----- 13 files changed, 416 insertions(+), 1371 deletions(-) create mode 100644 .aegir.js delete mode 100644 test/2-nodes.spec.js create mode 100644 test/compliance.spec.js delete mode 100644 test/emit-self.spec.js create mode 100644 test/fixtures/peers.js create mode 100644 test/fixtures/relay.js create mode 100644 test/floodsub.spec.js delete mode 100644 test/multiple-nodes.spec.js delete mode 100644 test/pubsub.spec.js create mode 100644 test/utils/create-peer.js diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 0000000000..f679dcc03e --- /dev/null +++ b/.aegir.js @@ -0,0 +1,66 @@ +'use strict' + +/** + * This file uses aegir hooks to + * set up a libp2p instance for browser nodes to relay through + * before tests start + */ + +const Libp2p = require('libp2p') +const PeerId = require('peer-id') + +const WS = require('libp2p-websockets') +const MPLEX = require('libp2p-mplex') +const { NOISE } = require('libp2p-noise') + +const RelayPeer = require('./test/fixtures/relay') + +let libp2p + +const before = async () => { + // Use the last peer + const peerId = await PeerId.createFromJSON(RelayPeer) + + libp2p = new Libp2p({ + addresses: { + listen: [RelayPeer.multiaddr] + }, + peerId, + modules: { + transport: [WS], + streamMuxer: [MPLEX], + connEncryption: [NOISE] + }, + config: { + relay: { + enabled: true, + hop: { + enabled: true, + active: false + } + }, + pubsub: { + enabled: false + } + } + }) + + await libp2p.start() +} + +const after = async () => { + await libp2p.stop() +} + +module.exports = { + hooks: { + pre: before, + post: after + }, + webpack: { + node: { + // this is needed until bcrypto stops using node buffers in browser code + Buffer: true + } + } +} diff --git a/package.json b/package.json index 5488e8af20..83cd6dca31 100644 --- a/package.json +++ b/package.json @@ -42,30 +42,28 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-floodsub#readme", "devDependencies": { - "aegir": "^25.0.0", + "aegir": "^26.0.0", "benchmark": "^2.1.4", "chai": "^4.2.0", - "chai-spies": "^1.0.0", - "detect-node": "^2.0.4", - "dirty-chai": "^2.0.1", - "it-pair": "^1.0.0", - "lodash": "^4.17.15", + "ipfs-utils": "^3.0.0", + "libp2p": "https://github.com/libp2p/js-libp2p#0.29.x", + "libp2p-mplex": "^0.10.0", + "libp2p-noise": "^2.0.0", + "libp2p-websockets": "^0.14.0", "multiaddr": "^8.0.0", - "p-defer": "^3.0.0", + "p-wait-for": "^3.1.0", + "peer-id": "^0.14.00", "sinon": "^9.0.1" }, "dependencies": { - "async.nexttick": "^0.5.2", "debug": "^4.1.1", - "it-length-prefixed": "^3.0.0", - "it-pipe": "^1.0.1", - "libp2p-pubsub": "~0.6.0", - "p-map": "^4.0.0", - "peer-id": "~0.14.0", - "protons": "^2.0.0", + "libp2p-interfaces": "^0.5.1", "time-cache": "^0.3.0", "uint8arrays": "^1.1.0" }, + "peerDependencies": { + "libp2p": "https://github.com/libp2p/js-libp2p#0.29.x" + }, "contributors": [ "David Dias ", "Vasco Santos ", diff --git a/src/index.js b/src/index.js index 4be158f87b..678406b4d2 100644 --- a/src/index.js +++ b/src/index.js @@ -1,39 +1,12 @@ 'use strict' -const debug = require('debug') const debugName = 'libp2p:floodsub' -const log = debug(debugName) -log.error = debug(`${debugName}:error`) -const pipe = require('it-pipe') -const lp = require('it-length-prefixed') -const pMap = require('p-map') const TimeCache = require('time-cache') -const nextTick = require('async.nexttick') -const PeerId = require('peer-id') -const BaseProtocol = require('libp2p-pubsub') -const { message, utils } = require('libp2p-pubsub') -const { multicodec } = require('./config') - -const ensureArray = utils.ensureArray - -function validateRegistrar (registrar) { - if (typeof registrar !== 'object') { - throw new Error('a registrar object is required') - } - - if (typeof registrar.handle !== 'function') { - throw new Error('a handle function must be provided in registrar') - } - - if (typeof registrar.register !== 'function') { - throw new Error('a register function must be provided in registrar') - } +const BaseProtocol = require('libp2p-interfaces/src/pubsub') +const { utils } = require('libp2p-interfaces/src/pubsub') - if (typeof registrar.unregister !== 'function') { - throw new Error('a unregister function must be provided in registrar') - } -} +const { multicodec } = require('./config') /** * FloodSub (aka dumbsub is an implementation of pubsub focused on @@ -42,309 +15,74 @@ function validateRegistrar (registrar) { */ class FloodSub extends BaseProtocol { /** - * @param {PeerId} peerId instance of the peer's PeerId - * @param {Object} registrar - * @param {function} registrar.handle - * @param {function} registrar.register - * @param {function} registrar.unregister + * @param {Libp2p} libp2p instance of libp2p * @param {Object} [options] * @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to false * @constructor */ - constructor (peerId, registrar, options = {}) { - if (!PeerId.isPeerId(peerId)) { - throw new Error('peerId must be an instance of `peer-id`') - } - - validateRegistrar(registrar) - + constructor (libp2p, options = {}) { super({ debugName: debugName, multicodecs: multicodec, - peerId: peerId, - registrar: registrar, + libp2p, + canRelayMessage: true, ...options }) - /** - * List of our subscriptions - * @type {Set} - */ - this.subscriptions = new Set() - /** * Cache of seen messages * * @type {TimeCache} */ this.seenCache = new TimeCache() - - /** - * Pubsub options - */ - this._options = { - emitSelf: false, - ...options - } - - this._onRpc = this._onRpc.bind(this) } /** - * Peer connected successfully with pubsub protocol. + * Process incoming message + * Extends base implementation to check router cache. * @override - * @param {PeerId} peerId peer id - * @param {Connection} conn connection to the peer + * @param {InMessage} message The message to process * @returns {Promise} */ - async _onPeerConnected (peerId, conn) { - await super._onPeerConnected(peerId, conn) - const idB58Str = peerId.toB58String() - const peer = this.peers.get(idB58Str) - - if (peer && peer.isWritable) { - // Immediately send my own subscriptions to the newly established conn - peer.sendSubscriptions(this.subscriptions) - } - } - - /** - * Overriding the implementation of _processConnection should keep the connection and is - * responsible for processing each RPC message received by other peers. - * @override - * @param {string} idB58Str peer id string in base58 - * @param {Connection} conn connection - * @param {Peer} peer peer - * @returns {void} - * - */ - async _processMessages (idB58Str, conn, peer) { - const onRpcFunc = this._onRpc - try { - await pipe( - conn, - lp.decode(), - async function (source) { - for await (const data of source) { - const rpc = data instanceof Uint8Array ? data : data.slice() - - onRpcFunc(idB58Str, message.rpc.RPC.decode(rpc)) - } - } - ) - } catch (err) { - this._onPeerDisconnected(peer.id, err) - } - } - - /** - * Called for each RPC call received from the given peer - * @private - * @param {string} idB58Str b58 string PeerId of the connected peer - * @param {rpc.RPC} rpc The pubsub RPC message - */ - _onRpc (idB58Str, rpc) { - if (!rpc) { - return - } - - log('rpc from', idB58Str) - const subs = rpc.subscriptions - const msgs = rpc.msgs - - if (msgs && msgs.length) { - msgs.forEach((msg) => this._processRpcMessage(msg)) - } - - const peer = this.peers.get(idB58Str) - - if (peer && subs && subs.length) { - peer.updateSubscriptions(subs) - this.emit('floodsub:subscription-change', peer.id, peer.topics, subs) - } - } - - /** - * @private - * @param {rpc.RPC.Message} message The message to process - * @returns {void} - */ async _processRpcMessage (message) { - const msg = utils.normalizeInRpcMessage(message) - const seqno = utils.msgId(msg.from, msg.seqno) - // 1. check if I've seen the message, if yes, ignore + // Check if I've seen the message, if yes, ignore + const seqno = this.getMsgId(message) if (this.seenCache.has(seqno)) { return } - this.seenCache.put(seqno) - // 2. validate the message (signature verification) - let isValid - let error - - try { - isValid = await this.validate(message) - } catch (err) { - error = err - } - - if (error || !isValid) { - log('Message could not be validated, dropping it. isValid=%s', isValid, error) - return - } - - // 3. if message is valid, emit to self - this._emitMessages(msg.topicIDs, [msg]) - - // 4. if message is valid, propagate msg to others - this._forwardMessages(msg.topicIDs, [msg]) - } - - _emitMessages (topics, messages) { - topics.forEach((topic) => { - if (!this.subscriptions.has(topic)) { - return - } - - messages.forEach((message) => { - this.emit(topic, message) - }) - }) - } - - _forwardMessages (topics, messages) { - this.peers.forEach((peer) => { - if (!peer.isWritable || !utils.anyMatch(peer.topics, topics)) { - return - } - - peer.sendMessages(utils.normalizeOutRpcMessages(messages)) - - log('publish msgs on topics', topics, peer.id.toB58String()) - }) + await super._processRpcMessage(message) } /** - * Unmounts the floodsub protocol and shuts down every connection + * Publish message created. Forward it to the peers. * @override - * @returns {Promise} - */ - async stop () { - await super.stop() - - this.subscriptions = new Set() - } - - /** - * Publish messages to the given topics. - * @override - * @param {Array|string} topics - * @param {Array|any} messages - * @returns {Promise} - */ - async publish (topics, messages) { - if (!this.started) { - throw new Error('FloodSub is not started') - } - - log('publish', topics, messages) - - topics = ensureArray(topics) - messages = ensureArray(messages) - - const from = this.peerId.toB58String() - - const buildMessage = (msg) => { - const seqno = utils.randomSeqno() - this.seenCache.put(utils.msgId(from, seqno)) - - const message = { - from: from, - data: msg, - seqno: seqno, - topicIDs: topics - } - - // Emit to self if I'm interested and it is enabled - this._options.emitSelf && this._emitMessages(topics, [message]) - - return this._buildMessage(message) - } - - const msgObjects = await pMap(messages, buildMessage) - - // send to all the other peers - this._forwardMessages(topics, msgObjects) - } - - /** - * Subscribe to the given topic(s). - * @override - * @param {Array|string} topics + * @param {InMessage} message * @returns {void} */ - subscribe (topics) { - if (!this.started) { - throw new Error('FloodSub is not started') - } - - topics = ensureArray(topics) - topics.forEach((topic) => this.subscriptions.add(topic)) - - this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) - - // make sure that FloodSub is already mounted - function sendSubscriptionsOnceReady (peer) { - if (peer && peer.isWritable) { - return peer.sendSubscriptions(topics) - } - const onConnection = () => { - peer.removeListener('connection', onConnection) - sendSubscriptionsOnceReady(peer) - } - peer.on('connection', onConnection) - peer.once('close', () => peer.removeListener('connection', onConnection)) - } + _publish (message) { + this._forwardMessage(message) } /** - * Unsubscribe from the given topic(s). - * @override - * @param {Array|string} topics + * Forward message to peers. + * @param {InMessage} message * @returns {void} */ - unsubscribe (topics) { - if (!this.started) { - throw new Error('FloodSub is not started') - } - - topics = ensureArray(topics) - - topics.forEach((topic) => this.subscriptions.delete(topic)) - - this.peers.forEach((peer) => checkIfReady(peer)) - // make sure that FloodSub is already mounted - function checkIfReady (peer) { - if (peer && peer.isWritable) { - peer.sendUnsubscriptions(topics) - } else { - nextTick(checkIfReady.bind(peer)) + _forwardMessage (message) { + message.topicIDs.forEach((topic) => { + const peers = this.topics.get(topic) + if (!peers) { + return } - } - } - - /** - * Get the list of topics which the peer is subscribed to. - * @override - * @returns {Array} - */ - getTopics () { - if (!this.started) { - throw new Error('FloodSub is not started') - } - - return Array.from(this.subscriptions) + peers.forEach((id) => { + this.log('publish msgs on topics', message.topicIDs, id) + if (id !== this.peerId.toB58String()) { + this._sendRpc(id, { msgs: [utils.normalizeOutRpcMessage(message)] }) + } + }) + }) } } diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js deleted file mode 100644 index 1d9f7b5d8a..0000000000 --- a/test/2-nodes.spec.js +++ /dev/null @@ -1,330 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-spies')) -const expect = chai.expect -const uint8ArrayFromString = require('uint8arrays/from-string') -const uint8ArrayToString = require('uint8arrays/to-string') -const pDefer = require('p-defer') -const times = require('lodash/times') - -const FloodSub = require('../src') -const { multicodec } = require('../src') -const { - defOptions, - first, - createPeerId, - createMockRegistrar, - expectSet, - ConnectionPair -} = require('./utils') - -function shouldNotHappen (_) { - expect.fail() -} - -describe('basics between 2 nodes', () => { - describe('fresh nodes', () => { - let peerIdA, peerIdB - let fsA, fsB - - const registrarRecordA = {} - const registrarRecordB = {} - - // Mount pubsub protocol - before(async () => { - [peerIdA, peerIdB] = await Promise.all([ - createPeerId(), - createPeerId() - ]) - - fsA = new FloodSub(peerIdA, createMockRegistrar(registrarRecordA), defOptions) - fsB = new FloodSub(peerIdB, createMockRegistrar(registrarRecordB), defOptions) - - expect(fsA.peers.size).to.be.eql(0) - expect(fsA.subscriptions.size).to.eql(0) - expect(fsB.peers.size).to.be.eql(0) - expect(fsB.subscriptions.size).to.eql(0) - }) - - // Start pubsub - before(() => Promise.all([ - fsA.start(), - fsB.start() - ])) - - // Connect floodsub nodes - before(async () => { - const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect - const handleA = registrarRecordA[multicodec].handler - const handleB = registrarRecordB[multicodec].handler - - // Notice peers of connection - const [c0, c1] = ConnectionPair() - await onConnectA(peerIdB, c0) - await onConnectB(peerIdA, c1) - - await handleB({ - protocol: multicodec, - stream: c1.stream, - connection: { - remotePeer: peerIdA - } - }) - - await handleA({ - protocol: multicodec, - stream: c0.stream, - connection: { - remotePeer: peerIdB - } - }) - - expect(fsA.peers.size).to.be.eql(1) - expect(fsB.peers.size).to.be.eql(1) - }) - - after(() => { - return Promise.all([ - fsA.started && fsA.stop(), - fsB.started && fsB.stop() - ]) - }) - - it('Subscribe to a topic:Z in nodeA', () => { - const defer = pDefer() - - fsA.subscribe('Z') - fsB.once('floodsub:subscription-change', (changedPeerId, changedTopics, changedSubs) => { - expectSet(fsA.subscriptions, ['Z']) - expect(fsB.peers.size).to.equal(1) - expectSet(first(fsB.peers).topics, ['Z']) - expect(changedPeerId.toB58String()).to.equal(first(fsB.peers).id.toB58String()) - expectSet(changedTopics, ['Z']) - expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: true }]) - defer.resolve() - }) - - return defer.promise - }) - - it('Publish to a topic:Z in nodeA', () => { - const defer = pDefer() - - fsA.once('Z', (msg) => { - expect(uint8ArrayToString(msg.data)).to.equal('hey') - fsB.removeListener('Z', shouldNotHappen) - defer.resolve() - }) - - fsB.once('Z', shouldNotHappen) - - fsA.publish('Z', uint8ArrayFromString('hey')) - - return defer.promise - }) - - it('Publish to a topic:Z in nodeB', () => { - const defer = pDefer() - - fsA.once('Z', (msg) => { - fsA.once('Z', shouldNotHappen) - expect(uint8ArrayToString(msg.data)).to.equal('banana') - - setTimeout(() => { - fsA.removeListener('Z', shouldNotHappen) - fsB.removeListener('Z', shouldNotHappen) - - defer.resolve() - }, 100) - }) - - fsB.once('Z', shouldNotHappen) - - fsB.publish('Z', uint8ArrayFromString('banana')) - - return defer.promise - }) - - it('Publish 10 msg to a topic:Z in nodeB', () => { - const defer = pDefer() - let counter = 0 - - fsB.once('Z', shouldNotHappen) - fsA.on('Z', receivedMsg) - - function receivedMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('banana') - expect(msg.from).to.be.eql(fsB.peerId.toB58String()) - expect(msg.seqno).to.be.a('Uint8Array') - expect(msg.topicIDs).to.be.eql(['Z']) - - if (++counter === 10) { - fsA.removeListener('Z', receivedMsg) - fsB.removeListener('Z', shouldNotHappen) - - defer.resolve() - } - } - times(10, () => fsB.publish('Z', uint8ArrayFromString('banana'))) - - return defer.promise - }) - - it('Publish 10 msg to a topic:Z in nodeB as array', () => { - const defer = pDefer() - let counter = 0 - - fsB.once('Z', shouldNotHappen) - fsA.on('Z', receivedMsg) - - function receivedMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('banana') - expect(msg.from).to.be.eql(fsB.peerId.toB58String()) - expect(msg.seqno).to.be.a('Uint8Array') - expect(msg.topicIDs).to.be.eql(['Z']) - - if (++counter === 10) { - fsA.removeListener('Z', receivedMsg) - fsB.removeListener('Z', shouldNotHappen) - - defer.resolve() - } - } - - const msgs = [] - times(10, () => msgs.push(uint8ArrayFromString('banana'))) - fsB.publish('Z', msgs) - - return defer.promise - }) - - it('Unsubscribe from topic:Z in nodeA', () => { - const defer = pDefer() - - fsA.unsubscribe('Z') - expect(fsA.subscriptions.size).to.equal(0) - - fsB.once('floodsub:subscription-change', (changedPeerId, changedTopics, changedSubs) => { - expect(fsB.peers.size).to.equal(1) - expectSet(first(fsB.peers).topics, []) - expect(changedPeerId.toB58String()).to.equal(first(fsB.peers).id.toB58String()) - expectSet(changedTopics, []) - expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: false }]) - - defer.resolve() - }) - - return defer.promise - }) - - it('Publish to a topic:Z in nodeA nodeB', () => { - const defer = pDefer() - - fsA.once('Z', shouldNotHappen) - fsB.once('Z', shouldNotHappen) - - setTimeout(() => { - fsA.removeListener('Z', shouldNotHappen) - fsB.removeListener('Z', shouldNotHappen) - defer.resolve() - }, 100) - - fsB.publish('Z', uint8ArrayFromString('banana')) - fsA.publish('Z', uint8ArrayFromString('banana')) - - return defer.promise - }) - }) - - describe('nodes send state on connection', () => { - let peerIdA, peerIdB - let fsA, fsB - - const registrarRecordA = {} - const registrarRecordB = {} - - // Mount pubsub protocol - before(async () => { - [peerIdA, peerIdB] = await Promise.all([ - createPeerId(), - createPeerId() - ]) - - fsA = new FloodSub(peerIdA, createMockRegistrar(registrarRecordA), defOptions) - fsB = new FloodSub(peerIdB, createMockRegistrar(registrarRecordB), defOptions) - }) - - // Start pubsub - before(() => Promise.all([ - fsA.start(), - fsB.start() - ])) - - // Make subscriptions prior to new nodes - before(() => { - fsA.subscribe('Za') - fsB.subscribe('Zb') - - expect(fsA.peers.size).to.equal(0) - expectSet(fsA.subscriptions, ['Za']) - expect(fsB.peers.size).to.equal(0) - expectSet(fsB.subscriptions, ['Zb']) - }) - - after(() => { - return Promise.all([ - fsA.started && fsA.stop(), - fsB.started && fsB.stop() - ]) - }) - - it('existing subscriptions are sent upon peer connection', async () => { - const dial = async () => { - const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect - const handleA = registrarRecordA[multicodec].handler - const handleB = registrarRecordB[multicodec].handler - - // Notice peers of connection - const [c0, c1] = ConnectionPair() - await onConnectA(peerIdB, c0) - await handleB({ - protocol: multicodec, - stream: c1.stream, - connection: { - remotePeer: peerIdA - } - }) - - await onConnectB(peerIdA, c1) - await handleA({ - protocol: multicodec, - stream: c0.stream, - connection: { - remotePeer: peerIdB - } - }) - } - - await Promise.all([ - dial(), - new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)), - new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve)) - ]) - - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) - - expectSet(fsA.subscriptions, ['Za']) - expectSet(first(fsB.peers).topics, ['Za']) - - expectSet(fsB.subscriptions, ['Zb']) - expectSet(first(fsA.peers).topics, ['Zb']) - }) - }) -}) diff --git a/test/compliance.spec.js b/test/compliance.spec.js new file mode 100644 index 0000000000..f652b9f5b3 --- /dev/null +++ b/test/compliance.spec.js @@ -0,0 +1,36 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('libp2p-interfaces/src/pubsub/tests') + +const Floodsub = require('../src') +const { createPeers } = require('./utils/create-peer') + +describe('interface compliance', () => { + let peers + let pubsubNodes = [] + + tests({ + async setup (number = 1, options = {}) { + peers = await createPeers({ number }) + + peers.forEach((peer) => { + const floodsub = new Floodsub(peer, { + emitSelf: true, + ...options + }) + + pubsubNodes.push(floodsub) + }) + + return pubsubNodes + }, + async teardown () { + await Promise.all(pubsubNodes.map(ps => ps.stop())) + peers.length && await Promise.all(peers.map(peer => peer.stop())) + + peers = undefined + pubsubNodes = [] + } + }) +}) diff --git a/test/emit-self.spec.js b/test/emit-self.spec.js deleted file mode 100644 index 0bf22cf5df..0000000000 --- a/test/emit-self.spec.js +++ /dev/null @@ -1,68 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-spies')) -const expect = chai.expect -const uint8ArrayFromString = require('uint8arrays/from-string') -const FloodSub = require('../src') - -const { - createPeerId, mockRegistrar -} = require('./utils') - -const shouldNotHappen = (_) => expect.fail() - -describe('emit self', () => { - let floodsub - let peerId - const topic = 'Z' - - describe('enabled', () => { - before(async () => { - peerId = await createPeerId() - floodsub = new FloodSub(peerId, mockRegistrar, { emitSelf: true }) - }) - - before(async () => { - await floodsub.start() - - floodsub.subscribe(topic) - }) - - after(() => floodsub.stop()) - - it('should emit to self on publish', () => { - const promise = new Promise((resolve) => floodsub.once(topic, resolve)) - - floodsub.publish(topic, uint8ArrayFromString('hey')) - - return promise - }) - }) - - describe('disabled', () => { - before(async () => { - peerId = await createPeerId() - floodsub = new FloodSub(peerId, mockRegistrar, { emitSelf: false }) - }) - - before(async () => { - await floodsub.start() - - floodsub.subscribe(topic) - }) - - after(() => floodsub.stop()) - - it('should emit to self on publish', () => { - floodsub.once(topic, (m) => shouldNotHappen) - - floodsub.publish(topic, uint8ArrayFromString('hey')) - - // Wait 1 second to guarantee that self is not noticed - return new Promise((resolve) => setTimeout(() => resolve(), 1000)) - }) - }) -}) diff --git a/test/fixtures/peers.js b/test/fixtures/peers.js new file mode 100644 index 0000000000..e73697be5c --- /dev/null +++ b/test/fixtures/peers.js @@ -0,0 +1,26 @@ +'use strict' + +/** + * These peer id / keypairs are used across tests to seed peers + */ +module.exports = [{ + id: 'QmNMMAqSxPetRS1cVMmutW5BCN1qQQyEr4u98kUvZjcfEw', + privKey: 'CAASpQkwggShAgEAAoIBAQDPek2aeHMa0blL42RTKd6xgtkk4Zkldvq4LHxzcag5uXepiQzWANEUvoD3KcUTmMRmx14PvsxdLCNst7S2JSa0R2n5wSRs14zGy6892lx4H4tLBD1KSpQlJ6vabYM1CJhIQRG90BtzDPrJ/X1iJ2HA0PPDz0Mflam2QUMDDrU0IuV2m7gSCJ5r4EmMs3U0xnH/1gShkVx4ir0WUdoWf5KQUJOmLn1clTRHYPv4KL9A/E38+imNAXfkH3c2T7DrCcYRkZSpK+WecjMsH1dCX15hhhggNqfp3iulO1tGPxHjm7PDGTPUjpCWKpD5e50sLqsUwexac1ja6ktMfszIR+FPAgMBAAECggEAB2H2uPRoRCAKU+T3gO4QeoiJaYKNjIO7UCplE0aMEeHDnEjAKC1HQ1G0DRdzZ8sb0fxuIGlNpFMZv5iZ2ZFg2zFfV//DaAwTek9tIOpQOAYHUtgHxkj5FIlg2BjlflGb+ZY3J2XsVB+2HNHkUEXOeKn2wpTxcoJE07NmywkO8Zfr1OL5oPxOPlRN1gI4ffYH2LbfaQVtRhwONR2+fs5ISfubk5iKso6BX4moMYkxubYwZbpucvKKi/rIjUA3SK86wdCUnno1KbDfdXSgCiUlvxt/IbRFXFURQoTV6BOi3sP5crBLw8OiVubMr9/8WE6KzJ0R7hPd5+eeWvYiYnWj4QKBgQD6jRlAFo/MgPO5NZ/HRAk6LUG+fdEWexA+GGV7CwJI61W/Dpbn9ZswPDhRJKo3rquyDFVZPdd7+RlXYg1wpmp1k54z++L1srsgj72vlg4I8wkZ4YLBg0+zVgHlQ0kxnp16DvQdOgiRFvMUUMEgetsoIx1CQWTd67hTExGsW+WAZQKBgQDT/WaHWvwyq9oaZ8G7F/tfeuXvNTk3HIJdfbWGgRXB7lJ7Gf6FsX4x7PeERfL5a67JLV6JdiLLVuYC2CBhipqLqC2DB962aKMvxobQpSljBBZvZyqP1IGPoKskrSo+2mqpYkeCLbDMuJ1nujgMP7gqVjabs2zj6ACKmmpYH/oNowJ/T0ZVtvFsjkg+1VsiMupUARRQuPUWMwa9HOibM1NIZcoQV2NGXB5Z++kR6JqxQO0DZlKArrviclderUdY+UuuY4VRiSEprpPeoW7ZlbTku/Ap8QZpWNEzZorQDro7bnfBW91fX9/81ets/gCPGrfEn+58U3pdb9oleCOQc/ifpQKBgBTYGbi9bYbd9vgZs6bd2M2um+VFanbMytS+g5bSIn2LHXkVOT2UEkB+eGf9KML1n54QY/dIMmukA8HL1oNAyalpw+/aWj+9Ui5kauUhGEywHjSeBEVYM9UXizxz+m9rsoktLLLUI0o97NxCJzitG0Kub3gn0FEogsUeIc7AdinZAoGBANnM1vcteSQDs7x94TDEnvvqwSkA2UWyLidD2jXgE0PG4V6tTkK//QPBmC9eq6TIqXkzYlsErSw4XeKO91knFofmdBzzVh/ddgx/NufJV4tXF+a2iTpqYBUJiz9wpIKgf43/Ob+P1EA99GAhSdxz1ess9O2aTqf3ANzn6v6g62Pv', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDPek2aeHMa0blL42RTKd6xgtkk4Zkldvq4LHxzcag5uXepiQzWANEUvoD3KcUTmMRmx14PvsxdLCNst7S2JSa0R2n5wSRs14zGy6892lx4H4tLBD1KSpQlJ6vabYM1CJhIQRG90BtzDPrJ/X1iJ2HA0PPDz0Mflam2QUMDDrU0IuV2m7gSCJ5r4EmMs3U0xnH/1gShkVx4ir0WUdoWf5KQUJOmLn1clTRHYPv4KL9A/E38+imNAXfkH3c2T7DrCcYRkZSpK+WecjMsH1dCX15hhhggNqfp3iulO1tGPxHjm7PDGTPUjpCWKpD5e50sLqsUwexac1ja6ktMfszIR+FPAgMBAAE=' +}, { + id: 'QmW8rAgaaA6sRydK1k6vonShQME47aDxaFidbtMevWs73t', + privKey: 'CAASpwkwggSjAgEAAoIBAQCTU3gVDv3SRXLOsFln9GEf1nJ/uCEDhOG10eC0H9l9IPpVxjuPT1ep+ykFUdvefq3D3q+W3hbmiHm81o8dYv26RxZIEioToUWp7Ec5M2B/niYoE93za9/ZDwJdl7eh2hNKwAdxTmdbXUPjkIU4vLyHKRFbJIn9X8w9djldz8hoUvC1BK4L1XrT6F2l0ruJXErH2ZwI1youfSzo87TdXIoFKdrQLuW6hOtDCGKTiS+ab/DkMODc6zl8N47Oczv7vjzoWOJMUJs1Pg0ZsD1zmISY38P0y/QyEhatZn0B8BmSWxlLQuukatzOepQI6k+HtfyAAjn4UEqnMaXTP1uwLldVAgMBAAECggEAHq2f8MqpYjLiAFZKl9IUs3uFZkEiZsgx9BmbMAb91Aec+WWJG4OLHrNVTG1KWp+IcaQablEa9bBvoToQnS7y5OpOon1d066egg7Ymfmv24NEMM5KRpktCNcOSA0CySpPIB6yrg6EiUr3ixiaFUGABKkxmwgVz/Q15IqM0ZMmCUsC174PMAz1COFZxD0ZX0zgHblOJQW3dc0X3XSzhht8vU02SMoVObQHQfeXEHv3K/RiVj/Ax0bTc5JVkT8dm8xksTtsFCNOzRBqFS6MYqX6U/u0Onz3Jm5Jt7fLWb5n97gZR4SleyGrqxYNb46d9X7mP0ie7E6bzFW0DsWBIeAqVQKBgQDW0We2L1n44yOvJaMs3evpj0nps13jWidt2I3RlZXjWzWHiYQfvhWUWqps/xZBnAYgnN/38xbKzHZeRNhrqOo+VB0WK1IYl0lZVE4l6TNKCsLsUfQzsb1pePkd1eRZA+TSqsi+I/IOQlQU7HA0bMrah/5FYyUBP0jYvCOvYTlZuwKBgQCvkcVRydVlzjUgv7lY5lYvT8IHV5iYO4Qkk2q6Wjv9VUKAJZauurMdiy05PboWfs5kbETdwFybXMBcknIvZO4ihxmwL8mcoNwDVZHI4bXapIKMTCyHgUKvJ9SeTcKGC7ZuQJ8mslRmYox/HloTOXEJgQgPRxXcwa3amzvdZI+6LwKBgQCLsnQqgxKUi0m6bdR2qf7vzTH4258z6X34rjpT0F5AEyF1edVFOz0XU/q+lQhpNEi7zqjLuvbYfSyA026WXKuwSsz7jMJ/oWqev/duKgAjp2npesY/E9gkjfobD+zGgoS9BzkyhXe1FCdP0A6L2S/1+zg88WOwMvJxl6/xLl24XwKBgCm60xSajX8yIQyUpWBM9yUtpueJ2Xotgz4ST+bVNbcEAddll8gWFiaqgug9FLLuFu5lkYTHiPtgc1RNdphvO+62/9MRuLDixwh/2TPO+iNqwKDKJjda8Nei9vVddCPaOtU/xNQ0xLzFJbG9LBmvqH9izOCcu8SJwGHaTcNUeJj/AoGADCJ26cY30c13F/8awAAmFYpZWCuTP5ppTsRmjd63ixlrqgkeLGpJ7kYb5fXkcTycRGYgP0e1kssBGcmE7DuG955fx3ZJESX3GQZ+XfMHvYGONwF1EiK1f0p6+GReC2VlQ7PIkoD9o0hojM6SnWvv9EXNjCPALEbfPFFvcniKVsE=', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCTU3gVDv3SRXLOsFln9GEf1nJ/uCEDhOG10eC0H9l9IPpVxjuPT1ep+ykFUdvefq3D3q+W3hbmiHm81o8dYv26RxZIEioToUWp7Ec5M2B/niYoE93za9/ZDwJdl7eh2hNKwAdxTmdbXUPjkIU4vLyHKRFbJIn9X8w9djldz8hoUvC1BK4L1XrT6F2l0ruJXErH2ZwI1youfSzo87TdXIoFKdrQLuW6hOtDCGKTiS+ab/DkMODc6zl8N47Oczv7vjzoWOJMUJs1Pg0ZsD1zmISY38P0y/QyEhatZn0B8BmSWxlLQuukatzOepQI6k+HtfyAAjn4UEqnMaXTP1uwLldVAgMBAAE=' +}, { + id: 'QmZqCdSzgpsmB3Qweb9s4fojAoqELWzqku21UVrqtVSKi4', + privKey: 'CAASpgkwggSiAgEAAoIBAQCdbSEsTmw7lp5HagRcx57DaLiSUEkh4iBcKc7Y+jHICEIA8NIVi9FlfGEZj9G21FpiTR4Cy+BLVEuf8Nm90bym4iV+cSumeS21fvD8xGTEbeKGljs6OYHy3M45JhWF85gqHQJOqZufI2NRDuRgMZEO2+qGEXmSlv9mMXba/+9ecze8nSpB7bG2Z2pnKDeYwhF9Cz+ElMyn7TBWDjJERGVgFbTpdM3rBnbhB/TGpvs732QqZmIBlxnDb/Jn0l1gNZCgkEDcJ/0NDMBJTQ8vbvcdmaw3eaMPLkn1ix4wdu9QWCA0IBtuY1R7vSUtf4irnLJG7DnAw2GfM5QrF3xF1GLXAgMBAAECggEAQ1N0qHoxl5pmvqv8iaFlqLSUmx5y6GbI6CGJMQpvV9kQQU68yjItr3VuIXx8d/CBZyEMAK4oko7OeOyMcr3MLKLy3gyQWnXgsopDjhZ/8fH8uwps8g2+IZuFJrO+6LaxEPGvFu06fOiphPUVfn40R2KN/iBjGeox+AaXijmCqaV2vEdNJJPpMfz6VKZBDLTrbiqvo/3GN1U99PUqfPWpOWR29oAhh/Au6blSqvqTUPXB2+D/X6e1JXv31mxMPK68atDHSUjZWKB9lE4FMK1bkSKJRbyXmNIlbZ9V8X4/0r8/6T7JnW7ZT8ugRkquohmwgG7KkDXB1YsOCKXYUqzVYQKBgQDtnopFXWYl7XUyePJ/2MA5i7eoko9jmF44L31irqmHc5unNf6JlNBjlxTNx3WyfzhUzrn3c18psnGkqtow0tkBj5hmqn8/WaPbc5UA/5R1FNaNf8W5khn7MDm6KtYRPjN9djqTDiVHyC6ljONYd+5S+MqyKVWZ3t/xvG60sw85qwKBgQCpmpDtL+2JBwkfeUr3LyDcQxvbfzcv8lXj2otopWxWiLiZF1HzcqgAa2CIwu9kCGEt9Zr+9E4uINbe1To0b01/FhvR6xKO/ukceGA/mBB3vsKDcRmvpBUp+3SmnhY0nOk+ArQl4DhJ34k8pDM3EDPrixPf8SfVdU/8IM32lsdHhQKBgHLgpvCKCwxjFLnmBzcPzz8C8TOqR3BbBZIcQ34l+wflOGdKj1hsfaLoM8KYn6pAHzfBCd88A9Hg11hI0VuxVACRL5jS7NnvuGwsIOluppNEE8Ys86aXn7/0vLPoab3EWJhbRE48FIHzobmft3nZ4XpzlWs02JGfUp1IAC2UM9QpAoGAeWy3pZhSr2/iEC5+hUmwdQF2yEbj8+fDpkWo2VrVnX506uXPPkQwE1zM2Bz31t5I9OaJ+U5fSpcoPpDaAwBMs1fYwwlRWB8YNdHY1q6/23svN3uZsC4BGPV2JnO34iMUudilsRg+NGVdk5TbNejbwx7nM8Urh59djFzQGGMKeSECgYA0QMCARPpdMY50Mf2xQaCP7HfMJhESSPaBq9V3xY6ToEOEnXgAR5pNjnU85wnspHp+82r5XrKfEQlFxGpj2YA4DRRmn239sjDa29qP42UNAFg1+C3OvXTht1d5oOabaGhU0udwKmkEKUbb0bG5xPQJ5qeSJ5T1gLzLk3SIP0GlSw==', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCdbSEsTmw7lp5HagRcx57DaLiSUEkh4iBcKc7Y+jHICEIA8NIVi9FlfGEZj9G21FpiTR4Cy+BLVEuf8Nm90bym4iV+cSumeS21fvD8xGTEbeKGljs6OYHy3M45JhWF85gqHQJOqZufI2NRDuRgMZEO2+qGEXmSlv9mMXba/+9ecze8nSpB7bG2Z2pnKDeYwhF9Cz+ElMyn7TBWDjJERGVgFbTpdM3rBnbhB/TGpvs732QqZmIBlxnDb/Jn0l1gNZCgkEDcJ/0NDMBJTQ8vbvcdmaw3eaMPLkn1ix4wdu9QWCA0IBtuY1R7vSUtf4irnLJG7DnAw2GfM5QrF3xF1GLXAgMBAAE=' +}, { + id: 'QmR5VwgsL7jyfZHAGyp66tguVrQhCRQuRc3NokocsCZ3fA', + privKey: 'CAASpwkwggSjAgEAAoIBAQCGXYU+uc2nn1zuJhfdFOl34upztnrD1gpHu58ousgHdGlGgYgbqLBAvIAauXdEL0+e30HofjA634SQxE+9nV+0FQBam1DDzHQlXsuwHV+2SKvSDkk4bVllMFpu2SJtts6VH+OXC/2ANJOm+eTALykQPYXgLIBxrhp/eD+Jz5r6wW2nq3k6OmYyK/4pgGzFjo5UyX+fa/171AJ68UPboFpDy6BZCcUjS0ondxPvD7cv5jMNqqMKIB/7rpi8n+Q3oeccRqVL56wH+FE3/QLjwYHwY6ILNRyvNXRqHjwBEXB2R5moXN0AFUWTw9rt3KhFiEjR1U81BTw5/xS7W2Iu0FgZAgMBAAECggEAS64HK8JZfE09eYGJNWPe8ECmD1C7quw21BpwVe+GVPSTizvQHswPohbKDMNj0srXDMPxCnNw1OgqcaOwyjsGuZaOoXoTroTM8nOHRIX27+PUqzaStS6aCG2IsiCozKUHjGTuupftS7XRaF4eIsUtWtFcQ1ytZ9pJYHypRQTi5NMSrTze5ThjnWxtHilK7gnBXik+aR0mYEVfSn13czQEC4rMOs+b9RAc/iibDNoLopfIdvmCCvfxzmySnR7Cu1iSUAONkir7PB+2Mt/qRFCH6P+jMamtCgQ8AmifXgVmDUlun+4MnKg3KrPd6ZjOEKhVe9mCHtGozk65RDREShfDdQKBgQDi+x2MuRa9peEMOHnOyXTS+v+MFcfmG0InsO08rFNBKZChLB+c9UHBdIvexpfBHigSyERfuDye4z6lxi8ZnierWMYJP30nxmrnxwTGTk1MQquhfs1A0kpmDnPsjlOS/drEIEIssNx2WbfJ7YtMxLWBtp+BJzGpQmr0LKC+NHRSrwKBgQCXiy2kJESIUkIs2ihV55hhT6/bZo1B1O5DPA2nkjOBXqXF6fvijzMDX82JjLd07lQZlI0n1Q/Hw0p4iYi9YVd2bLkLXF5UIb2qOeHj76enVFOrPHUSkC9Y2g/0Xs+60Ths2xRd8RrrfQU3kl5iVpBywkCIrb2M5+wRnNTk1W3TtwKBgQCvplyrteAfSurpJhs9JzE8w/hWU9SqAZYkWQp91W1oE95Um2yrbjBAoQxMjaqKS+f/APPIjy56Vqj4aHGyhW11b/Fw3qzfxvCcBKtxOs8eoMlo5FO6QgJJEA4tlcafDcvp0nzjUMqK28safLU7503+33B35fjMXxWdd5u9FaKfCQKBgC4W6j6tuRosymuRvgrCcRnHfpify/5loEFallyMnpWOD6Tt0OnK25z/GifnYDRz96gAAh5HMpFy18dpLOlMHamqz2yhHx8/U8vd5tHIJZlCkF/X91M5/uxrBccwvsT2tM6Got8fYSyVzWxlW8dUxIHiinYHQUsFjkqdBDLEpq5pAoGASoTw5RBEWFM0GuAZdXsyNyxU+4S+grkTS7WdW/Ymkukh+bJZbnvF9a6MkSehqXnknthmufonds2AFNS//63gixENsoOhzT5+2cdfc6tJECvJ9xXVXkf85AoQ6T/RrXF0W4m9yQyCngNJUrKUOIH3oDIfdZITlYzOC3u1ojj7VuQ=', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCGXYU+uc2nn1zuJhfdFOl34upztnrD1gpHu58ousgHdGlGgYgbqLBAvIAauXdEL0+e30HofjA634SQxE+9nV+0FQBam1DDzHQlXsuwHV+2SKvSDkk4bVllMFpu2SJtts6VH+OXC/2ANJOm+eTALykQPYXgLIBxrhp/eD+Jz5r6wW2nq3k6OmYyK/4pgGzFjo5UyX+fa/171AJ68UPboFpDy6BZCcUjS0ondxPvD7cv5jMNqqMKIB/7rpi8n+Q3oeccRqVL56wH+FE3/QLjwYHwY6ILNRyvNXRqHjwBEXB2R5moXN0AFUWTw9rt3KhFiEjR1U81BTw5/xS7W2Iu0FgZAgMBAAE=' +}, { + id: 'QmScLDqRg7H6ipCYxm9fVk152UWavQFKscTdoT4YNHxgqp', + privKey: 'CAASpwkwggSjAgEAAoIBAQCWEHaTZ6LBLFP5OPrUqjDM/cF4b2zrfh1Zm3kd02ZtgQB3iYtZqRPJT5ctT3A7WdVF/7dCxPGOCkJlLekTx4Y4gD8JtjA+EfN9fR/2RBKbti2N3CD4vkGp9ss4hbBFcXIhl8zuD/ELHutbV6b8b4QXJGnxfp/B+1kNPnyd7SJznS0QyvI8OLI1nAkVKdYLDRW8kPKeHyx1xhdNDuTQVTFyAjRGQ4e3UYFB7bYIHW3E6kCtCoJDlj+JPC02Yt1LHzIzZVLvPvNFnYY2mag6OiGFuh/oMBIqvnPc1zRZ3eLUqeGZjQVaoR0kdgZUKz7Q2TBeNldxK/s6XO0DnkQTlelNAgMBAAECggEAdmt1dyswR2p4tdIeNpY7Pnj9JNIhTNDPznefI0dArCdBvBMhkVaYk6MoNIxcj6l7YOrDroAF8sXr0TZimMY6B/pERKCt/z1hPWTxRQBBAvnHhwvwRPq2jK6BfhAZoyM8IoBNKowP9mum5QUNdGV4Al8s73KyFX0IsCfgZSvNpRdlt+DzPh+hu/CyoZaMpRchJc1UmK8Fyk3KfO+m0DZNfHP5P08lXNfM6MZLgTJVVgERHyG+vBOzTd2RElMe19nVCzHwb3dPPRZSQ7Fnz3rA+GeLqsM2Zi4HNhfbD1OcD9C4wDj5tYL6hWTkdz4IlfVcjCeUHxgIOhdDV2K+OwbuAQKBgQD0FjUZ09UW2FQ/fitbvIB5f1SkXWPxTF9l6mAeuXhoGv2EtQUO4vq/PK6N08RjrZdWQy6UsqHgffi7lVQ8o3hvCKdbtf4sP+cM92OrY0WZV89os79ndj4tyvmnP8WojwRjt/2XEfgdoWcgWxW9DiYINTOQVimZX+X/3on4s8hEgQKBgQCdY3kOMbyQeLTRkqHXjVTY4ddO+v4S4wOUa1l4rTqAbq1W3JYWwoDQgFuIu3limIHmjnSJpCD4EioXFsM7p6csenoc20sHxsaHnJ6Mn5Te41UYmY9EW0otkQ0C3KbXM0hwQkjyplnEmZawGKmjEHW8DJ3vRYTv9TUCgYKxDHgOzQKBgB4A/NYH7BG61eBYKgxEx6YnuMfbkwV+Vdu5S8d7FQn3B2LgvZZu4FPRqcNVXLbEB+5ao8czjiKCWaj1Wj15+rvrXGcxn+Tglg5J+r5+nXeUC7LbJZQaPNp0MOwWMr3dlrSLUWjYlJ9Pz9VyXOG4c4Rexc/gR4zK9QLW4C7qKpwBAoGAZzyUb0cYlPtYQA+asTU3bnvVKy1f8yuNcZFowst+EDiI4u0WVh+HNzy6zdmLKa03p+/RaWeLaK0hhrubnEnAUmCUMNF3ScaM+u804LDcicc8TkKLwx7ObU0z56isl4RAA8K27tNHFrpYKXJD834cfBkaj5ReOrfw6Y/iFhhDuBECgYEA8gbC76uz7LSHhW30DSRTcqOzTyoe2oYKQaxuxYNp7vSSOkcdRen+mrdflDvud2q/zN2QdL4pgqdldHlR35M/lJ0f0B6zp74jlzbO9700wzsOqreezGc5eWiroDL100U9uIZ50BKb8CKtixIHpinUSPIUcVDkSAZ2y7mbfCxQwqQ=', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCWEHaTZ6LBLFP5OPrUqjDM/cF4b2zrfh1Zm3kd02ZtgQB3iYtZqRPJT5ctT3A7WdVF/7dCxPGOCkJlLekTx4Y4gD8JtjA+EfN9fR/2RBKbti2N3CD4vkGp9ss4hbBFcXIhl8zuD/ELHutbV6b8b4QXJGnxfp/B+1kNPnyd7SJznS0QyvI8OLI1nAkVKdYLDRW8kPKeHyx1xhdNDuTQVTFyAjRGQ4e3UYFB7bYIHW3E6kCtCoJDlj+JPC02Yt1LHzIzZVLvPvNFnYY2mag6OiGFuh/oMBIqvnPc1zRZ3eLUqeGZjQVaoR0kdgZUKz7Q2TBeNldxK/s6XO0DnkQTlelNAgMBAAE=' +}] diff --git a/test/fixtures/relay.js b/test/fixtures/relay.js new file mode 100644 index 0000000000..47057a2cea --- /dev/null +++ b/test/fixtures/relay.js @@ -0,0 +1,13 @@ +'use strict' + +/** + * This peer id / keypair / multiaddr is used to seed a relay node, + * used in browser tests to coordinate / relay messages between browser peers + */ + +module.exports = { + id: 'QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN', + privKey: 'CAASpwkwggSjAgEAAoIBAQC1/GFud/7xutux7qRfMj1sIdMRh99/chR6HqVj6LQqrgk4jil0mdN/LCk/tqPqmDtObHdmEhCoybzuhLbCKgUqryKDwO6yBJHSKWY9QqrKZtLJ37SgKwGjE3+NUD4r1dJHhtQrICFdOdSCBzs/v8gi+J+KZLHo7+Nms4z09ysy7qZh94Pd7cW4gmSMergqUeANLD9C0ERw1NXolswOW7Bi7UGr7yuBxejICLO3nkxe0OtpQBrYrqdCD9vs3t/HQZbPWVoiRj4VO7fxkAPKLl30HzcIfxj/ayg8NHcH59d08D+N2v5Sdh28gsiYKIPE9CXvuw//HUY2WVRY5fDC5JglAgMBAAECggEBAKb5aN/1w3pBqz/HqRMbQpYLNuD33M3PexBNPAy+P0iFpDo63bh5Rz+A4lvuFNmzUX70MFz7qENlzi6+n/zolxMB29YtWBUH8k904rTEjXXl//NviQgITZk106tx+4k2x5gPEm57LYGfBOdFAUzNhzDnE2LkXwRNzkS161f7zKwOEsaGWRscj6UvhO4MIFxjb32CVwt5eK4yOVqtyMs9u30K4Og+AZYTlhtm+bHg6ndCCBO6CQurCQ3jD6YOkT+L3MotKqt1kORpvzIB0ujZRf49Um8wlcjC5G9aexBeGriXaVdPF62zm7GA7RMsbQM/6aRbA1fEQXvJhHUNF9UFeaECgYEA8wCjKqQA7UQnHjRwTsktdwG6szfxd7z+5MTqHHTWhWzgcQLgdh5/dO/zanEoOThadMk5C1Bqjq96gH2xim8dg5XQofSVtV3Ui0dDa+XRB3E3fyY4D3RF5hHv85O0GcvQc6DIb+Ja1oOhvHowFB1C+CT3yEgwzX/EK9xpe+KtYAkCgYEAv7hCnj/DcZFU3fAfS+unBLuVoVJT/drxv66P686s7J8UM6tW+39yDBZ1IcwY9vHFepBvxY2fFfEeLI02QFM+lZXVhNGzFkP90agNHK01psGgrmIufl9zAo8WOKgkLgbYbSHzkkDeqyjEPU+B0QSsZOCE+qLCHSdsnTmo/TjQhj0CgYAz1+j3yfGgrS+jVBC53lXi0+2fGspbf2jqKdDArXSvFqFzuudki/EpY6AND4NDYfB6hguzjD6PnoSGMUrVfAtR7X6LbwEZpqEX7eZGeMt1yQPMDr1bHrVi9mS5FMQR1NfuM1lP9Xzn00GIUpE7WVrWUhzDEBPJY/7YVLf0hFH08QKBgDWBRQZJIVBmkNrHktRrVddaSq4U/d/Q5LrsCrpymYwH8WliHgpeTQPWmKXwAd+ZJdXIzYjCt202N4eTeVqGYOb6Q/anV2WVYBbM4avpIxoA28kPGY6nML+8EyWIt2ApBOmgGgvtEreNzwaVU9NzjHEyv6n7FlVwlT1jxCe3XWq5AoGASYPKQoPeDlW+NmRG7z9EJXJRPVtmLL40fmGgtju9QIjLnjuK8XaczjAWT+ySI93Whu+Eujf2Uj7Q+NfUjvAEzJgwzuOd3jlQvoALq11kuaxlNQTn7rx0A1QhBgUJE8AkvShPC9FEnA4j/CLJU0re9H/8VvyN6qE0Mho0+YbjpP8=', + pubKey: 'CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC1/GFud/7xutux7qRfMj1sIdMRh99/chR6HqVj6LQqrgk4jil0mdN/LCk/tqPqmDtObHdmEhCoybzuhLbCKgUqryKDwO6yBJHSKWY9QqrKZtLJ37SgKwGjE3+NUD4r1dJHhtQrICFdOdSCBzs/v8gi+J+KZLHo7+Nms4z09ysy7qZh94Pd7cW4gmSMergqUeANLD9C0ERw1NXolswOW7Bi7UGr7yuBxejICLO3nkxe0OtpQBrYrqdCD9vs3t/HQZbPWVoiRj4VO7fxkAPKLl30HzcIfxj/ayg8NHcH59d08D+N2v5Sdh28gsiYKIPE9CXvuw//HUY2WVRY5fDC5JglAgMBAAE=', + multiaddr: '/ip4/127.0.0.1/tcp/15001/ws/p2p/QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN' +} diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js new file mode 100644 index 0000000000..dfba2935f1 --- /dev/null +++ b/test/floodsub.spec.js @@ -0,0 +1,107 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') +const sinon = require('sinon') +const uint8ArrayFromString = require('uint8arrays/from-string') + +const { utils } = require('libp2p-interfaces/src/pubsub') +const pWaitFor = require('p-wait-for') + +const Floodsub = require('../src') +const { createPeers } = require('./utils/create-peer') + +const defOptions = { + emitSelf: true +} + +const topic = 'my-topic' +const message = uint8ArrayFromString('a neat message') + +describe('floodsub', () => { + let floodsub1, floodsub2 + let peer1, peer2 + + before(async () => { + expect(Floodsub.multicodec).to.exist() + + ;[peer1, peer2] = await createPeers({ number: 2 }) + floodsub1 = new Floodsub(peer1, defOptions) + floodsub2 = new Floodsub(peer2, defOptions) + }) + + beforeEach(() => { + return Promise.all([ + floodsub1.start(), + floodsub2.start() + ]) + }) + + afterEach(async () => { + sinon.restore() + await floodsub1.stop() + await floodsub2.stop() + await peer1.stop() + await peer2.stop() + }) + + it('checks cache when processing incoming message', async () => { + sinon.spy(floodsub2.seenCache, 'has') + sinon.spy(floodsub2.seenCache, 'put') + sinon.spy(floodsub2, '_processRpcMessage') + sinon.spy(floodsub2, '_publish') + + let messageReceived = false + function checkMessage (msg) { + messageReceived = true + } + + // connect peers + await floodsub1._libp2p.dial(floodsub2._libp2p.peerId) + + // subscribe and wait for subscription to be received in the other peer + floodsub2.subscribe(topic) + floodsub2.on(topic, checkMessage) + await pWaitFor(() => { + const subs = floodsub1.getSubscribers(topic) + + return subs.length === 1 + }) + + await floodsub1.publish(topic, message) + await pWaitFor(() => messageReceived === true) + + expect(floodsub2.seenCache.has.callCount).to.eql(2) // Put also calls .has + expect(floodsub2.seenCache.put.callCount).to.eql(1) + expect(floodsub2._publish.callCount).to.eql(1) // Forward message + + const [msgProcessed] = floodsub2._processRpcMessage.getCall(0).args + + // Force a second process for the message + await floodsub2._processRpcMessage(msgProcessed) + + expect(floodsub2.seenCache.has.callCount).to.eql(3) + expect(floodsub2.seenCache.put.callCount).to.eql(1) // No new put + expect(floodsub2._publish.callCount).to.eql(1) // Not forwarded + }) + + it('forwards normalized messages on publish', async () => { + sinon.spy(floodsub1, '_forwardMessage') + sinon.spy(utils, 'randomSeqno') + + await floodsub1.publish(topic, message) + expect(floodsub1._forwardMessage.callCount).to.eql(1) + const [messageToEmit] = floodsub1._forwardMessage.getCall(0).args + + const expected = utils.normalizeInRpcMessage( + await floodsub1._buildMessage({ + receivedFrom: peer1.peerId.toB58String(), + from: peer1.peerId.toB58String(), + data: message, + seqno: utils.randomSeqno.getCall(0).returnValue, + topicIDs: [topic] + })) + + expect(messageToEmit).to.eql(expected) + }) +}) diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js deleted file mode 100644 index 04aef9f819..0000000000 --- a/test/multiple-nodes.spec.js +++ /dev/null @@ -1,469 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 8] */ -'use strict' - -const { expect } = require('aegir/utils/chai') -const uint8ArrayFromString = require('uint8arrays/from-string') -const uint8ArrayToString = require('uint8arrays/to-string') -const pDefer = require('p-defer') - -const FloodSub = require('../src') -const { multicodec } = require('../src') -const { - createPeerId, - createMockRegistrar, - first, - expectSet, - ConnectionPair -} = require('./utils') - -async function spawnPubSubNode (peerId, reg) { - const ps = new FloodSub(peerId, reg, { emitSelf: true }) - - await ps.start() - return ps -} - -describe('multiple nodes (more than 2)', () => { - describe('every peer subscribes to the topic', () => { - describe('line', () => { - // line - // ◉────◉────◉ - // a b c - let psA, psB, psC - let peerIdA, peerIdB, peerIdC - - const registrarRecordA = {} - const registrarRecordB = {} - const registrarRecordC = {} - - before(async () => { - [peerIdA, peerIdB, peerIdC] = await Promise.all([ - createPeerId(), - createPeerId(), - createPeerId() - ]); - - [psA, psB, psC] = await Promise.all([ - spawnPubSubNode(peerIdA, createMockRegistrar(registrarRecordA)), - spawnPubSubNode(peerIdB, createMockRegistrar(registrarRecordB)), - spawnPubSubNode(peerIdC, createMockRegistrar(registrarRecordC)) - ]) - }) - - // connect nodes - before(async () => { - const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect - const onConnectC = registrarRecordC[multicodec].onConnect - const handleA = registrarRecordA[multicodec].handler - const handleB = registrarRecordB[multicodec].handler - const handleC = registrarRecordC[multicodec].handler - - // Notice peers of connection - const [d0, d1] = ConnectionPair() - await onConnectA(peerIdB, d0) - await handleB({ - protocol: multicodec, - stream: d1.stream, - connection: { - remotePeer: peerIdA - } - }) - await onConnectB(peerIdA, d1) - await handleA({ - protocol: multicodec, - stream: d0.stream, - connection: { - remotePeer: peerIdB - } - }) - - const [d2, d3] = ConnectionPair() - await onConnectB(peerIdC, d2) - await handleC({ - protocol: multicodec, - stream: d3.stream, - connection: { - remotePeer: peerIdB - } - }) - await onConnectC(peerIdB, d3) - await handleB({ - protocol: multicodec, - stream: d2.stream, - connection: { - remotePeer: peerIdC - } - }) - }) - - after(() => Promise.all([ - psA.stop(), - psB.stop(), - psC.stop() - ])) - - it('subscribe to the topic on node a', () => { - const defer = pDefer() - - psA.subscribe('Z') - expectSet(psA.subscriptions, ['Z']) - - psB.once('floodsub:subscription-change', () => { - expect(psB.peers.size).to.equal(2) - const aPeerId = psA.peerId.toB58String() - const topics = psB.peers.get(aPeerId).topics - expectSet(topics, ['Z']) - - expect(psC.peers.size).to.equal(1) - expectSet(first(psC.peers).topics, []) - - defer.resolve() - }) - - return defer.promise - }) - - it('subscribe to the topic on node b', async () => { - psB.subscribe('Z') - expectSet(psB.subscriptions, ['Z']) - - await Promise.all([ - new Promise((resolve) => psA.once('floodsub:subscription-change', resolve)), - new Promise((resolve) => psC.once('floodsub:subscription-change', resolve)) - ]) - - expect(psA.peers.size).to.equal(1) - expectSet(first(psA.peers).topics, ['Z']) - - expect(psC.peers.size).to.equal(1) - expectSet(first(psC.peers).topics, ['Z']) - }) - - it('subscribe to the topic on node c', () => { - const defer = pDefer() - - psC.subscribe('Z') - expectSet(psC.subscriptions, ['Z']) - - psB.once('floodsub:subscription-change', () => { - expect(psA.peers.size).to.equal(1) - expectSet(first(psA.peers).topics, ['Z']) - - expect(psB.peers.size).to.equal(2) - psB.peers.forEach((peer) => { - expectSet(peer.topics, ['Z']) - }) - - defer.resolve() - }) - - return defer.promise - }) - - it('publish on node a', () => { - const defer = pDefer() - - let counter = 0 - - psA.on('Z', incMsg) - psB.on('Z', incMsg) - psC.on('Z', incMsg) - - psA.publish('Z', uint8ArrayFromString('hey')) - - function incMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('hey') - check() - } - - function check () { - if (++counter === 3) { - psA.removeListener('Z', incMsg) - psB.removeListener('Z', incMsg) - psC.removeListener('Z', incMsg) - defer.resolve() - } - } - - return defer.promise - }) - - it('publish array on node a', () => { - const defer = pDefer() - let counter = 0 - - psA.on('Z', incMsg) - psB.on('Z', incMsg) - psC.on('Z', incMsg) - - psA.publish('Z', [uint8ArrayFromString('hey'), uint8ArrayFromString('hey')]) - - function incMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('hey') - check() - } - - function check () { - if (++counter === 6) { - psA.removeListener('Z', incMsg) - psB.removeListener('Z', incMsg) - psC.removeListener('Z', incMsg) - defer.resolve() - } - } - - return defer.promise - }) - - // since the topology is the same, just the publish - // gets sent by other peer, we reused the same peers - describe('1 level tree', () => { - // 1 level tree - // ┌◉┐ - // │b│ - // ◉─┘ └─◉ - // a c - - it('publish on node b', () => { - const defer = pDefer() - let counter = 0 - - psA.on('Z', incMsg) - psB.on('Z', incMsg) - psC.on('Z', incMsg) - - psB.publish('Z', uint8ArrayFromString('hey')) - - function incMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('hey') - check() - } - - function check () { - if (++counter === 3) { - psA.removeListener('Z', incMsg) - psB.removeListener('Z', incMsg) - psC.removeListener('Z', incMsg) - defer.resolve() - } - } - - return defer.promise - }) - }) - }) - - describe('2 level tree', () => { - // 2 levels tree - // ┌◉┐ - // │c│ - // ┌◉─┘ └─◉┐ - // │b d│ - // ◉─┘ └─◉ - // a - let psA, psB, psC, psD, psE - let peerIdA, peerIdB, peerIdC, peerIdD, peerIdE - - const registrarRecordA = {} - const registrarRecordB = {} - const registrarRecordC = {} - const registrarRecordD = {} - const registrarRecordE = {} - - before(async () => { - [peerIdA, peerIdB, peerIdC, peerIdD, peerIdE] = await Promise.all([ - createPeerId(), - createPeerId(), - createPeerId(), - createPeerId(), - createPeerId() - ]); - - [psA, psB, psC, psD, psE] = await Promise.all([ - spawnPubSubNode(peerIdA, createMockRegistrar(registrarRecordA)), - spawnPubSubNode(peerIdB, createMockRegistrar(registrarRecordB)), - spawnPubSubNode(peerIdC, createMockRegistrar(registrarRecordC)), - spawnPubSubNode(peerIdD, createMockRegistrar(registrarRecordD)), - spawnPubSubNode(peerIdE, createMockRegistrar(registrarRecordE)) - ]) - }) - - // connect nodes - before(async () => { - const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect - const onConnectC = registrarRecordC[multicodec].onConnect - const onConnectD = registrarRecordD[multicodec].onConnect - const onConnectE = registrarRecordE[multicodec].onConnect - const handleA = registrarRecordA[multicodec].handler - const handleB = registrarRecordB[multicodec].handler - const handleC = registrarRecordC[multicodec].handler - const handleD = registrarRecordD[multicodec].handler - const handleE = registrarRecordE[multicodec].handler - - // Notice peers of connection - const [d0, d1] = ConnectionPair() // A <-> B - await onConnectA(peerIdB, d0) - await handleB({ - protocol: multicodec, - stream: d1.stream, - connection: { - remotePeer: peerIdA - } - }) - await onConnectB(peerIdA, d1) - await handleA({ - protocol: multicodec, - stream: d0.stream, - connection: { - remotePeer: peerIdB - } - }) - - const [d2, d3] = ConnectionPair() // B <-> C - await onConnectB(peerIdC, d2) - await handleC({ - protocol: multicodec, - stream: d3.stream, - connection: { - remotePeer: peerIdB - } - }) - await onConnectC(peerIdB, d3) - await handleB({ - protocol: multicodec, - stream: d2.stream, - connection: { - remotePeer: peerIdC - } - }) - - const [d4, d5] = ConnectionPair() // C <-> D - await onConnectC(peerIdD, d4) - await handleD({ - protocol: multicodec, - stream: d5.stream, - connection: { - remotePeer: peerIdC - } - }) - await onConnectD(peerIdC, d5) - await handleC({ - protocol: multicodec, - stream: d4.stream, - connection: { - remotePeer: peerIdD - } - }) - - const [d6, d7] = ConnectionPair() // D <-> E - await onConnectD(peerIdE, d6) - await handleE({ - protocol: multicodec, - stream: d7.stream, - connection: { - remotePeer: peerIdD - } - }) - await onConnectE(peerIdD, d7) - await handleD({ - protocol: multicodec, - stream: d6.stream, - connection: { - remotePeer: peerIdE - } - }) - }) - - after(() => Promise.all([ - psA.stop(), - psB.stop(), - psC.stop(), - psD.stop(), - psE.stop() - ])) - - it('subscribes', () => { - psA.subscribe('Z') - expectSet(psA.subscriptions, ['Z']) - psB.subscribe('Z') - expectSet(psB.subscriptions, ['Z']) - psC.subscribe('Z') - expectSet(psC.subscriptions, ['Z']) - psD.subscribe('Z') - expectSet(psD.subscriptions, ['Z']) - psE.subscribe('Z') - expectSet(psE.subscriptions, ['Z']) - }) - - it('publishes from c', function () { - this.timeout(30 * 1000) - const defer = pDefer() - let counter = 0 - - psA.on('Z', incMsg) - psB.on('Z', incMsg) - psC.on('Z', incMsg) - psD.on('Z', incMsg) - psE.on('Z', incMsg) - - psC.publish('Z', uint8ArrayFromString('hey from c')) - - function incMsg (msg) { - expect(uint8ArrayToString(msg.data)).to.equal('hey from c') - check() - } - - function check () { - if (++counter === 5) { - psA.removeListener('Z', incMsg) - psB.removeListener('Z', incMsg) - psC.removeListener('Z', incMsg) - psD.removeListener('Z', incMsg) - psE.removeListener('Z', incMsg) - defer.resolve() - } - } - - return defer.promise - }) - }) - }) - - describe('only some nodes subscribe the networks', () => { - describe('line', () => { - // line - // ◉────◎────◉ - // a b c - - before(() => { }) - after(() => { }) - }) - - describe('1 level tree', () => { - // 1 level tree - // ┌◉┐ - // │b│ - // ◎─┘ └─◉ - // a c - - before(() => { }) - after(() => { }) - }) - - describe('2 level tree', () => { - // 2 levels tree - // ┌◉┐ - // │c│ - // ┌◎─┘ └─◉┐ - // │b d│ - // ◉─┘ └─◎ - // a e - - before(() => { }) - after(() => { }) - }) - }) -}) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js deleted file mode 100644 index f93950dd4f..0000000000 --- a/test/pubsub.spec.js +++ /dev/null @@ -1,142 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const { expect } = require('aegir/utils/chai') -const sinon = require('sinon') -const uint8ArrayFromString = require('uint8arrays/from-string') -const Floodsub = require('../src') -const { createPeerId, mockRegistrar } = require('./utils') -const { utils } = require('libp2p-pubsub') - -const defOptions = { - emitSelf: true -} - -describe('pubsub', () => { - let floodsub - let peerId - - before(async () => { - expect(Floodsub.multicodec).to.exist() - - peerId = await createPeerId() - floodsub = new Floodsub(peerId, mockRegistrar, defOptions) - }) - - beforeEach(() => { - return floodsub.start() - }) - - afterEach(() => { - sinon.restore() - return floodsub.stop() - }) - - describe('publish', () => { - it('should emit non normalized messages', async () => { - sinon.spy(floodsub, '_emitMessages') - sinon.spy(utils, 'randomSeqno') - - const topic = 'my-topic' - const message = uint8ArrayFromString('a neat message') - - await floodsub.publish(topic, message) - expect(floodsub._emitMessages.callCount).to.eql(1) - - const [topics, messages] = floodsub._emitMessages.getCall(0).args - expect(topics).to.eql([topic]) - expect(messages).to.eql([{ - from: peerId.toB58String(), - data: message, - seqno: utils.randomSeqno.getCall(0).returnValue, - topicIDs: topics - }]) - }) - - it('should forward normalized messages', async () => { - sinon.spy(floodsub, '_forwardMessages') - sinon.spy(utils, 'randomSeqno') - - const topic = 'my-topic' - const message = uint8ArrayFromString('a neat message') - - await floodsub.publish(topic, message) - expect(floodsub._forwardMessages.callCount).to.eql(1) - const [topics, messages] = floodsub._forwardMessages.getCall(0).args - - const expected = await floodsub._buildMessage({ - from: peerId.toB58String(), - data: message, - seqno: utils.randomSeqno.getCall(0).returnValue, - topicIDs: topics - }) - - expect(topics).to.eql([topic]) - expect(messages).to.eql([ - expected - ]) - }) - }) - - describe('validate', () => { - it('should drop unsigned messages', () => { - sinon.spy(floodsub, '_emitMessages') - sinon.spy(floodsub, '_forwardMessages') - sinon.spy(floodsub, 'validate') - - const topic = 'my-topic' - const rpc = { - subscriptions: [], - msgs: [{ - from: peerId.id, - data: uint8ArrayFromString('an unsigned message'), - seqno: utils.randomSeqno(), - topicIDs: [topic] - }] - } - - floodsub._onRpc('QmAnotherPeer', rpc) - - return new Promise((resolve) => { - setTimeout(() => { - expect(floodsub.validate.callCount).to.eql(1) - expect(floodsub._emitMessages.called).to.eql(false) - expect(floodsub._forwardMessages.called).to.eql(false) - - resolve() - }, 50) - }) - }) - - it('should not drop unsigned messages if strict signing is disabled', () => { - sinon.spy(floodsub, '_emitMessages') - sinon.spy(floodsub, '_forwardMessages') - sinon.spy(floodsub, 'validate') - sinon.stub(floodsub, 'strictSigning').value(false) - - const topic = 'my-topic' - const rpc = { - subscriptions: [], - msgs: [{ - from: peerId.id, - data: uint8ArrayFromString('an unsigned message'), - seqno: utils.randomSeqno(), - topicIDs: [topic] - }] - } - - floodsub._onRpc('QmAnotherPeer', rpc) - - return new Promise((resolve) => { - setTimeout(() => { - expect(floodsub.validate.callCount).to.eql(1) - expect(floodsub._emitMessages.called).to.eql(true) - expect(floodsub._forwardMessages.called).to.eql(true) - - resolve() - }, 50) - }) - }) - }) -}) diff --git a/test/utils/create-peer.js b/test/utils/create-peer.js new file mode 100644 index 0000000000..01a976af98 --- /dev/null +++ b/test/utils/create-peer.js @@ -0,0 +1,125 @@ +'use strict' + +/** + * These utilities rely on the fixtures defined in test/fixtures + * + * We create peers for use in browser/node environments + * configured to either connect directly (websocket listening multiaddr) + * or connecting through a well-known relay + */ + +const Libp2p = require('libp2p') +const multiaddr = require('multiaddr') +const PeerId = require('peer-id') + +const WS = require('libp2p-websockets') +const MPLEX = require('libp2p-mplex') +const { NOISE } = require('libp2p-noise') + +const { isNode } = require('ipfs-utils/src/env') + +const Peers = require('../fixtures/peers') +const RelayPeer = require('../fixtures/relay') + +const defaultConfig = { + modules: { + transport: [WS], + streamMuxer: [MPLEX], + connEncryption: [NOISE] + }, + config: { + pubsub: { + enabled: false + }, + peerDiscovery: { + autoDial: false + } + } +} + +/** + * Create libp2p node, selectively determining the listen address based on the operating environment + * If no peerId is given, default to the first peer in the fixtures peer list + */ +async function createPeer ({ peerId, started = true, config = {} } = {}) { + if (!peerId) { + peerId = await PeerId.createFromJSON(Peers[0]) + } + const libp2p = await Libp2p.create({ + peerId: peerId, + addresses: { + listen: [getListenAddress(peerId)] + }, + ...defaultConfig, + ...config + }) + + if (started) { + await libp2p.start() + } + + return libp2p +} + +/** + * Create libp2p nodes from known peer ids, preconfigured to use fixture peer ids + * @param {Object} [properties] + * @param {Object} [properties.config] + * @param {number} [properties.number] number of peers (default: 1). + * @param {boolean} [properties.started] nodes should start (default: true) + * @param {boolean} [properties.seedAddressBook] nodes should have each other in their addressbook + * @return {Promise>} + */ +async function createPeers ({ number = 1, started = true, seedAddressBook = true, config = {} } = {}) { + const peerIds = await Promise.all( + Array.from({ length: number }, (_, i) => Peers[i] ? PeerId.createFromJSON(Peers[i]) : PeerId.create()) + ) + const peers = await Promise.all( + Array.from({ length: number }, (_, i) => createPeer({ peerId: peerIds[i], started: false, config: config })) + ) + + if (started) { + await Promise.all( + peers.map((p) => p.start()) + ) + + if (seedAddressBook) { + addPeersToAddressBook(peers) + } + } + + return peers +} + +/** + * Selectively determine the listen address based on the operating environment + * + * If in node, use websocket address + * If in browser, use relay address + * @param {PeerId} peerId + * @return {multiaddr} + */ +function getListenAddress (peerId) { + if (isNode) { + // node + return multiaddr('/ip4/127.0.0.1/tcp/0/ws') + } else { + // browser + return multiaddr(`${RelayPeer.multiaddr}/p2p-circuit/p2p/${peerId.toB58String()}`) + } +} + +function addPeersToAddressBook (peers) { + for (let i = 0; i < peers.length; i++) { + for (let j = 0; j < peers.length; j++) { + if (i !== j) { + peers[i].peerStore.addressBook.set(peers[j].peerId, peers[j].multiaddrs) + } + } + } +} + +module.exports = { + createPeer, + createPeers +} diff --git a/test/utils/index.js b/test/utils/index.js index 6e04cf5ef1..232ffe8fae 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -1,8 +1,5 @@ 'use strict' -const PeerId = require('peer-id') -const DuplexPair = require('it-pair/duplex') - const { expect } = require('chai') exports.first = (map) => map.values().next().value @@ -11,58 +8,6 @@ exports.expectSet = (set, subs) => { expect(Array.from(set.values())).to.eql(subs) } -exports.createPeerId = async () => { - const peerId = await PeerId.create({ bits: 1024 }) - - return peerId -} - -exports.mockRegistrar = { - handle: () => {}, - register: () => {}, - unregister: () => {} -} - -exports.createMockRegistrar = (registrarRecord) => ({ - handle: (multicodecs, handler) => { - const rec = registrarRecord[multicodecs[0]] || {} - - registrarRecord[multicodecs[0]] = { - ...rec, - handler - } - }, - register: ({ multicodecs, _onConnect, _onDisconnect }) => { - const rec = registrarRecord[multicodecs[0]] || {} - - registrarRecord[multicodecs[0]] = { - ...rec, - onConnect: _onConnect, - onDisconnect: _onDisconnect - } - - return multicodecs[0] - }, - unregister: (id) => { - delete registrarRecord[id] - } -}) - -exports.ConnectionPair = () => { - const [d0, d1] = DuplexPair() - - return [ - { - stream: d0, - newStream: () => Promise.resolve({ stream: d0 }) - }, - { - stream: d1, - newStream: () => Promise.resolve({ stream: d1 }) - } - ] -} - exports.defOptions = { emitSelf: true }