Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
chore: pubsub router tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Aug 17, 2020
1 parent af3487a commit 1093890
Show file tree
Hide file tree
Showing 4 changed files with 556 additions and 22 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"multibase": "^3.0.0",
"p-defer": "^3.0.0",
"p-limit": "^2.3.0",
"p-retry": "^4.2.0",
"p-wait-for": "^3.1.0",
"peer-id": "^0.14.0",
"protons": "^2.0.0",
Expand Down
328 changes: 317 additions & 11 deletions src/pubsub/tests/multiple-nodes-test.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,330 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'

// const chai = require('chai')
// const { expect } = chai
const chai = require('chai')
const { expect } = chai
const sinon = require('sinon')

const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')

const { expectSet } = require('./utils')

module.exports = (common) => {
describe('pubsub with multiple nodes', () => {
let pubsub
describe('every peer subscribes to the topic', () => {
describe('line', () => {
// line
// ◉────◉────◉
// a b c
let psA, psB, psC

// Create and start pubsub nodes
beforeEach(async () => {
[psA, psB, psC] = await common.setup(3)

// Start pubsub mpdes
await Promise.all([psA, psB, psC].map((p) => p.start()))
})

// Connect nodes
beforeEach(async () => {
await psA._libp2p.dial(psB.peerId)
await psB._libp2p.dial(psC.peerId)

// Wait for peers to be ready in pubsub
await pWaitFor(() =>
psA.peers.size === 1 &&
psC.peers.size === 1 &&
psA.peers.size === 1
)
})

afterEach(async () => {
sinon.restore()

await Promise.all([psA, psB, psC].map((p) => p.stop()))
await common.teardown()
})

it('subscribe to the topic on node a', () => {
const topic = 'Z'
const defer = pDefer()

psA.subscribe(topic)
expectSet(psA.subscriptions, [topic])

psB.once('pubsub:subscription-change', () => {
expect(psB.peers.size).to.equal(2)

const aPeerId = psA.peerId.toB58String()
expectSet(psB.topics.get(topic), [aPeerId])

expect(psC.peers.size).to.equal(1)
expect(psC.topics.get(topic)).to.not.exist()

defer.resolve()
})

return defer.promise
})

it('subscribe to the topic on node b', async () => {
const topic = 'Z'
psB.subscribe(topic)
expectSet(psB.subscriptions, [topic])

await Promise.all([
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psC.once('pubsub:subscription-change', resolve))
])

expect(psA.peers.size).to.equal(1)
expectSet(psA.topics.get(topic), [psB.peerId.toB58String()])

expect(psC.peers.size).to.equal(1)
expectSet(psC.topics.get(topic), [psB.peerId.toB58String()])
})

it('subscribe to the topic on node c', () => {
const topic = 'Z'
const defer = pDefer()

psC.subscribe(topic)
expectSet(psC.subscriptions, [topic])

psB.once('pubsub:subscription-change', () => {
expect(psA.peers.size).to.equal(1)
expect(psB.peers.size).to.equal(2)
expectSet(psB.topics.get(topic), [psC.peerId.toB58String()])

defer.resolve()
})

return defer.promise
})

it('publish on node a', async () => {
const topic = 'Z'
const defer = pDefer()

psA.subscribe(topic)
psB.subscribe(topic)
psC.subscribe(topic)

// await subscription change
await Promise.all([
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())),
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())),
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve()))
])

let counter = 0

psA.on(topic, incMsg)
psB.on(topic, incMsg)
psC.on(topic, incMsg)

psA.publish(topic, uint8ArrayFromString('hey'))

// Create pubsub router
beforeEach(async () => {
pubsub = await common.setup(2)
function incMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('hey')
check()
}

function check () {
if (++counter === 3) {
psA.removeListener(topic, incMsg)
psB.removeListener(topic, incMsg)
psC.removeListener(topic, incMsg)
defer.resolve()
}
}

return defer.promise
})

// since the topology is the same, just the publish
// gets sent by other peer, we reused the same peers
describe('1 level tree', () => {
// 1 level tree
// ┌◉┐
// │b│
// ◉─┘ └─◉
// a c

it('publish on node b', async () => {
const topic = 'Z'
const defer = pDefer()
let counter = 0

psA.subscribe(topic)
psB.subscribe(topic)
psC.subscribe(topic)

// await subscription change
await Promise.all([
new Promise(resolve => psA.once('pubsub:subscription-change', () => resolve())),
new Promise(resolve => psB.once('pubsub:subscription-change', () => resolve())),
new Promise(resolve => psC.once('pubsub:subscription-change', () => resolve()))
])

psA.on(topic, incMsg)
psB.on(topic, incMsg)
psC.on(topic, incMsg)

psB.publish(topic, uint8ArrayFromString('hey'))

function incMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('hey')
check()
}

function check () {
if (++counter === 3) {
psA.removeListener(topic, incMsg)
psB.removeListener(topic, incMsg)
psC.removeListener(topic, incMsg)
defer.resolve()
}
}

return defer.promise
})
})
})

