Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Feb 20, 2019
1 parent 7e3702c commit 7cfb25c
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 31 deletions.
14 changes: 5 additions & 9 deletions src/dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
const nextTick = require('async/nextTick')
const errCode = require('err-code')

const { messages, codes } = require('./errors')

module.exports = (node) => {
return {
put: (key, value, callback) => {
if (!node._dht) {
return nextTick(() => callback(
errCode(new Error('DHT is not available'), 'ERR_DHT_DISABLED')
))
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}

node._dht.put(key, value, callback)
Expand All @@ -21,9 +21,7 @@ module.exports = (node) => {
}

if (!node._dht) {
return nextTick(() => callback(
errCode(new Error('DHT is not available'), 'ERR_DHT_DISABLED')
))
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}

node._dht.get(key, options, callback)
Expand All @@ -35,9 +33,7 @@ module.exports = (node) => {
}

if (!node._dht) {
return nextTick(() => callback(
errCode(new Error('DHT is not available'), 'ERR_DHT_DISABLED')
))
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}

node._dht.getMany(key, nVals, options, callback)
Expand Down
3 changes: 0 additions & 3 deletions src/error-messages.js

This file was deleted.

11 changes: 11 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'use strict'

exports.messages = {
NOT_STARTED_YET: 'The libp2p node is not started yet',
DHT_DISABLED: 'DHT is not available'
}

exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED'
}
26 changes: 7 additions & 19 deletions src/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const nextTick = require('async/nextTick')
const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET
const { messages, codes } = require('./errors')
const FloodSub = require('libp2p-floodsub')

const errCode = require('err-code')
Expand All @@ -20,9 +20,7 @@ module.exports = (node) => {
}

if (!node.isStarted() && !floodSub.started) {
return nextTick(() => callback(
errCode(new Error(NOT_STARTED_YET), 'ERR_PUBSUB_NOT_STARTED')
))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

function subscribe (cb) {
Expand All @@ -39,9 +37,7 @@ module.exports = (node) => {

unsubscribe: (topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) {
return nextTick(() => callback(
errCode(new Error(NOT_STARTED_YET), 'ERR_PUBSUB_NOT_STARTED')
))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

floodSub.removeListener(topic, handler)
Expand All @@ -57,15 +53,11 @@ module.exports = (node) => {

publish: (topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
return nextTick(() => callback(
errCode(new Error(NOT_STARTED_YET), 'ERR_PUBSUB_NOT_STARTED')
))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!Buffer.isBuffer(data)) {
return nextTick(() => callback(
errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER')
))
return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER'))
}

floodSub.publish(topic, data)
Expand All @@ -75,9 +67,7 @@ module.exports = (node) => {

ls: (callback) => {
if (!node.isStarted() && !floodSub.started) {
return nextTick(() => callback(
errCode(new Error(NOT_STARTED_YET), 'ERR_PUBSUB_NOT_STARTED')
))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

const subscriptions = Array.from(floodSub.subscriptions)
Expand All @@ -87,9 +77,7 @@ module.exports = (node) => {

peers: (topic, callback) => {
if (!node.isStarted() && !floodSub.started) {
return nextTick(() => callback(
errCode(new Error(NOT_STARTED_YET), 'ERR_PUBSUB_NOT_STARTED')
))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (typeof topic === 'function') {
Expand Down
98 changes: 98 additions & 0 deletions test/pubsub.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const parallel = require('async/parallel')
const series = require('async/series')
const _times = require('lodash.times')

const { codes } = require('../src/errors')
const createNode = require('./utils/create-node')

function startTwo (callback) {
Expand Down Expand Up @@ -87,6 +88,34 @@ describe('.pubsub', () => {
expect(err).to.not.exist().mark()
})
})

it('publish should fail if data is not a buffer', (done) => {
createNode('/ip4/0.0.0.0/tcp/0', {
config: {
peerDiscovery: {
mdns: {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
}
}
}, (err, node) => {
expect(err).to.not.exist()

node.start((err) => {
expect(err).to.not.exist()

node.pubsub.publish('pubsub', 'datastr', (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER')

done()
})
})
})
})
})

describe('.pubsub off', () => {
Expand All @@ -109,4 +138,73 @@ describe('.pubsub', () => {
})
})
})

describe('.pubsub on and node not started', () => {
let libp2pNode

before(function (done) {
createNode('/ip4/0.0.0.0/tcp/0', {
config: {
peerDiscovery: {
mdns: {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
}
}
}, (err, node) => {
expect(err).to.not.exist()

libp2pNode = node
done()
})
})

it('fail to subscribe if node not started yet', (done) => {
libp2pNode.pubsub.subscribe('pubsub', () => {}, (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to unsubscribe if node not started yet', (done) => {
libp2pNode.pubsub.unsubscribe('pubsub', () => { }, (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to publish if node not started yet', (done) => {
libp2pNode.pubsub.publish('pubsub', Buffer.from('data'), (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to ls if node not started yet', (done) => {
libp2pNode.pubsub.ls((err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to get subscribed peers to a topic if node not started yet', (done) => {
libp2pNode.pubsub.peers('pubsub', (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})
})
})

0 comments on commit 7cfb25c

Please sign in to comment.