Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
chore: apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Jacob Heun <[email protected]>
  • Loading branch information
vasco-santos and jacobheun committed Aug 21, 2020
1 parent f209d04 commit d5ffc49
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 20 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"it-pair": "^1.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.0",
"libp2p-crypto": "^0.17.9",
"libp2p-crypto": "^0.18.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0",
"multibase": "^3.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Publish data message to pubsub topics.

| Name | Type | Description |
|------|------|-------------|
| topics | `string` | pubsub topic |
| topic | `string` | pubsub topic |
| message | `Uint8Array` | message to publish |

##### Returns
Expand Down
8 changes: 5 additions & 3 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const EventEmitter = require('events')
const errcode = require('err-code')

const pipe = require('it-pipe')
const PeerId = require('peer-id')

const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
Expand Down Expand Up @@ -354,15 +353,15 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
const msg = utils.normalizeInRpcMessage(message, PeerId.createFromB58String(idB58Str))
const msg = utils.normalizeInRpcMessage(message, idB58Str)
this._processRpcMessage(msg)
})
}
return true
}

/**
* Handles an subscription change from a peer
* Handles a subscription change from a peer
* @param {string} id
* @param {RPC.SubOpt} subOpt
*/
Expand Down Expand Up @@ -470,6 +469,9 @@ class PubsubBaseProtocol extends EventEmitter {
_sendRpc (id, rpc) {
const peerStreams = this.peers.get(id)
if (!peerStreams || !peerStreams.isWritable) {
const msg = `Cannot send RPC to ${id} as there is no open stream to it available`

this.log.err(msg)
return
}
peerStreams.write(this._encodeRpc(rpc))
Expand Down
11 changes: 2 additions & 9 deletions src/pubsub/message/sign.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ async function verifySignature (message) {
const baseMessage = { ...message }
delete baseMessage.signature
delete baseMessage.key
if (typeof baseMessage.from === 'string') {
baseMessage.from = PeerId.createFromB58String(baseMessage.from).toBytes()
}
baseMessage.from = PeerId.createFromB58String(baseMessage.from).toBytes()
const bytes = uint8ArrayConcat([
SignPrefix,
Message.encode(baseMessage)
Expand All @@ -63,12 +61,7 @@ async function verifySignature (message) {
*/
async function messagePublicKey (message) {
// should be available in the from property of the message (peer id)
let from
if (typeof message.from === 'string') {
from = PeerId.createFromB58String(message.from)
} else {
from = PeerId.createFromBytes(message.from)
}
const from = PeerId.createFromCID(message.from)

if (message.key) {
const keyPeerId = await PeerId.createFromPubKey(message.key)
Expand Down
7 changes: 3 additions & 4 deletions src/pubsub/utils.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const crypto = require('libp2p-crypto')
const multibase = require('multibase')
const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string')

Expand Down Expand Up @@ -74,7 +73,7 @@ exports.ensureArray = (maybeArray) => {
* Ensures `message.from` is base58 encoded
* @param {Object} message
* @param {Uint8Array|String} message.from
* @param {PeerId} peerId
* @param {String} peerId
* @return {Object}
*/
exports.normalizeInRpcMessage = (message, peerId) => {
Expand All @@ -83,15 +82,15 @@ exports.normalizeInRpcMessage = (message, peerId) => {
m.from = uint8ArrayToString(message.from, 'base58btc')
}
if (peerId) {
m.receivedFrom = peerId.toB58String()
m.receivedFrom = peerId
}
return m
}

exports.normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message)
if (typeof message.from === 'string' || message.from instanceof String) {
m.from = multibase.decode('z' + message.from)
m.from = uint8ArrayFromString(message.from, 'base58btc')
}
if (typeof message.data === 'string' || message.data instanceof String) {
m.data = uint8ArrayFromString(message.data)
Expand Down
78 changes: 78 additions & 0 deletions test/pubsub/emit-self.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* eslint-env mocha */
'use strict'

const { expect } = require('aegir/utils/chai')

const {
createPeerId,
mockRegistrar,
PubsubImplementation
} = require('./utils')

const uint8ArrayFromString = require('uint8arrays/from-string')

const protocol = '/pubsub/1.0.0'
const topic = 'foo'
const data = uint8ArrayFromString('bar')
const shouldNotHappen = (_) => expect.fail()

describe('emitSelf', () => {
let pubsub

describe('enabled', () => {
before(async () => {
const peerId = await createPeerId()

pubsub = new PubsubImplementation(protocol, {
peerId,
registrar: mockRegistrar
}, { emitSelf: true })
})

before(() => {
pubsub.start()
pubsub.subscribe(topic)
})

after(() => {
pubsub.stop()
})

it('should emit to self on publish', () => {
const promise = new Promise((resolve) => pubsub.once(topic, resolve))

pubsub.publish(topic, data)

return promise
})
})

describe('disabled', () => {
before(async () => {
const peerId = await createPeerId()

pubsub = new PubsubImplementation(protocol, {
peerId,
registrar: mockRegistrar
}, { emitSelf: false })
})

before(() => {
pubsub.start()
pubsub.subscribe(topic)
})

after(() => {
pubsub.stop()
})

it('should not emit to self on publish', () => {
pubsub.once(topic, (m) => shouldNotHappen)

pubsub.publish(topic, data)

// Wait 1 second to guarantee that self is not noticed
return new Promise((resolve) => setTimeout(() => resolve(), 1000))
})
})
})
Empty file.
5 changes: 3 additions & 2 deletions test/pubsub/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ exports.createPeerId = async () => {
}

class PubsubImplementation extends PubsubBaseProtocol {
constructor (protocol, libp2p) {
constructor (protocol, libp2p, options = {}) {
super({
debugName: 'libp2p:pubsub',
multicodecs: protocol,
libp2p
libp2p,
...options
})
}

Expand Down

0 comments on commit d5ffc49

Please sign in to comment.