diff --git a/package.json b/package.json index 8375792f9..6ab0c4744 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,7 @@ "it-pair": "^1.0.0", "it-pipe": "^1.1.0", "it-pushable": "^1.4.0", - "libp2p-crypto": "^0.17.9", + "libp2p-crypto": "^0.18.0", "libp2p-tcp": "^0.15.0", "multiaddr": "^8.0.0", "multibase": "^3.0.0", diff --git a/src/pubsub/README.md b/src/pubsub/README.md index ef8b2c09f..15fdbccfa 100644 --- a/src/pubsub/README.md +++ b/src/pubsub/README.md @@ -120,7 +120,7 @@ Publish data message to pubsub topics. | Name | Type | Description | |------|------|-------------| -| topics | `string` | pubsub topic | +| topic | `string` | pubsub topic | | message | `Uint8Array` | message to publish | ##### Returns diff --git a/src/pubsub/index.js b/src/pubsub/index.js index 855805291..3935c99fb 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -5,7 +5,6 @@ const EventEmitter = require('events') const errcode = require('err-code') const pipe = require('it-pipe') -const PeerId = require('peer-id') const MulticodecTopology = require('../topology/multicodec-topology') const { codes } = require('./errors') @@ -354,7 +353,7 @@ class PubsubBaseProtocol extends EventEmitter { this.log('received message we didn\'t subscribe to. Dropping.') return } - const msg = utils.normalizeInRpcMessage(message, PeerId.createFromB58String(idB58Str)) + const msg = utils.normalizeInRpcMessage(message, idB58Str) this._processRpcMessage(msg) }) } @@ -362,7 +361,7 @@ class PubsubBaseProtocol extends EventEmitter { } /** - * Handles an subscription change from a peer + * Handles a subscription change from a peer * @param {string} id * @param {RPC.SubOpt} subOpt */ @@ -470,6 +469,9 @@ class PubsubBaseProtocol extends EventEmitter { _sendRpc (id, rpc) { const peerStreams = this.peers.get(id) if (!peerStreams || !peerStreams.isWritable) { + const msg = `Cannot send RPC to ${id} as there is no open stream to it available` + + this.log.err(msg) return } peerStreams.write(this._encodeRpc(rpc)) diff --git a/src/pubsub/message/sign.js b/src/pubsub/message/sign.js index b17aa41b3..504dd870a 100644 --- a/src/pubsub/message/sign.js +++ b/src/pubsub/message/sign.js @@ -39,9 +39,7 @@ async function verifySignature (message) { const baseMessage = { ...message } delete baseMessage.signature delete baseMessage.key - if (typeof baseMessage.from === 'string') { - baseMessage.from = PeerId.createFromB58String(baseMessage.from).toBytes() - } + baseMessage.from = PeerId.createFromB58String(baseMessage.from).toBytes() const bytes = uint8ArrayConcat([ SignPrefix, Message.encode(baseMessage) @@ -63,12 +61,7 @@ async function verifySignature (message) { */ async function messagePublicKey (message) { // should be available in the from property of the message (peer id) - let from - if (typeof message.from === 'string') { - from = PeerId.createFromB58String(message.from) - } else { - from = PeerId.createFromBytes(message.from) - } + const from = PeerId.createFromCID(message.from) if (message.key) { const keyPeerId = await PeerId.createFromPubKey(message.key) diff --git a/src/pubsub/utils.js b/src/pubsub/utils.js index cf4acf306..c04c6979d 100644 --- a/src/pubsub/utils.js +++ b/src/pubsub/utils.js @@ -1,7 +1,6 @@ 'use strict' const crypto = require('libp2p-crypto') -const multibase = require('multibase') const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayFromString = require('uint8arrays/from-string') @@ -74,7 +73,7 @@ exports.ensureArray = (maybeArray) => { * Ensures `message.from` is base58 encoded * @param {Object} message * @param {Uint8Array|String} message.from - * @param {PeerId} peerId + * @param {String} peerId * @return {Object} */ exports.normalizeInRpcMessage = (message, peerId) => { @@ -83,7 +82,7 @@ exports.normalizeInRpcMessage = (message, peerId) => { m.from = uint8ArrayToString(message.from, 'base58btc') } if (peerId) { - m.receivedFrom = peerId.toB58String() + m.receivedFrom = peerId } return m } @@ -91,7 +90,7 @@ exports.normalizeInRpcMessage = (message, peerId) => { exports.normalizeOutRpcMessage = (message) => { const m = Object.assign({}, message) if (typeof message.from === 'string' || message.from instanceof String) { - m.from = multibase.decode('z' + message.from) + m.from = uint8ArrayFromString(message.from, 'base58btc') } if (typeof message.data === 'string' || message.data instanceof String) { m.data = uint8ArrayFromString(message.data) diff --git a/test/pubsub/emit-self.spec.js b/test/pubsub/emit-self.spec.js new file mode 100644 index 000000000..f36594158 --- /dev/null +++ b/test/pubsub/emit-self.spec.js @@ -0,0 +1,78 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') + +const { + createPeerId, + mockRegistrar, + PubsubImplementation +} = require('./utils') + +const uint8ArrayFromString = require('uint8arrays/from-string') + +const protocol = '/pubsub/1.0.0' +const topic = 'foo' +const data = uint8ArrayFromString('bar') +const shouldNotHappen = (_) => expect.fail() + +describe('emitSelf', () => { + let pubsub + + describe('enabled', () => { + before(async () => { + const peerId = await createPeerId() + + pubsub = new PubsubImplementation(protocol, { + peerId, + registrar: mockRegistrar + }, { emitSelf: true }) + }) + + before(() => { + pubsub.start() + pubsub.subscribe(topic) + }) + + after(() => { + pubsub.stop() + }) + + it('should emit to self on publish', () => { + const promise = new Promise((resolve) => pubsub.once(topic, resolve)) + + pubsub.publish(topic, data) + + return promise + }) + }) + + describe('disabled', () => { + before(async () => { + const peerId = await createPeerId() + + pubsub = new PubsubImplementation(protocol, { + peerId, + registrar: mockRegistrar + }, { emitSelf: false }) + }) + + before(() => { + pubsub.start() + pubsub.subscribe(topic) + }) + + after(() => { + pubsub.stop() + }) + + it('should not emit to self on publish', () => { + pubsub.once(topic, (m) => shouldNotHappen) + + pubsub.publish(topic, data) + + // Wait 1 second to guarantee that self is not noticed + return new Promise((resolve) => setTimeout(() => resolve(), 1000)) + }) + }) +}) diff --git a/test/pubsub/utils/emit-self.spec.js b/test/pubsub/utils/emit-self.spec.js deleted file mode 100644 index e69de29bb..000000000 diff --git a/test/pubsub/utils/index.js b/test/pubsub/utils/index.js index 39406d434..f62f1cba8 100644 --- a/test/pubsub/utils/index.js +++ b/test/pubsub/utils/index.js @@ -14,11 +14,12 @@ exports.createPeerId = async () => { } class PubsubImplementation extends PubsubBaseProtocol { - constructor (protocol, libp2p) { + constructor (protocol, libp2p, options = {}) { super({ debugName: 'libp2p:pubsub', multicodecs: protocol, - libp2p + libp2p, + ...options }) }