describe('2 level tree', () => {
// 2 levels tree
// ┌◉┐
// │c│
// ┌◉─┘ └─◉┐
// │b d│
// ◉─┘ └─◉
// a
let psA, psB, psC, psD, psE

// Create and start pubsub nodes
beforeEach(async () => {
[psA, psB, psC, psD, psE] = await common.setup(5)

// Start pubsub nodes
await Promise.all([psA, psB, psC, psD, psE].map((p) => p.start()))
})

// connect nodes
beforeEach(async () => {
await psA._libp2p.dial(psB.peerId)
await psB._libp2p.dial(psC.peerId)
await psC._libp2p.dial(psD.peerId)
await psD._libp2p.dial(psE.peerId)

// Wait for peers to be ready in pubsub
await pWaitFor(() =>
psA.peers.size === 1 &&
psB.peers.size === 2 &&
psC.peers.size === 2 &&
psD.peers.size === 2 &&
psE.peers.size === 1
)
})

afterEach(async () => {
await Promise.all([psA, psB, psC, psD, psE].map((p) => p.stop()))
await common.teardown()
})

it('subscribes', () => {
psA.subscribe('Z')
expectSet(psA.subscriptions, ['Z'])
psB.subscribe('Z')
expectSet(psB.subscriptions, ['Z'])
psC.subscribe('Z')
expectSet(psC.subscriptions, ['Z'])
psD.subscribe('Z')
expectSet(psD.subscriptions, ['Z'])
psE.subscribe('Z')
expectSet(psE.subscriptions, ['Z'])
})

it('publishes from c', async function () {
this.timeout(30 * 1000)
const defer = pDefer()
let counter = 0

psA.subscribe('Z', incMsg)
psB.subscribe('Z', incMsg)
psC.subscribe('Z', incMsg)
psD.subscribe('Z', incMsg)
psE.subscribe('Z', incMsg)

await Promise.all([
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psC.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psD.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psE.once('pubsub:subscription-change', resolve))
])

psC.publish('Z', uint8ArrayFromString('hey from c'))

function incMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('hey from c')
check()
}

function check () {
if (++counter === 5) {
psA.unsubscribe('Z', incMsg)
psB.unsubscribe('Z', incMsg)
psC.unsubscribe('Z', incMsg)
psD.unsubscribe('Z', incMsg)
psE.unsubscribe('Z', incMsg)
defer.resolve()
}
}

return defer.promise
})
})
})

afterEach(async () => {
sinon.restore()
// TODO: array
await pubsub && pubsub.stop()
await common.teardown()
describe('only some nodes subscribe the networks', () => {
describe('line', () => {
// line
// ◉────◎────◉
// a b c

before(() => { })
after(() => { })
})

describe('1 level tree', () => {
// 1 level tree
// ┌◉┐
// │b│
// ◎─┘ └─◉
// a c

before(() => { })
after(() => { })
})

describe('2 level tree', () => {
// 2 levels tree
// ┌◉┐
// │c│
// ┌◎─┘ └─◉┐
// │b d│
// ◉─┘ └─◎
// a e

before(() => { })
after(() => { })
})
})
})
}
Loading

0 comments on commit 1093890

Please sign in to comment.