Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
chore: apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Jacob Heun <[email protected]>
  • Loading branch information
vasco-santos and jacobheun committed Aug 21, 2020
1 parent f209d04 commit bc8ed8b
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 29 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"it-pair": "^1.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.0",
"libp2p-crypto": "^0.17.9",
"libp2p-crypto": "^0.18.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0",
"multibase": "^3.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Publish data message to pubsub topics.

| Name | Type | Description |
|------|------|-------------|
| topics | `string` | pubsub topic |
| topic | `string` | pubsub topic |
| message | `Uint8Array` | message to publish |

##### Returns
Expand Down
3 changes: 2 additions & 1 deletion src/pubsub/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

exports.codes = {
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE'
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE',
ERR_NO_STREAM_TO_SEND_RPC: 'ERR_NO_STREAM_TO_SEND_RPC'
}
10 changes: 6 additions & 4 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const EventEmitter = require('events')
const errcode = require('err-code')

const pipe = require('it-pipe')
const PeerId = require('peer-id')

const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
Expand Down Expand Up @@ -354,15 +353,15 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
const msg = utils.normalizeInRpcMessage(message, PeerId.createFromB58String(idB58Str))
const msg = utils.normalizeInRpcMessage(message, idB58Str)
this._processRpcMessage(msg)
})
}
return true
}

/**
* Handles an subscription change from a peer
* Handles a subscription change from a peer
* @param {string} id
* @param {RPC.SubOpt} subOpt
*/
Expand Down Expand Up @@ -470,7 +469,10 @@ class PubsubBaseProtocol extends EventEmitter {
_sendRpc (id, rpc) {
const peerStreams = this.peers.get(id)
if (!peerStreams || !peerStreams.isWritable) {
return
const msg = `Cannot send RPC to ${id} as there is no open stream to it available`

this.log.err(msg)
throw errcode(new Error(msg), codes.ERR_NO_STREAM_TO_SEND_RPC)
}
peerStreams.write(this._encodeRpc(rpc))
}
Expand Down
11 changes: 2 additions & 9 deletions src/pubsub/message/sign.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ async function verifySignature (message) {
const baseMessage = { ...message }
delete baseMessage.signature
delete baseMessage.key
if (typeof baseMessage.from === 'string') {
baseMessage.from = PeerId.createFromB58String(baseMessage.from).toBytes()
}
baseMessage.from = PeerId.createFromB58String(baseMessage.from).toBytes()
const bytes = uint8ArrayConcat([
SignPrefix,
Message.encode(baseMessage)
Expand All @@ -63,12 +61,7 @@ async function verifySignature (message) {
*/
async function messagePublicKey (message) {
// should be available in the from property of the message (peer id)
let from
if (typeof message.from === 'string') {
from = PeerId.createFromB58String(message.from)
} else {
from = PeerId.createFromBytes(message.from)
}
const from = PeerId.createFromCID(message.from)

if (message.key) {
const keyPeerId = await PeerId.createFromPubKey(message.key)
Expand Down
7 changes: 3 additions & 4 deletions src/pubsub/utils.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const crypto = require('libp2p-crypto')
const multibase = require('multibase')
const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string')

Expand Down Expand Up @@ -74,7 +73,7 @@ exports.ensureArray = (maybeArray) => {
* Ensures `message.from` is base58 encoded
* @param {Object} message
* @param {Uint8Array|String} message.from
* @param {PeerId} peerId
* @param {String} peerId
* @return {Object}
*/
exports.normalizeInRpcMessage = (message, peerId) => {
Expand All @@ -83,15 +82,15 @@ exports.normalizeInRpcMessage = (message, peerId) => {
m.from = uint8ArrayToString(message.from, 'base58btc')
}
if (peerId) {
m.receivedFrom = peerId.toB58String()
m.receivedFrom = peerId
}
return m
}

exports.normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message)
if (typeof message.from === 'string' || message.from instanceof String) {
m.from = multibase.decode('z' + message.from)
m.from = uint8ArrayFromString(message.from, 'base58btc')
}
if (typeof message.data === 'string' || message.data instanceof String) {
m.data = uint8ArrayFromString(message.data)
Expand Down
24 changes: 15 additions & 9 deletions test/pubsub/lifesycle.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,22 @@ describe('pubsub base lifecycle', () => {
const error = new Error('new stream error')
sinon.stub(c0, 'newStream').throws(error)

await onConnectA(peerIdB, c0)
await handlerB({
protocol,
stream: c1.stream,
connection: {
remotePeer: peerIdA
}
})
try {
await onConnectA(peerIdB, c0)
await handlerB({
protocol,
stream: c1.stream,
connection: {
remotePeer: peerIdA
}
})
} catch (err) {
expect(err).to.exist()
expect(c0.newStream).to.have.property('callCount', 1)
return
}

expect(c0.newStream).to.have.property('callCount', 1)
throw new Error('should throw to send a message if no strea available')
})

it('should handle onDisconnect as expected', async () => {
Expand Down

0 comments on commit bc8ed8b

Please sign in to comment.