From 1093890ea5ec5953fbc7793145d0bfe1078718f7 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 17 Aug 2020 15:46:39 +0200 Subject: [PATCH] chore: pubsub router tests --- package.json | 1 + src/pubsub/tests/multiple-nodes-test.js | 328 +++++++++++++++++++++++- src/pubsub/tests/two-nodes-test.js | 230 ++++++++++++++++- src/pubsub/tests/utils.js | 19 ++ 4 files changed, 556 insertions(+), 22 deletions(-) create mode 100644 src/pubsub/tests/utils.js diff --git a/package.json b/package.json index 8375792f9..af544443c 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "multibase": "^3.0.0", "p-defer": "^3.0.0", "p-limit": "^2.3.0", + "p-retry": "^4.2.0", "p-wait-for": "^3.1.0", "peer-id": "^0.14.0", "protons": "^2.0.0", diff --git a/src/pubsub/tests/multiple-nodes-test.js b/src/pubsub/tests/multiple-nodes-test.js index b4b960c5c..85bfbb7ab 100644 --- a/src/pubsub/tests/multiple-nodes-test.js +++ b/src/pubsub/tests/multiple-nodes-test.js @@ -1,24 +1,330 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ 'use strict' -// const chai = require('chai') -// const { expect } = chai +const chai = require('chai') +const { expect } = chai const sinon = require('sinon') +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') + +const { expectSet } = require('./utils') + module.exports = (common) => { describe('pubsub with multiple nodes', () => { - let pubsub + describe('every peer subscribes to the topic', () => { + describe('line', () => { + // line + // ◉────◉────◉ + // a b c + let psA, psB, psC + + // Create and start pubsub nodes + beforeEach(async () => { + [psA, psB, psC] = await common.setup(3) + + // Start pubsub mpdes + await Promise.all([psA, psB, psC].map((p) => p.start())) + }) + + // Connect nodes + beforeEach(async () => { + await psA._libp2p.dial(psB.peerId) + await psB._libp2p.dial(psC.peerId) + + // Wait for peers to be ready in pubsub + await pWaitFor(() => + psA.peers.size === 1 && + psC.peers.size === 1 && + psA.peers.size === 1 + ) + }) + + afterEach(async () => { + sinon.restore() + + await Promise.all([psA, psB, psC].map((p) => p.stop())) + await common.teardown() + }) + + it('subscribe to the topic on node a', () => { + const topic = 'Z' + const defer = pDefer() + + psA.subscribe(topic) + expectSet(psA.subscriptions, [topic]) + + psB.once('pubsub:subscription-change', () => { + expect(psB.peers.size).to.equal(2) + + const aPeerId = psA.peerId.toB58String() + expectSet(psB.topics.get(topic), [aPeerId]) + + expect(psC.peers.size).to.equal(1) + expect(psC.topics.get(topic)).to.not.exist() + + defer.resolve() + }) + + return defer.promise + }) + + it('subscribe to the topic on node b', async () => { + const topic = 'Z' + psB.subscribe(topic) + expectSet(psB.subscriptions, [topic]) + + await Promise.all([ + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psC.once('pubsub:subscription-change', resolve)) + ]) + + expect(psA.peers.size).to.equal(1) + expectSet(psA.topics.get(topic), [psB.peerId.toB58String()]) + + expect(psC.peers.size).to.equal(1) + expectSet(psC.topics.get(topic), [psB.peerId.toB58String()]) + }) + + it('subscribe to the topic on node c', () => { + const topic = 'Z' + const defer = pDefer() + + psC.subscribe(topic) + expectSet(psC.subscriptions, [topic]) + + psB.once('pubsub:subscription-change', () => { + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(2) + expectSet(psB.topics.get(topic), [psC.peerId.toB58String()]) + + defer.resolve() + }) + + return defer.promise + }) + + it('publish on node a', async () => { + const topic = 'Z' + const defer = pDefer() + + psA.subscribe(topic) + psB.subscribe(topic) + psC.subscribe(topic) + + // await subscription change + await Promise.all([ + new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve())) + ]) + + let counter = 0 + + psA.on(topic, incMsg) + psB.on(topic, incMsg) + psC.on(topic, incMsg) + + psA.publish(topic, uint8ArrayFromString('hey')) - // Create pubsub router - beforeEach(async () => { - pubsub = await common.setup(2) + function incMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + check() + } + + function check () { + if (++counter === 3) { + psA.removeListener(topic, incMsg) + psB.removeListener(topic, incMsg) + psC.removeListener(topic, incMsg) + defer.resolve() + } + } + + return defer.promise + }) + + // since the topology is the same, just the publish + // gets sent by other peer, we reused the same peers + describe('1 level tree', () => { + // 1 level tree + // ┌◉┐ + // │b│ + // ◉─┘ └─◉ + // a c + + it('publish on node b', async () => { + const topic = 'Z' + const defer = pDefer() + let counter = 0 + + psA.subscribe(topic) + psB.subscribe(topic) + psC.subscribe(topic) + + // await subscription change + await Promise.all([ + new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())), + new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve())) + ]) + + psA.on(topic, incMsg) + psB.on(topic, incMsg) + psC.on(topic, incMsg) + + psB.publish(topic, uint8ArrayFromString('hey')) + + function incMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + check() + } + + function check () { + if (++counter === 3) { + psA.removeListener(topic, incMsg) + psB.removeListener(topic, incMsg) + psC.removeListener(topic, incMsg) + defer.resolve() + } + } + + return defer.promise + }) + }) + }) + + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◉─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◉ + // a + let psA, psB, psC, psD, psE + + // Create and start pubsub nodes + beforeEach(async () => { + [psA, psB, psC, psD, psE] = await common.setup(5) + + // Start pubsub nodes + await Promise.all([psA, psB, psC, psD, psE].map((p) => p.start())) + }) + + // connect nodes + beforeEach(async () => { + await psA._libp2p.dial(psB.peerId) + await psB._libp2p.dial(psC.peerId) + await psC._libp2p.dial(psD.peerId) + await psD._libp2p.dial(psE.peerId) + + // Wait for peers to be ready in pubsub + await pWaitFor(() => + psA.peers.size === 1 && + psB.peers.size === 2 && + psC.peers.size === 2 && + psD.peers.size === 2 && + psE.peers.size === 1 + ) + }) + + afterEach(async () => { + await Promise.all([psA, psB, psC, psD, psE].map((p) => p.stop())) + await common.teardown() + }) + + it('subscribes', () => { + psA.subscribe('Z') + expectSet(psA.subscriptions, ['Z']) + psB.subscribe('Z') + expectSet(psB.subscriptions, ['Z']) + psC.subscribe('Z') + expectSet(psC.subscriptions, ['Z']) + psD.subscribe('Z') + expectSet(psD.subscriptions, ['Z']) + psE.subscribe('Z') + expectSet(psE.subscriptions, ['Z']) + }) + + it('publishes from c', async function () { + this.timeout(30 * 1000) + const defer = pDefer() + let counter = 0 + + psA.subscribe('Z', incMsg) + psB.subscribe('Z', incMsg) + psC.subscribe('Z', incMsg) + psD.subscribe('Z', incMsg) + psE.subscribe('Z', incMsg) + + await Promise.all([ + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psC.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psD.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psE.once('pubsub:subscription-change', resolve)) + ]) + + psC.publish('Z', uint8ArrayFromString('hey from c')) + + function incMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('hey from c') + check() + } + + function check () { + if (++counter === 5) { + psA.unsubscribe('Z', incMsg) + psB.unsubscribe('Z', incMsg) + psC.unsubscribe('Z', incMsg) + psD.unsubscribe('Z', incMsg) + psE.unsubscribe('Z', incMsg) + defer.resolve() + } + } + + return defer.promise + }) + }) }) - afterEach(async () => { - sinon.restore() - // TODO: array - await pubsub && pubsub.stop() - await common.teardown() + describe('only some nodes subscribe the networks', () => { + describe('line', () => { + // line + // ◉────◎────◉ + // a b c + + before(() => { }) + after(() => { }) + }) + + describe('1 level tree', () => { + // 1 level tree + // ┌◉┐ + // │b│ + // ◎─┘ └─◉ + // a c + + before(() => { }) + after(() => { }) + }) + + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◎─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◎ + // a e + + before(() => { }) + after(() => { }) + }) }) }) } diff --git a/src/pubsub/tests/two-nodes-test.js b/src/pubsub/tests/two-nodes-test.js index fa5e5d711..11ec0968a 100644 --- a/src/pubsub/tests/two-nodes-test.js +++ b/src/pubsub/tests/two-nodes-test.js @@ -1,24 +1,232 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ 'use strict' -// const chai = require('chai') -// const { expect } = chai +const chai = require('chai') +const { expect } = chai const sinon = require('sinon') +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const uint8ArrayFromString = require('uint8arrays/from-string') +const uint8ArrayToString = require('uint8arrays/to-string') + +const { + first, + expectSet +} = require('./utils') + +const topic = 'foo' + +function shouldNotHappen (_) { + expect.fail() +} + module.exports = (common) => { describe('pubsub with two nodes', () => { - let pubsub + describe('fresh nodes', () => { + let psA, psB + + // Create pubsub nodes and connect them + before(async () => { + [psA, psB] = await common.setup(2) + + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + + // Start pubsub and connect nodes + await Promise.all([ + psA.start(), + psB.start() + ]) + + await psA._libp2p.dial(psB.peerId) + + // Wait for peers to be ready in pubsub + await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1) + }) + + after(async () => { + sinon.restore() + + psA && await psA.stop() + psB && await psB.stop() + + await common.teardown() + }) + + it('Subscribe to a topic in nodeA', () => { + const defer = pDefer() + + psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { + expectSet(psA.subscriptions, [topic]) + expect(psB.peers.size).to.equal(1) + expectSet(psB.topics.get(topic), [psA.peerId.toB58String()]) + expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }]) + defer.resolve() + }) + psA.subscribe(topic) + + return defer.promise + }) + + it('Publish to a topic in nodeA', () => { + const defer = pDefer() + + psA.once(topic, (msg) => { + expect(uint8ArrayToString(msg.data)).to.equal('hey') + psB.removeListener(topic, shouldNotHappen) + defer.resolve() + }) + + psB.once(topic, shouldNotHappen) + + psA.publish(topic, uint8ArrayFromString('hey')) + + return defer.promise + }) + + it('Publish to a topic in nodeB', () => { + const defer = pDefer() + + psA.once(topic, (msg) => { + psA.once(topic, shouldNotHappen) + expect(uint8ArrayToString(msg.data)).to.equal('banana') + + setTimeout(() => { + psA.removeListener(topic, shouldNotHappen) + psB.removeListener(topic, shouldNotHappen) + + defer.resolve() + }, 100) + }) + + psB.once(topic, shouldNotHappen) + + psB.publish(topic, uint8ArrayFromString('banana')) + + return defer.promise + }) + + it('Publish 10 msg to a topic in nodeB', () => { + const defer = pDefer() + let counter = 0 - // Create pubsub router - beforeEach(async () => { - pubsub = await common.setup(2) + psB.once(topic, shouldNotHappen) + psA.on(topic, receivedMsg) + + function receivedMsg (msg) { + expect(uint8ArrayToString(msg.data)).to.equal('banana') + expect(msg.from).to.be.eql(psB.peerId.toB58String()) + expect(msg.seqno).to.be.a('Uint8Array') + expect(msg.topicIDs).to.be.eql([topic]) + + if (++counter === 10) { + psA.removeListener(topic, receivedMsg) + psB.removeListener(topic, shouldNotHappen) + + defer.resolve() + } + } + + Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana'))) + + return defer.promise + }) + + it('Unsubscribe from topic in nodeA', () => { + const defer = pDefer() + + psA.unsubscribe(topic) + expect(psA.subscriptions.size).to.equal(0) + + psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => { + expect(psB.peers.size).to.equal(1) + expectSet(psB.topics.get(topic), []) + expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String()) + expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }]) + + defer.resolve() + }) + + return defer.promise + }) + + it('Publish to a topic:Z in nodeA nodeB', () => { + const defer = pDefer() + + psA.once('Z', shouldNotHappen) + psB.once('Z', shouldNotHappen) + + setTimeout(() => { + psA.removeListener('Z', shouldNotHappen) + psB.removeListener('Z', shouldNotHappen) + defer.resolve() + }, 100) + + psB.publish('Z', uint8ArrayFromString('banana')) + psA.publish('Z', uint8ArrayFromString('banana')) + + return defer.promise + }) }) - afterEach(async () => { - sinon.restore() - // TODO: array - await pubsub && pubsub.stop() - await common.teardown() + describe('nodes send state on connection', () => { + let psA, psB + + // Create pubsub nodes and connect them + before(async () => { + [psA, psB] = await common.setup(2) + + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + + // Start pubsub and connect nodes + await Promise.all([ + psA.start(), + psB.start() + ]) + }) + + // Make subscriptions prior to nodes connected + before(() => { + psA.subscribe('Za') + psB.subscribe('Zb') + + expect(psA.peers.size).to.equal(0) + expectSet(psA.subscriptions, ['Za']) + expect(psB.peers.size).to.equal(0) + expectSet(psB.subscriptions, ['Zb']) + }) + + after(async () => { + sinon.restore() + + psA && await psA.stop() + psB && await psB.stop() + + await common.teardown() + }) + + it('existing subscriptions are sent upon peer connection', async function () { + this.timeout(10e3) + + await Promise.all([ + psA._libp2p.dial(psB.peerId), + new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)), + new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)) + ]) + + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) + + expectSet(psA.subscriptions, ['Za']) + expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()]) + + expectSet(psB.subscriptions, ['Zb']) + expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()]) + }) }) }) } diff --git a/src/pubsub/tests/utils.js b/src/pubsub/tests/utils.js new file mode 100644 index 000000000..14b49973b --- /dev/null +++ b/src/pubsub/tests/utils.js @@ -0,0 +1,19 @@ +'use strict' + +const { expect } = require('chai') +const pRetry = require('p-retry') + +exports.first = (map) => map.values().next().value + +exports.expectSet = (set, subs) => { + expect(Array.from(set.values())).to.eql(subs) +} + +// wait until a peer know about other peer to subscribe a topic +exports.waitForNotificationOfSubscription = (pubsub, topic, count = 1) => pRetry(async () => { + const res = await pubsub.peers(topic) + + if (!res || !res.length || res.length < count) { + throw new Error('Could not find peer subscribing') + } +}, { retries: 5 })