From c687899df8956f8db1823b912b80de8d2d72290b Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sun, 18 Dec 2016 10:43:06 +0100 Subject: [PATCH] Clean up some todos & some refactoring (#12) --- .gitignore | 3 +- README.md | 13 +- benchmarks/index.js | 69 +++++++++ benchmarks/utils.js | 26 ++++ package.json | 27 ++-- src/dial-floodsub.js | 70 --------- src/index.js | 332 +++++++++++++++++++++++++++-------------- src/mount-floodsub.js | 102 ------------- src/peer.js | 151 +++++++++++++++++++ src/utils.js | 59 +++++++- test/2-nodes.js | 216 +++++++++------------------ test/multiple-nodes.js | 240 ++++++++++++++++++----------- test/utils.js | 37 +++++ test/utils.spec.js | 49 ++++++ 14 files changed, 855 insertions(+), 539 deletions(-) create mode 100644 benchmarks/index.js create mode 100644 benchmarks/utils.js delete mode 100644 src/dial-floodsub.js delete mode 100644 src/mount-floodsub.js create mode 100644 src/peer.js create mode 100644 test/utils.js create mode 100644 test/utils.spec.js diff --git a/.gitignore b/.gitignore index 254988dc81..0e9a0d0925 100644 --- a/.gitignore +++ b/.gitignore @@ -31,5 +31,6 @@ build # https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git node_modules -lib dist + +docs \ No newline at end of file diff --git a/README.md b/README.md index c71e30b843..067a220c67 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,6 @@ js-libp2p-floodsub ## Install -``` ```sh > npm install libp2p-floodsub ``` @@ -46,17 +45,7 @@ fsub.publish('fruit', new Buffer('banana')) ## API -### `ps.subscribe()` - -### `ps.unsubscribe()` - -### `ps.publish(, )` - -### `ps.on(, callback)` - -### `ps.getPeerSet()` - -### `ps.getSubscriptions()` +See https://libp2p.github.io/js-libp2p-floodsub ## Contribute diff --git a/benchmarks/index.js b/benchmarks/index.js new file mode 100644 index 0000000000..154329ecd7 --- /dev/null +++ b/benchmarks/index.js @@ -0,0 +1,69 @@ +'use strict' + +const Benchmark = require('benchmark') +const crypto = require('crypto') +const map = require('async/map') +const parallel = require('async/parallel') + +const PSG = require('../src') +const utils = require('../test/utils') + +const suite = new Benchmark.Suite('pubsub') + +// Simple benchmark, how many messages can we send from +// one node to another. + +map([0, 1], (i, cb) => { + utils.createNode('/ip4/127.0.0.1/tcp/0', (err, node) => { + if (err) { + return cb(err) + } + + cb(null, { + libp2p: node, + ps: new PSG(node) + }) + }) +}, (err, peers) => { + if (err) { + throw err + } + + parallel([ + (cb) => peers[0].libp2p.dialByPeerInfo(peers[1].libp2p.peerInfo, cb), + (cb) => setTimeout(() => { + peers[0].ps.subscribe('Z') + peers[1].ps.subscribe('Z') + cb(null, peers) + }, 200) + ], (err, res) => { + if (err) { + throw err + } + const peers = res[1] + + suite.add('publish and receive', (deferred) => { + const onMsg = (msg) => { + deferred.resolve() + peers[1].ps.removeListener('Z', onMsg) + } + + peers[1].ps.on('Z', onMsg) + + peers[0].ps.publish('Z', crypto.randomBytes(1024)) + }, { + defer: true + }) + + suite + .on('cycle', (event) => { + console.log(String(event.target)) + }) + .on('complete', () => { + process.exit() + }) + .run({ + async: true + }) + }) +}) diff --git a/benchmarks/utils.js b/benchmarks/utils.js new file mode 100644 index 0000000000..656b70d0c8 --- /dev/null +++ b/benchmarks/utils.js @@ -0,0 +1,26 @@ +'use strict' + +const Benchmark = require('benchmark') + +if (typeof window !== 'undefined') { + window.Benchmark = Benchmark +} + +const utils = require('../src/utils') + +const suite = new Benchmark.Suite('utils') + +let res = [] + +suite.add('randomSeqno', () => { + res.push(utils.randomSeqno()) +}) + +suite + .on('cycle', (event) => { + console.log(String(event.target)) + res = [] + }) + .run({ + async: true + }) diff --git a/package.json b/package.json index 370ac96489..b7a5be765b 100644 --- a/package.json +++ b/package.json @@ -13,9 +13,10 @@ "test:node:cli": "TEST=cli npm run test:node", "test:browser": "gulp test:browser", "build": "gulp build", - "release": "gulp release", - "release-minor": "gulp release --type minor", - "release-major": "gulp release --type major", + "docs": "aegir-docs", + "release": "gulp release --docs", + "release-minor": "gulp release --type minor --docs", + "release-major": "gulp release --type major --docs", "coverage-publish": "aegir-coverage publish" }, "pre-commit": [ @@ -41,28 +42,28 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-floodsub#readme", "devDependencies": { - "aegir": "^9.1.2", + "aegir": "^9.3.0", "async": "^2.1.4", + "benchmark": "^2.1.2", "chai": "^3.5.0", - "libp2p-ipfs-nodejs": "^0.16.0", + "libp2p-ipfs-nodejs": "^0.17.0", "lodash.times": "^4.3.2", "multiaddr": "^2.1.1", - "peer-id": "^0.8.0", + "peer-id": "^0.8.1", "peer-info": "^0.8.1", - "pre-commit": "^1.1.3" + "pre-commit": "^1.2.2" }, "dependencies": { - "debug": "^2.3.3", - "git-sha1": "^0.1.2", + "debug": "^2.4.5", "length-prefixed-stream": "^1.5.1", - "lodash.intersection": "^4.4.0", - "lodash.uniq": "^4.5.0", + "libp2p-crypto": "^0.7.5", "lodash.values": "^4.3.0", - "time-cache": "^0.2.3" + "pull-pushable": "^2.0.1", + "time-cache": "^0.3.0" }, "contributors": [ "David Dias ", "Gavin McDermott ", "Haad " ] -} \ No newline at end of file +} diff --git a/src/dial-floodsub.js b/src/dial-floodsub.js deleted file mode 100644 index d426599120..0000000000 --- a/src/dial-floodsub.js +++ /dev/null @@ -1,70 +0,0 @@ -'use strict' - -const config = require('./config') -const pb = require('./message') -const log = config.log -const multicodec = config.multicodec -const stream = require('stream') -const PassThrough = stream.PassThrough -const toPull = require('stream-to-pull-stream') -const lp = require('pull-length-prefixed') -const pull = require('pull-stream') - -module.exports = (libp2pNode, peerSet, subscriptions) => { - return (peerInfo) => { - const idB58Str = peerInfo.id.toB58String() - - // If already have a PubSub conn, ignore - if (peerSet[idB58Str] && peerSet[idB58Str].conn) { - return - } - - libp2pNode.dialByPeerInfo(peerInfo, multicodec, gotConn) - - function gotConn (err, conn) { - if (err) { - return log.err(err) - } - - // If already had a dial to me, just add the conn - if (peerSet[idB58Str]) { - peerSet[idB58Str].conn = conn - } else { - peerSet[idB58Str] = { - conn: conn, - peerInfo: peerInfo, - topics: [] - } - } - - // TODO change to pull-pushable - const pt1 = new PassThrough() - const pt2 = new PassThrough() - - peerSet[idB58Str].stream = pt1 - pt1.pipe(pt2) - const ptPull = toPull.duplex(pt2) - - pull( - ptPull, - lp.encode(), - conn - ) - - // Immediately send my own subscriptions to the newly established conn - if (subscriptions.length > 0) { - const subopts = subscriptions.map((topic) => { - return { - subscribe: true, - topicCID: topic - } - }) - const rpc = pb.rpc.RPC.encode({ - subscriptions: subopts - }) - - peerSet[idB58Str].stream.write(rpc) - } - } - } -} diff --git a/src/index.js b/src/index.js index 742ea4b749..cf8378227e 100644 --- a/src/index.js +++ b/src/index.js @@ -1,160 +1,274 @@ 'use strict' -const EE = require('events').EventEmitter -const util = require('util') +const EventEmitter = require('events') const TimeCache = require('time-cache') +const values = require('lodash.values') +const pull = require('pull-stream') +const lp = require('pull-length-prefixed') + +const Peer = require('./peer') const utils = require('./utils') const pb = require('./message') const config = require('./config') + const log = config.log -const _intersection = require('lodash.intersection') -const dialOnFloodSub = require('./dial-floodsub.js') -const mountFloodSub = require('./mount-floodsub.js') -const _values = require('lodash.values') +const multicodec = config.multicodec +const ensureArray = utils.ensureArray -module.exports = PubSubGossip +/** + * PubSubGossip, also known as pubsub-flood or just dumbsub, + * this implementation of pubsub focused on delivering an API + * for Publish/Subscribe, but with no CastTree Forming + * (it just floods the network). + */ +class FloodSub extends EventEmitter { + /** + * @param {Object} libp2p + * @returns {PubSubGossip} + */ + constructor (libp2p) { + super() -util.inherits(PubSubGossip, EE) + this.libp2p = libp2p -function PubSubGossip (libp2pNode, dagService) { - if (!(this instanceof PubSubGossip)) { - return new PubSubGossip(libp2pNode) - } + /** + * Time based cache for sequence numbers. + * + * @type {TimeCache} + */ + this.cache = new TimeCache() - EE.call(this) + /** + * Map of peers. + * + * @type {Map} + */ + this.peers = new Map() - const tc = new TimeCache() + /** + * List of our subscriptions + * @type {Set} + */ + this.subscriptions = new Set() - // map of peerIdBase58Str: { conn, topics, peerInfo } - const peerSet = {} + const onConnection = this._onConnection.bind(this) + this.libp2p.handle(multicodec, onConnection) - // list of our subscriptions - const subscriptions = [] + // Speed up any new peer that comes in my way + this.libp2p.swarm.on('peer-mux-established', (p) => { + this._dialPeer(p) + }) - // map of peerId: [] (size 10) - // check if not contained + newer than older - // if passes, shift, push, sort - // (if needed, i.e. not the newest) + // Dial already connected peers + values(this.libp2p.peerBook.getAll()).forEach((p) => { + this._dialPeer(p) + }) + } - const dial = dialOnFloodSub(libp2pNode, peerSet, subscriptions) - mountFloodSub(libp2pNode, peerSet, tc, subscriptions, this) + _dialPeer (peerInfo) { + const idB58Str = peerInfo.id.toB58String() + log('dialing %s', idB58Str) - this.publish = (topics, messages) => { - log('publish', topics, messages) - if (!Array.isArray(topics)) { - topics = [topics] - } - if (!Array.isArray(messages)) { - messages = [messages] + // If already have a PubSub conn, ignore + const peer = this.peers.get(idB58Str) + if (peer && peer.isConnected) { + return } - // emit to self if I'm interested - topics.forEach((topic) => { - if (subscriptions.indexOf(topic) !== -1) { - messages.forEach((message) => { - this.emit(topic, message) - }) + this.libp2p.dialByPeerInfo(peerInfo, multicodec, (err, conn) => { + if (err) { + return log.err(err) } + + this._onDial(peerInfo, conn) }) + } - // send to all the other peers - const peers = Object - .keys(peerSet) - .map((idB58Str) => peerSet[idB58Str]) - - peers.forEach((peer) => { - if (_intersection(peer.topics, topics).length > 0) { - const msgs = messages.map((message) => { - const msg = { - from: libp2pNode.peerInfo.id.toB58String(), - data: message, - seqno: new Buffer(utils.randomSeqno()), - topicCIDs: topics - } - tc.put(utils.msgId(msg.from, msg.seqno.toString())) - return msg - }) - const rpc = pb.rpc.RPC.encode({ - msgs: msgs - }) - - peer.stream.write(rpc) - log('publish msgs on topics', topics, peer.peerInfo.id.toB58String()) + _onDial (peerInfo, conn) { + const idB58Str = peerInfo.id.toB58String() + + // If already had a dial to me, just add the conn + if (!this.peers.has(idB58Str)) { + this.peers.set(idB58Str, new Peer(peerInfo)) + } + + const peer = this.peers.get(idB58Str) + peer.attachConnection(conn) + + // Immediately send my own subscriptions to the newly established conn + peer.sendSubscriptions(this.subscriptions) + } + + _onConnection (protocol, conn) { + conn.getPeerInfo((err, peerInfo) => { + if (err) { + log.err('Failed to identify incomming conn', err) + return pull(pull.empty(), conn) + } + + const idB58Str = peerInfo.id.toB58String() + + if (!this.peers.has(idB58Str)) { + log('new peer', idB58Str) + this.peers.set(idB58Str, new Peer(peerInfo)) } + + this._processConnection(idB58Str, conn) }) } - this.subscribe = (topics) => { - if (!Array.isArray(topics)) { - topics = [topics] + _processConnection (idB58Str, conn) { + pull( + conn, + lp.decode(), + pull.map((data) => pb.rpc.RPC.decode(data)), + pull.drain( + (rpc) => this._onRpc(idB58Str, rpc), + (err) => this._onConnectionEnd(err) + ) + ) + } + + _onRpc (idB58Str, rpc) { + if (!rpc) { + return } - topics.forEach((topic) => { - if (subscriptions.indexOf(topic) === -1) { - subscriptions.push(topic) + const subs = rpc.subscriptions + const msgs = rpc.msgs + + if (subs && subs.length) { + const peer = this.peers.get(idB58Str) + peer.updateSubscriptions(subs) + } + + if (msgs && msgs.length) { + this._processRpcMessages(rpc.msgs) + } + } + + _processRpcMessages (msgs) { + msgs.forEach((msg) => { + const seqno = utils.msgId(msg.from, msg.seqno.toString()) + // 1. check if I've seen the message, if yes, ignore + if (this.cache.has(seqno)) { + return } - }) - const peers = Object - .keys(peerSet) - .map((idB58Str) => peerSet[idB58Str]) + this.cache.put(seqno) - peers.forEach((peer) => { - const subopts = topics.map((topic) => { - return { - subscribe: true, - topicCID: topic - } - }) - const rpc = pb.rpc.RPC.encode({ - subscriptions: subopts - }) + // 2. emit to self + this._emitMessages(msg.topicCIDs, [msg.data]) - peer.stream.write(rpc) + // 3. propagate msg to others + this._forwardMessages(msg.topicCIDs, [msg]) }) } - this.unsubscribe = (topics) => { - if (!Array.isArray(topics)) { - topics = [topics] + _onConnectionEnd (idB58Str, err) { + // socket hang up, means the one side canceled + if (err && err.message !== 'socket hang up') { + log.err(err) } + this.peers.delete(idB58Str) + } + + _emitMessages (topics, messages) { topics.forEach((topic) => { - const index = subscriptions.indexOf(topic) - if (index > -1) { - subscriptions.splice(index, 1) + if (!this.subscriptions.has(topic)) { + return } - }) - _values(peerSet).forEach((peer) => { - const subopts = topics.map((topic) => { - return { - subscribe: false, - topicCID: topic - } + messages.forEach((message) => { + this.emit(topic, message) }) - const rpc = pb.rpc.RPC.encode({ - subscriptions: subopts - }) - - peer.stream.write(rpc) }) } - this.getPeerSet = () => { - return peerSet + _forwardMessages (topics, messages) { + this.peers.forEach((peer) => { + if (!peer.isWritable || + !utils.anyMatch(peer.topics, topics)) { + return + } + + peer.sendMessages(messages) + + log('publish msgs on topics', topics, peer.info.id.toB58String()) + }) } - this.getSubscriptions = () => { - return subscriptions + /** + * Publish messages to the given topics. + * + * @param {Array|string} topics + * @param {Array|any} messages + * @returns {undefined} + * + */ + publish (topics, messages) { + log('publish', topics, messages) + + topics = ensureArray(topics) + messages = ensureArray(messages) + + // Emit to self if I'm interested + this._emitMessages(topics, messages) + + const from = this.libp2p.peerInfo.id.toB58String() + + const buildMessage = (msg) => { + const seqno = utils.randomSeqno() + this.cache.put(utils.msgId(from, seqno)) + + return { + from: from, + data: msg, + seqno: new Buffer(seqno), + topicCIDs: topics + } + } + + // send to all the other peers + this._forwardMessages(topics, messages.map(buildMessage)) } - function onStart () { - const connectedPeers = libp2pNode.peerBook.getAll() - _values(connectedPeers).forEach(dial) + /** + * Subscribe to the given topic(s). + * + * @param {Array|string} topics + * @returns {undefined} + */ + subscribe (topics) { + topics = ensureArray(topics) + + topics.forEach((topic) => { + this.subscriptions.add(topic) + }) + + this.peers.forEach((peer) => { + peer.sendSubscriptions(topics) + }) } - onStart() - // speed up any new peer that comes in my way - libp2pNode.swarm.on('peer-mux-established', dial) + /** + * Unsubscribe from the given topic(s). + * + * @param {Array|string} topics + * @returns {undefined} + */ + unsubscribe (topics) { + topics = ensureArray(topics) + + topics.forEach((topic) => { + this.subscriptions.delete(topic) + }) + + this.peers.forEach((peer) => { + peer.sendUnsubscriptions(topics) + }) + } } + +module.exports = FloodSub diff --git a/src/mount-floodsub.js b/src/mount-floodsub.js deleted file mode 100644 index af185cda98..0000000000 --- a/src/mount-floodsub.js +++ /dev/null @@ -1,102 +0,0 @@ -'use strict' - -const config = require('./config') -const log = config.log -const multicodec = config.multicodec -const pull = require('pull-stream') -const _uniq = require('lodash.uniq') -const _intersection = require('lodash.intersection') -const lp = require('pull-length-prefixed') -const pb = require('./message') -const utils = require('./utils') - -module.exports = mountFloodSub - -function mountFloodSub (libp2pNode, peerSet, tc, subscriptions, ee) { - // note: we don't use the incomming conn to send, just to receive - libp2pNode.handle(multicodec, incConn) - - function incConn (protocol, conn) { - conn.getPeerInfo((err, peerInfo) => { - if (err) { - log.err('Failed to identify incomming conn', err) - return pull( - pull.empty(), - conn - ) - } - - // populate - const idB58Str = peerInfo.id.toB58String() - - if (!peerSet[idB58Str]) { - peerSet[idB58Str] = { - peerInfo: peerInfo, - topics: [] - } - } - - // process the messages - pull( - conn, - lp.decode(), - pull.drain((data) => { - const rpc = pb.rpc.RPC.decode(data) - if (rpc.subscriptions) { - rpc.subscriptions.forEach((subopt) => { - if (subopt.subscribe) { - peerSet[idB58Str].topics.push(subopt.topicCID) - } else { - const index = peerSet[idB58Str].topics.indexOf(subopt.topicCID) - if (index > -1) { - peerSet[idB58Str].topics.splice(index, 1) - } - } - }) - - peerSet[idB58Str].topics = _uniq(peerSet[idB58Str].topics) - } - - if (rpc.msgs.length > 0) { - rpc.msgs.forEach((msg) => { - // 1. check if I've seen the message, if yes, ignore - if (tc.has(utils.msgId(msg.from, msg.seqno.toString()))) { - return - } else { - tc.put(utils.msgId(msg.from, msg.seqno.toString())) - } - - // 2. emit to self - msg.topicCIDs.forEach((topic) => { - if (subscriptions.indexOf(topic) !== -1) { - ee.emit(topic, msg.data) - } - }) - - // 3. propagate msg to others - const peers = Object - .keys(peerSet) - .map((idB58Str) => peerSet[idB58Str]) - - peers.forEach((peer) => { - if (_intersection(peer.topics, msg.topicCIDs).length > 0) { - const rpc = pb.rpc.RPC.encode({ - msgs: [msg] - }) - - peer.stream.write(rpc) - } - }) - }) - } - }, (err) => { - if (err) { - return log.err(err) - } - // TODO - // remove peer from peerSet - }) - ) - }) - } -} diff --git a/src/peer.js b/src/peer.js new file mode 100644 index 0000000000..16880e46f1 --- /dev/null +++ b/src/peer.js @@ -0,0 +1,151 @@ +'use strict' + +const lp = require('pull-length-prefixed') +const Pushable = require('pull-pushable') +const pull = require('pull-stream') + +const rpc = require('./message').rpc.RPC + +/** + * The known state of a connected peer. + */ +class Peer { + /** + * @param {PeerInfo} info + */ + constructor (info) { + /** + * @type {PeerInfo} + */ + this.info = info + /** + * @type {Connection} + */ + this.conn = null + /** + * @type {Set} + */ + this.topics = new Set() + /** + * @type {Pushable} + */ + this.stream = null + } + + /** + * Is the peer connected currently? + * + * @type {boolean} + */ + get isConnected () { + return Boolean(this.conn) + } + + /** + * Do we have a connection to write on? + * + * @type {boolean} + */ + get isWritable () { + return Boolean(this.stream) + } + + /** + * Send a message to this peer. + * Throws if there is no `stream` to write to available. + * + * @param {Buffer} msg + * @returns {undefined} + */ + write (msg) { + if (!this.isWritable) { + const id = this.info.id.toB58String() + throw new Error('No writable connection to ' + id) + } + + this.stream.push(msg) + } + + /** + * Attach the peer to a connection and setup a write stream + * + * @param {Connection} conn + * @returns {undefined} + */ + attachConnection (conn) { + this.conn = conn + this.stream = new Pushable() + + pull( + this.stream, + lp.encode(), + conn + ) + } + + _sendRawSubscriptions (topics, subscribe) { + if (topics.size === 0) { + return + } + + const subs = [] + topics.forEach((topic) => { + subs.push({ + subscribe: subscribe, + topicCID: topic + }) + }) + + this.write(rpc.encode({ + subscriptions: subs + })) + } + + /** + * Send the given subscriptions to this peer. + * @param {Set|Array} topics + * @returns {undefined} + */ + sendSubscriptions (topics) { + this._sendRawSubscriptions(topics, true) + } + + /** + * Send the given unsubscriptions to this peer. + * @param {Set|Array} topics + * @returns {undefined} + */ + sendUnsubscriptions (topics) { + this._sendRawSubscriptions(topics, false) + } + + /** + * Send messages to this peer. + * + * @param {Array} msgs + * @returns {undefined} + */ + sendMessages (msgs) { + this.write(rpc.encode({ + msgs: msgs + })) + } + + /** + * Bulk process subscription updates. + * + * @param {Array} changes + * @returns {undefined} + */ + updateSubscriptions (changes) { + changes.forEach((subopt) => { + if (subopt.subscribe) { + this.topics.add(subopt.topicCID) + } else { + this.topics.delete(subopt.topicCID) + } + }) + } +} + +module.exports = Peer diff --git a/src/utils.js b/src/utils.js index 1a0b00c1ce..33d76290c1 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,13 +1,68 @@ 'use strict' -const sha1 = require('git-sha1') +const crypto = require('libp2p-crypto') exports = module.exports +/** + * Generatea random sequence number. + * + * @returns {string} + * @private + */ exports.randomSeqno = () => { - return sha1((~~(Math.random() * 1e9)).toString(36) + Date.now()) + return crypto.randomBytes(20).toString('hex') } +/** + * Generate a message id, based on the `from` and `seqno`. + * + * @param {string} from + * @param {string} seqno + * @returns {string} + * @private + */ exports.msgId = (from, seqno) => { return from + seqno } + +/** + * Check if any member of the first set is also a member + * of the second set. + * + * @param {Set|Array} a + * @param {Set|Array} b + * @returns {boolean} + * @private + */ +exports.anyMatch = (a, b) => { + let bHas + if (Array.isArray(b)) { + bHas = (val) => b.indexOf(val) > -1 + } else { + bHas = (val) => b.has(val) + } + + for (let val of a) { + if (bHas(val)) { + return true + } + } + + return false +} + +/** + * Make everything an array. + * + * @param {any} maybeArray + * @returns {Array} + * @private + */ +exports.ensureArray = (maybeArray) => { + if (!Array.isArray(maybeArray)) { + return [maybeArray] + } + + return maybeArray +} diff --git a/test/2-nodes.js b/test/2-nodes.js index eb131a529f..37efa9dd9f 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -3,16 +3,15 @@ 'use strict' const expect = require('chai').expect -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const multiaddr = require('multiaddr') -const Node = require('libp2p-ipfs-nodejs') const parallel = require('async/parallel') const series = require('async/series') const _times = require('lodash.times') -const _values = require('lodash.values') const PSG = require('../src') +const utils = require('./utils') +const first = utils.first +const createNode = utils.createNode +const expectSet = utils.expectSet describe('basics', () => { let nodeA @@ -23,91 +22,55 @@ describe('basics', () => { describe('fresh nodes', () => { before((done) => { series([ - (cb) => { - PeerId.create((err, idA) => { - expect(err).to.not.exist - PeerInfo.create(idA, (err, peerA) => { - expect(err).to.not.exist - peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeA = new Node(peerA) - cb() - }) - }) - }, - (cb) => { - PeerId.create((err, idB) => { - expect(err).to.not.exist - PeerInfo.create(idB, (err, peerB) => { - expect(err).to.not.exist - peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeB = new Node(peerB) - cb() - }) - }) - }, - (cb) => { - parallel([ - (cb) => { - nodeA.start(cb) - }, - (cb) => { - nodeB.start(cb) - } - ], cb) + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (err, nodes) => { + if (err) { + return done(err) } - ], done) + nodeA = nodes[0] + nodeB = nodes[1] + done() + }) }) after((done) => { parallel([ - (cb) => { - nodeA.stop(cb) - }, - (cb) => { - nodeB.stop(cb) - } + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) ], done) }) it('Mount the pubsub protocol', (done) => { - parallel([ - (cb) => { - psA = new PSG(nodeA) - setTimeout(() => { - expect(psA.getPeerSet()).to.eql({}) - expect(psA.getSubscriptions()).to.eql([]) - cb() - }, 50) - }, - (cb) => { - psB = new PSG(nodeB) - setTimeout(() => { - expect(psB.getPeerSet()).to.eql({}) - expect(psB.getSubscriptions()).to.eql([]) - cb() - }, 50) - } - ], done) + psA = new PSG(nodeA) + psB = new PSG(nodeB) + + setTimeout(() => { + expect(psA.peers.size).to.be.eql(0) + expect(psA.subscriptions.size).to.eql(0) + expect(psB.peers.size).to.be.eql(0) + expect(psB.subscriptions.size).to.eql(0) + done() + }, 50) }) it('Dial from nodeA to nodeB', (done) => { - nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { - expect(err).to.not.exist - setTimeout(() => { - expect(Object.keys(psA.getPeerSet()).length).to.equal(1) - expect(Object.keys(psB.getPeerSet()).length).to.equal(1) - done() + series([ + (cb) => nodeA.dialByPeerInfo(nodeB.peerInfo, cb), + (cb) => setTimeout(() => { + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + cb() }, 250) - }) + ], done) }) it('Subscribe to a topic:Z in nodeA', (done) => { psA.subscribe('Z') setTimeout(() => { - expect(psA.getSubscriptions()).to.eql(['Z']) - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql(['Z']) + expectSet(psA.subscriptions, ['Z']) + expect(psB.peers.size).to.equal(1) + expectSet(first(psB.peers).topics, ['Z']) done() }, 100) }) @@ -169,12 +132,11 @@ describe('basics', () => { it('Unsubscribe from topic:Z in nodeA', (done) => { psA.unsubscribe('Z') - expect(psA.getSubscriptions()).to.eql([]) + expect(psA.subscriptions.size).to.equal(0) setTimeout(() => { - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql([]) + expect(psB.peers.size).to.equal(1) + expectSet(first(psB.peers).topics, []) done() }, 100) }) @@ -197,67 +159,33 @@ describe('basics', () => { describe('long running nodes (already have state)', () => { before((done) => { series([ - (cb) => { - PeerId.create((err, idA) => { - expect(err).to.not.exist - PeerInfo.create(idA, (err, peerA) => { - expect(err).to.not.exist - peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeA = new Node(peerA) - cb() - }) - }) - }, - (cb) => { - PeerId.create((err, idB) => { - expect(err).to.not.exist - PeerInfo.create(idB, (err, peerB) => { - expect(err).to.not.exist - peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - nodeB = new Node(peerB) - cb() - }) - }) - }, - (cb) => { - parallel([ - (cb) => { - nodeA.start(cb) - }, - (cb) => { - nodeB.start(cb) - } - ], cb) - }, - (cb) => { - psA = new PSG(nodeA) - psA.subscribe('Za') - setTimeout(() => { - expect(psA.getPeerSet()).to.eql({}) - expect(psA.getSubscriptions()).to.eql(['Za']) - cb() - }, 50) - }, - (cb) => { - psB = new PSG(nodeB) - psB.subscribe('Zb') - setTimeout(() => { - expect(psB.getPeerSet()).to.eql({}) - expect(psB.getSubscriptions()).to.eql(['Zb']) - cb() - }, 50) - } - ], done) + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb), + (cb) => createNode('/ip4/127.0.0.1/tcp/0', cb) + ], (cb, nodes) => { + nodeA = nodes[0] + nodeB = nodes[1] + + psA = new PSG(nodeA) + psB = new PSG(nodeB) + + psA.subscribe('Za') + psB.subscribe('Zb') + + setTimeout(() => { + expect(psA.peers.size).to.equal(0) + expectSet(psA.subscriptions, ['Za']) + expect(psB.peers.size).to.equal(0) + expectSet(psB.subscriptions, ['Zb']) + + done() + }, 50) + }) }) after((done) => { parallel([ - (cb) => { - nodeA.stop(cb) - }, - (cb) => { - nodeB.stop(cb) - } + (cb) => nodeA.stop(cb), + (cb) => nodeB.stop(cb) ], done) }) @@ -265,18 +193,16 @@ describe('basics', () => { nodeA.dialByPeerInfo(nodeB.peerInfo, (err) => { expect(err).to.not.exist setTimeout(() => { - expect(Object.keys(psA.getPeerSet()).length).to.equal(1) - expect(Object.keys(psB.getPeerSet()).length).to.equal(1) - - expect(psA.getSubscriptions()).to.eql(['Za']) - const peersB = _values(psB.getPeerSet()) - expect(peersB.length).to.equal(1) - expect(peersB[0].topics).to.eql(['Za']) - - expect(psB.getSubscriptions()).to.eql(['Zb']) - const peersA = _values(psA.getPeerSet()) - expect(peersA.length).to.equal(1) - expect(peersA[0].topics).to.eql(['Zb']) + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + + expectSet(psA.subscriptions, ['Za']) + expect(psB.peers.size).to.equal(1) + expectSet(first(psB.peers).topics, ['Za']) + + expectSet(psB.subscriptions, ['Zb']) + expect(psA.peers.size).to.equal(1) + expectSet(first(psA.peers).topics, ['Zb']) done() }, 250) diff --git a/test/multiple-nodes.js b/test/multiple-nodes.js index 8c90585db3..fc38313130 100644 --- a/test/multiple-nodes.js +++ b/test/multiple-nodes.js @@ -3,13 +3,13 @@ 'use strict' const expect = require('chai').expect -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const multiaddr = require('multiaddr') -const Node = require('libp2p-ipfs-nodejs') const parallel = require('async/parallel') + const PSG = require('../src') -const _values = require('lodash.values') +const utils = require('./utils') +const first = utils.first +const createNode = utils.createNode +const expectSet = utils.expectSet describe('multiple nodes', () => { describe('every peer subscribes to the topic', () => { @@ -23,34 +23,19 @@ describe('multiple nodes', () => { before((done) => { parallel([ - (cb) => { - spawnPubSubNode((err, node) => { - if (err) { - return cb(err) - } - a = node - cb() - }) - }, - (cb) => { - spawnPubSubNode((err, node) => { - if (err) { - return cb(err) - } - b = node - cb() - }) - }, - (cb) => { - spawnPubSubNode((err, node) => { - if (err) { - return cb(err) - } - c = node - cb() - }) + (cb) => spawnPubSubNode(cb), + (cb) => spawnPubSubNode(cb), + (cb) => spawnPubSubNode(cb) + ], (err, nodes) => { + if (err) { + return done(err) } - ], done) + a = nodes[0] + b = nodes[1] + c = nodes[2] + + done() + }) }) after((done) => { @@ -58,15 +43,9 @@ describe('multiple nodes', () => { // before swarm does its dials setTimeout(() => { parallel([ - (cb) => { - a.libp2p.stop(cb) - }, - (cb) => { - b.libp2p.stop(cb) - }, - (cb) => { - c.libp2p.stop(cb) - } + (cb) => a.libp2p.stop(cb), + (cb) => b.libp2p.stop(cb), + (cb) => c.libp2p.stop(cb) ], done) }, 1000) }) @@ -88,49 +67,48 @@ describe('multiple nodes', () => { it('subscribe to the topic on node a', (done) => { a.ps.subscribe('Z') - expect(a.ps.getSubscriptions()).to.eql(['Z']) + expectSet(a.ps.subscriptions, ['Z']) setTimeout(() => { - const peersB = _values(b.ps.getPeerSet()) - expect(peersB.length).to.equal(2) - expect(peersB[1].topics).to.eql(['Z']) + expect(b.ps.peers.size).to.equal(2) + const topics = Array.from(b.ps.peers.values())[1].topics + expectSet(topics, ['Z']) + + expect(c.ps.peers.size).to.equal(1) + expectSet(first(c.ps.peers).topics, []) - const peersC = _values(c.ps.getPeerSet()) - expect(peersC.length).to.equal(1) - expect(peersC[0].topics).to.eql([]) done() }, 200) }) it('subscribe to the topic on node b', (done) => { b.ps.subscribe('Z') - expect(b.ps.getSubscriptions()).to.eql(['Z']) + expectSet(b.ps.subscriptions, ['Z']) setTimeout(() => { - const peersA = _values(a.ps.getPeerSet()) - expect(peersA.length).to.equal(1) - expect(peersA[0].topics).to.eql(['Z']) + expect(a.ps.peers.size).to.equal(1) + expectSet(first(a.ps.peers).topics, ['Z']) + + expect(c.ps.peers.size).to.equal(1) + expectSet(first(c.ps.peers).topics, ['Z']) - const peersC = _values(c.ps.getPeerSet()) - expect(peersC.length).to.equal(1) - expect(peersC[0].topics).to.eql(['Z']) done() }, 200) }) it('subscribe to the topic on node c', (done) => { c.ps.subscribe('Z') - expect(c.ps.getSubscriptions()).to.eql(['Z']) + expectSet(c.ps.subscriptions, ['Z']) setTimeout(() => { - const peersA = _values(a.ps.getPeerSet()) - expect(peersA.length).to.equal(1) - expect(peersA[0].topics).to.eql(['Z']) - - const peersB = _values(b.ps.getPeerSet()) - expect(peersB.length).to.equal(2) - expect(peersB[0].topics).to.eql(['Z']) - expect(peersB[1].topics).to.eql(['Z']) + expect(a.ps.peers.size).to.equal(1) + expectSet(first(a.ps.peers).topics, ['Z']) + + expect(b.ps.peers.size).to.equal(2) + b.ps.peers.forEach((peer) => { + expectSet(peer.topics, ['Z']) + }) + done() }, 200) }) @@ -203,8 +181,108 @@ describe('multiple nodes', () => { // ◉─┘ └─◉ // a e - before((done) => {}) - after((done) => {}) + let a + let b + let c + let d + let e + + before((done) => { + parallel([ + (cb) => spawnPubSubNode(cb), + (cb) => spawnPubSubNode(cb), + (cb) => spawnPubSubNode(cb), + (cb) => spawnPubSubNode(cb), + (cb) => spawnPubSubNode(cb) + ], (err, nodes) => { + if (err) { + return done(err) + } + a = nodes[0] + b = nodes[1] + c = nodes[2] + d = nodes[3] + e = nodes[4] + + done() + }) + }) + + after((done) => { + // note: setTimeout to avoid the tests finishing + // before swarm does its dials + setTimeout(() => { + parallel([ + (cb) => a.libp2p.stop(cb), + (cb) => b.libp2p.stop(cb), + (cb) => c.libp2p.stop(cb), + (cb) => d.libp2p.stop(cb), + (cb) => e.libp2p.stop(cb) + ], done) + }, 1000) + }) + + it('establish the connections', (done) => { + parallel([ + (cb) => { + a.libp2p.dialByPeerInfo(b.libp2p.peerInfo, cb) + }, + (cb) => { + b.libp2p.dialByPeerInfo(c.libp2p.peerInfo, cb) + }, + (cb) => { + c.libp2p.dialByPeerInfo(d.libp2p.peerInfo, cb) + }, + (cb) => { + d.libp2p.dialByPeerInfo(e.libp2p.peerInfo, cb) + } + ], (err) => { + expect(err).to.not.exist + // wait for the pubsub pipes to be established + setTimeout(done, 200) + }) + }) + + it('subscribes', () => { + a.ps.subscribe('Z') + expectSet(a.ps.subscriptions, ['Z']) + b.ps.subscribe('Z') + expectSet(b.ps.subscriptions, ['Z']) + c.ps.subscribe('Z') + expectSet(c.ps.subscriptions, ['Z']) + d.ps.subscribe('Z') + expectSet(d.ps.subscriptions, ['Z']) + e.ps.subscribe('Z') + expectSet(e.ps.subscriptions, ['Z']) + }) + + it('publishes from c', (done) => { + let counter = 0 + + a.ps.on('Z', incMsg) + b.ps.on('Z', incMsg) + c.ps.on('Z', incMsg) + d.ps.on('Z', incMsg) + e.ps.on('Z', incMsg) + + c.ps.publish('Z', new Buffer('hey from c')) + + function incMsg (msg) { + expect(msg.toString()).to.equal('hey from c') + check() + } + + function check () { + if (++counter === 5) { + a.ps.removeListener('Z', incMsg) + b.ps.removeListener('Z', incMsg) + c.ps.removeListener('Z', incMsg) + d.ps.removeListener('Z', incMsg) + e.ps.removeListener('Z', incMsg) + done() + } + } + }) }) }) @@ -216,7 +294,9 @@ describe('multiple nodes', () => { before((done) => { }) - after((done) => {}) + + after((done) => { + }) }) describe('1 level tree', () => { @@ -246,24 +326,14 @@ describe('multiple nodes', () => { }) function spawnPubSubNode (callback) { - PeerId.create((err, id) => { - expect(err).to.not.exist - PeerInfo.create(id, (err, peer) => { - expect(err).to.not.exist - peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0')) - const node = new Node(peer) - let ps - - node.start((err) => { - if (err) { - return callback(err) - } - ps = new PSG(node) - callback(null, { - libp2p: node, - ps: ps - }) - }) + createNode('/ip4/127.0.0.1/tcp/0', (err, node) => { + if (err) { + return callback(err) + } + + callback(null, { + libp2p: node, + ps: new PSG(node) }) }) } diff --git a/test/utils.js b/test/utils.js new file mode 100644 index 0000000000..fa8da24a1b --- /dev/null +++ b/test/utils.js @@ -0,0 +1,37 @@ +'use strict' + +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const Node = require('libp2p-ipfs-nodejs') +const waterfall = require('async/waterfall') +const expect = require('chai').expect + +exports.first = (map) => map.values().next().value + +exports.expectSet = (set, subs) => { + expect( + Array.from(set.values()) + ).to.be.eql( + subs + ) +} + +exports.createNode = (maddr, callback) => { + waterfall([ + (cb) => PeerId.create(cb), + (id, cb) => PeerInfo.create(id, cb), + (peer, cb) => { + peer.multiaddr.add(multiaddr(maddr)) + cb(null, new Node(peer)) + }, + (node, cb) => { + node.start((err) => { + if (err) { + return cb(err) + } + cb(null, node) + }) + } + ], callback) +} diff --git a/test/utils.spec.js b/test/utils.spec.js new file mode 100644 index 0000000000..dc457be098 --- /dev/null +++ b/test/utils.spec.js @@ -0,0 +1,49 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect + +const utils = require('../src/utils') + +describe('utils', () => { + it('randomSeqno', () => { + const first = utils.randomSeqno() + const second = utils.randomSeqno() + + expect(first).to.have.length(40) + expect(second).to.have.length(40) + expect(first).to.not.be.eql(second) + }) + + it('msgId', () => { + expect(utils.msgId('hello', 'world')).to.be.eql('helloworld') + }) + + it('anyMatch', () => { + [ + [[1, 2, 3], [4, 5, 6], false], + [[1, 2], [1, 2], true], + [[1, 2, 3], [4, 5, 1], true], + [[5, 6, 1], [1, 2, 3], true], + [[], [], false], + [[1], [2], false] + ].forEach((test) => { + expect( + utils.anyMatch(new Set(test[0]), new Set(test[1])) + ).to.be.eql( + test[2] + ) + + expect( + utils.anyMatch(new Set(test[0]), test[1]) + ).to.be.eql( + test[2] + ) + }) + }) + + it('ensureArray', () => { + expect(utils.ensureArray('hello')).to.be.eql(['hello']) + expect(utils.ensureArray([1, 2])).to.be.eql([1, 2]) + }) +})