Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add error codes to dht and pubsub errors #328

Merged
merged 2 commits into from
Feb 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/dht.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
'use strict'

const nextTick = require('async/nextTick')
const errCode = require('err-code')

const { messages, codes } = require('./errors')

module.exports = (node) => {
return {
put: (key, value, callback) => {
if (!node._dht) {
return callback(new Error('DHT is not available'))
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}

node._dht.put(key, value, callback)
Expand All @@ -16,7 +21,7 @@ module.exports = (node) => {
}

if (!node._dht) {
return callback(new Error('DHT is not available'))
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}

node._dht.get(key, options, callback)
Expand All @@ -28,7 +33,7 @@ module.exports = (node) => {
}

if (!node._dht) {
return callback(new Error('DHT is not available'))
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}

node._dht.getMany(key, nVals, options, callback)
Expand Down
3 changes: 0 additions & 3 deletions src/error-messages.js

This file was deleted.

11 changes: 11 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'use strict'

exports.messages = {
NOT_STARTED_YET: 'The libp2p node is not started yet',
DHT_DISABLED: 'DHT is not available'
}

exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED'
}
28 changes: 15 additions & 13 deletions src/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
'use strict'

const setImmediate = require('async/setImmediate')
const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET
const nextTick = require('async/nextTick')
const { messages, codes } = require('./errors')
const FloodSub = require('libp2p-floodsub')

const errCode = require('err-code')

module.exports = (node) => {
const floodSub = new FloodSub(node)

Expand All @@ -18,7 +20,7 @@ module.exports = (node) => {
}

if (!node.isStarted() && !floodSub.started) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

function subscribe (cb) {
Expand All @@ -27,15 +29,15 @@ module.exports = (node) => {
}

floodSub.on(topic, handler)
setImmediate(cb)
nextTick(cb)
}

subscribe(callback)
},

unsubscribe: (topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) {
throw new Error(NOT_STARTED_YET)
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
if (!handler && !callback) {
floodSub.removeAllListeners(topic)
Expand All @@ -48,37 +50,37 @@ module.exports = (node) => {
}

if (typeof callback === 'function') {
setImmediate(() => callback())
nextTick(() => callback())
}
},

publish: (topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER'))
}

floodSub.publish(topic, data)

setImmediate(() => callback())
nextTick(() => callback())
},

ls: (callback) => {
if (!node.isStarted() && !floodSub.started) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

const subscriptions = Array.from(floodSub.subscriptions)

setImmediate(() => callback(null, subscriptions))
nextTick(() => callback(null, subscriptions))
},

peers: (topic, callback) => {
if (!node.isStarted() && !floodSub.started) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (typeof topic === 'function') {
Expand All @@ -90,7 +92,7 @@ module.exports = (node) => {
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

setImmediate(() => callback(null, peers))
nextTick(() => callback(null, peers))
},

setMaxListeners (n) {
Expand Down
3 changes: 3 additions & 0 deletions test/dht.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ describe('.dht', () => {

nodeA.dht.put(key, value, (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DHT_DISABLED')
done()
})
})
Expand All @@ -149,6 +150,7 @@ describe('.dht', () => {

nodeA.dht.get(key, (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DHT_DISABLED')
done()
})
})
Expand All @@ -158,6 +160,7 @@ describe('.dht', () => {

nodeA.dht.getMany(key, 10, (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DHT_DISABLED')
done()
})
})
Expand Down
101 changes: 101 additions & 0 deletions test/pubsub.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const parallel = require('async/parallel')
const series = require('async/series')
const _times = require('lodash.times')

const { codes } = require('../src/errors')
const createNode = require('./utils/create-node')

function startTwo (callback) {
Expand Down Expand Up @@ -76,6 +77,10 @@ describe('.pubsub', () => {
(cb) => setTimeout(cb, 500),
// publish on the second
(cb) => nodes[1].pubsub.publish('pubsub', data, cb),
// ls subscripts
(cb) => nodes[1].pubsub.ls(cb),
// get subscribed peers
(cb) => nodes[1].pubsub.peers('pubsub', cb),
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe on the first
Expand Down Expand Up @@ -132,6 +137,33 @@ describe('.pubsub', () => {
expect(err).to.not.exist().mark()
})
})
it('publish should fail if data is not a buffer', (done) => {
createNode('/ip4/0.0.0.0/tcp/0', {
config: {
peerDiscovery: {
mdns: {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
}
}
}, (err, node) => {
expect(err).to.not.exist()

node.start((err) => {
expect(err).to.not.exist()

node.pubsub.publish('pubsub', 'datastr', (err) => {
expect(err).to.exist()
expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER')

done()
})
})
})
})
})

describe('.pubsub off', () => {
Expand All @@ -154,4 +186,73 @@ describe('.pubsub', () => {
})
})
})

describe('.pubsub on and node not started', () => {
let libp2pNode

before(function (done) {
createNode('/ip4/0.0.0.0/tcp/0', {
config: {
peerDiscovery: {
mdns: {
enabled: false
}
},
EXPERIMENTAL: {
pubsub: true
}
}
}, (err, node) => {
expect(err).to.not.exist()

libp2pNode = node
done()
})
})

it('fail to subscribe if node not started yet', (done) => {
libp2pNode.pubsub.subscribe('pubsub', () => { }, (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to unsubscribe if node not started yet', (done) => {
libp2pNode.pubsub.unsubscribe('pubsub', () => { }, (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to publish if node not started yet', (done) => {
libp2pNode.pubsub.publish('pubsub', Buffer.from('data'), (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to ls if node not started yet', (done) => {
libp2pNode.pubsub.ls((err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})

it('fail to get subscribed peers to a topic if node not started yet', (done) => {
libp2pNode.pubsub.peers('pubsub', (err) => {
expect(err).to.exist()
expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED)

done()
})
})
})
})