Skip to content

Commit

Permalink
feat: use libp2p 0.28.x (#217)
Browse files Browse the repository at this point in the history
Updates the bitswap module to use new features in `[email protected]`.

`peer-info` has been deprecated, `js-libp2p` has changed how it gathers its listening multiaddr, the `ConnectionManager`, `PeerStore` and `DHT` APIs have all be updated.

BREAKING CHANGE: Requires `[email protected]` or above

Co-authored-by: Jacob Heun <[email protected]>
  • Loading branch information
vasco-santos and jacobheun authored Jun 5, 2020
1 parent b5e05e2 commit c4ede4d
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 78 deletions.
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
"libp2p": "^0.28.0",
"libp2p-kad-dht": "^0.19.1",
"libp2p-mplex": "^0.9.2",
"libp2p-secio": "^0.12.1",
"libp2p-tcp": "^0.14.2",
Expand All @@ -65,9 +65,7 @@
"p-defer": "^3.0.0",
"p-event": "^4.1.0",
"p-wait-for": "^3.1.0",
"peer-book": "~0.9.0",
"peer-id": "^0.13.5",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
Expand All @@ -84,6 +82,7 @@
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"libp2p-interfaces": "^0.3.0",
"moving-average": "^1.0.0",
"multicodec": "^1.0.0",
"multihashing-async": "^0.8.0",
Expand Down
12 changes: 6 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const statsKeys = [
class Bitswap {
constructor (libp2p, blockstore, options) {
this._libp2p = libp2p
this._log = logger(this.peerInfo.id)
this._log = logger(this.peerId)

this._options = Object.assign({}, defaultOptions, options)

Expand All @@ -54,16 +54,16 @@ class Bitswap {
// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats)
this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats)

// handle message sending
this.wm = new WantManager(this.peerInfo.id, this.network, this._stats)
this.wm = new WantManager(this.peerId, this.network, this._stats)

this.notifications = new Notifications(this.peerInfo.id)
this.notifications = new Notifications(this.peerId)
}

get peerInfo () {
return this._libp2p.peerInfo
get peerId () {
return this._libp2p.peerId
}

// handle messages received through the network
Expand Down
35 changes: 22 additions & 13 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const Message = require('./types/message')
const CONSTANTS = require('./constants')
const logger = require('./utils').logger
Expand All @@ -13,7 +15,7 @@ const BITSWAP120 = '/ipfs/bitswap/1.2.0'

class Network {
constructor (libp2p, bitswap, options, stats) {
this._log = logger(libp2p.peerInfo.id, 'network')
this._log = logger(libp2p.peerId, 'network')
options = options || {}
this.libp2p = libp2p
this.bitswap = bitswap
Expand All @@ -37,14 +39,21 @@ class Network {
this._running = true
this.libp2p.handle(this.protocols, this._onConnection)

this.libp2p.on('peer:connect', this._onPeerConnect)
this.libp2p.on('peer:disconnect', this._onPeerDisconnect)
// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: this.protocols,
handlers: {
onConnect: this._onPeerConnect,
onDisconnect: this._onPeerDisconnect
}
})
this._registrarId = this.libp2p.registrar.register(topology)

// All existing connections are like new ones for us
for (const peer of this.libp2p.peerStore.peers.values()) {
if (this.libp2p.registrar.getConnection(peer)) {
this._onPeerConnect(peer)
}
const conn = this.libp2p.connectionManager.get(peer.id)

conn && this._onPeerConnect(conn)
}
}

Expand All @@ -54,8 +63,8 @@ class Network {
// Unhandle both, libp2p doesn't care if it's not already handled
this.libp2p.unhandle(this.protocols)

this.libp2p.removeListener('peer:connect', this._onPeerConnect)
this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect)
// unregister protocol and handlers
this.libp2p.registrar.unregister(this._registrarId)
}

/**
Expand Down Expand Up @@ -92,12 +101,12 @@ class Network {
}
}

_onPeerConnect (peerInfo) {
this.bitswap._onPeerConnected(peerInfo.id)
_onPeerConnect (peerId) {
this.bitswap._onPeerConnected(peerId)
}

_onPeerDisconnect (peerInfo) {
this.bitswap._onPeerDisconnected(peerInfo.id)
_onPeerDisconnect (peerId) {
this.bitswap._onPeerDisconnected(peerId)
}

/**
Expand Down Expand Up @@ -181,7 +190,7 @@ class Network {
/**
* Connects to another peer
*
* @param {PeerInfo|PeerId|Multiaddr} peer
* @param {PeerId|Multiaddr} peer
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Connection>}
Expand Down
5 changes: 3 additions & 2 deletions test/bitswap-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ describe('bitswap stats', () => {
bs2 = bitswaps[1]
bs2.start()

await libp2pNodes[0].dial(libp2pNodes[1].peerInfo)
const ma = `${libp2pNodes[1].multiaddrs[0]}/p2p/${libp2pNodes[1].peerId.toB58String()}`
await libp2pNodes[0].dial(ma)

block = await makeBlock()

Expand Down Expand Up @@ -212,7 +213,7 @@ describe('bitswap stats', () => {
})

it('has peer stats', async () => {
const peerStats = bs2.stat().forPeer(libp2pNodes[0].peerInfo.id)
const peerStats = bs2.stat().forPeer(libp2pNodes[0].peerId)
expect(peerStats).to.exist()

const stats = await pEvent(peerStats, 'update')
Expand Down
28 changes: 21 additions & 7 deletions test/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
'use strict'

const { expect } = require('aegir/utils/chai')
const delay = require('delay')
const PeerId = require('peer-id')
const sinon = require('sinon')
const pWaitFor = require('p-wait-for')

const Bitswap = require('../src')

Expand Down Expand Up @@ -38,9 +38,12 @@ describe('bitswap without DHT', function () {
])

// connect 0 -> 1 && 1 -> 2
const ma1 = `${nodes[1].libp2pNode.multiaddrs[0]}/p2p/${nodes[1].libp2pNode.peerId.toB58String()}`
const ma2 = `${nodes[2].libp2pNode.multiaddrs[0]}/p2p/${nodes[2].libp2pNode.peerId.toB58String()}`

await Promise.all([
nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo),
nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo)
nodes[0].libp2pNode.dial(ma1),
nodes[1].libp2pNode.dial(ma2)
])
})

Expand Down Expand Up @@ -132,9 +135,19 @@ describe('bitswap with DHT', function () {
])

// connect 0 -> 1 && 1 -> 2
const ma1 = `${nodes[1].libp2pNode.multiaddrs[0]}/p2p/${nodes[1].libp2pNode.peerId.toB58String()}`
const ma2 = `${nodes[2].libp2pNode.multiaddrs[0]}/p2p/${nodes[2].libp2pNode.peerId.toB58String()}`

await Promise.all([
nodes[0].libp2pNode.dial(ma1),
nodes[1].libp2pNode.dial(ma2)
])

// await dht routing table are updated
await Promise.all([
nodes[0].libp2pNode.dial(nodes[1].libp2pNode.peerInfo),
nodes[1].libp2pNode.dial(nodes[2].libp2pNode.peerInfo)
pWaitFor(() => nodes[0].libp2pNode._dht.routingTable.size >= 1),
pWaitFor(() => nodes[1].libp2pNode._dht.routingTable.size >= 2),
pWaitFor(() => nodes[2].libp2pNode._dht.routingTable.size >= 1)
])
})

Expand All @@ -148,10 +161,11 @@ describe('bitswap with DHT', function () {

it('put a block in 2, get it in 0', async () => {
const block = await makeBlock()
const provideSpy = sinon.spy(nodes[2].libp2pNode._dht, 'provide')
await nodes[2].bitswap.put(block)

// Give put time to process
await delay(100)
// wait for the DHT to finish providing
await provideSpy.returnValues[0]

const blockRetrieved = await nodes[0].bitswap.get(block.cid)
expect(block.data).to.eql(blockRetrieved.data)
Expand Down
29 changes: 17 additions & 12 deletions test/network/network.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('network', () => {

it('connectTo fail', async () => {
try {
await networkA.connectTo(p2pB.peerInfo.id)
await networkA.connectTo(p2pB.peerId)
assert.fail()
} catch (err) {
expect(err).to.exist()
Expand All @@ -87,24 +87,26 @@ describe('network', () => {
var counter = 0

bitswapMockA._onPeerConnected = (peerId) => {
expect(peerId.toB58String()).to.equal(p2pB.peerInfo.id.toB58String())
expect(peerId.toB58String()).to.equal(p2pB.peerId.toB58String())
counter++
}

bitswapMockB._onPeerConnected = (peerId) => {
expect(peerId.toB58String()).to.equal(p2pA.peerInfo.id.toB58String())
expect(peerId.toB58String()).to.equal(p2pA.peerId.toB58String())
counter++
}

await p2pA.dial(p2pB.peerInfo)
const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}`
await p2pA.dial(ma)

await pWaitFor(() => counter >= 2)
bitswapMockA._onPeerConnected = () => {}
bitswapMockB._onPeerConnected = () => {}
})

it('connectTo success', async () => {
await networkA.connectTo(p2pB.peerInfo)
const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}`
await networkA.connectTo(ma)
})

const versions = [{
Expand Down Expand Up @@ -134,7 +136,8 @@ describe('network', () => {

bitswapMockB._receiveError = (err) => deferred.reject(err)

const { stream } = await p2pA.dialProtocol(p2pB.peerInfo, '/ipfs/bitswap/' + version.num)
const ma = `${p2pB.multiaddrs[0]}/p2p/${p2pB.peerId.toB58String()}`
const { stream } = await p2pA.dialProtocol(ma, '/ipfs/bitswap/' + version.num)
await pipe(
[version.serialize(msg)],
lp.encode(),
Expand Down Expand Up @@ -165,11 +168,12 @@ describe('network', () => {

bitswapMockB._receiveError = deferred.reject

await networkA.sendMessage(p2pB.peerInfo.id, msg)
await networkA.sendMessage(p2pB.peerId, msg)
})

it('dial to peer on Bitswap 1.0.0', async () => {
const { protocol } = await p2pA.dialProtocol(p2pC.peerInfo, ['/ipfs/bitswap/1.1.0', '/ipfs/bitswap/1.0.0'])
const ma = `${p2pC.multiaddrs[0]}/p2p/${p2pC.peerId.toB58String()}`
const { protocol } = await p2pA.dialProtocol(ma, ['/ipfs/bitswap/1.1.0', '/ipfs/bitswap/1.0.0'])

expect(protocol).to.equal('/ipfs/bitswap/1.0.0')
})
Expand All @@ -194,7 +198,7 @@ describe('network', () => {

bitswapMockC._receiveError = deferred.reject

await networkA.sendMessage(p2pC.peerInfo.id, msg)
await networkA.sendMessage(p2pC.peerId, msg)
await deferred.promise
})

Expand All @@ -208,16 +212,17 @@ describe('network', () => {
networkA.start()
networkB.start()

// FIXME: have to already be connected as sendMessage only accepts a peer id, not a PeerInfo
await p2pA.dial(p2pB.peerInfo)
// In a real network scenario, peers will be discovered and their addresses
// will be added to the addressBook before bitswap kicks in
p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs)

const deferred = pDefer()

bitswapMockB._receiveMessage = () => {
deferred.resolve()
}

await networkA.sendMessage(p2pB.peerInfo.id, new Message(true))
await networkA.sendMessage(p2pB.peerId, new Message(true))

return deferred
})
Expand Down
2 changes: 1 addition & 1 deletion test/utils/connect-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const without = require('lodash.without')
module.exports = async (nodes) => {
for (const node of nodes) {
for (const otherNode of without(nodes, node)) {
await node.libp2pNode.dial(otherNode.bitswap.peerInfo)
await node.libp2pNode.dial(otherNode.bitswap.peerId)
}
}
}
13 changes: 8 additions & 5 deletions test/utils/create-libp2p-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const libp2p = require('libp2p')
const KadDHT = require('libp2p-kad-dht')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')

const defaultsDeep = require('@nodeutils/defaults-deep')

class Node extends libp2p {
Expand Down Expand Up @@ -38,10 +38,13 @@ class Node extends libp2p {

async function createLibp2pNode (options = {}) {
const id = await PeerId.create({ bits: 512 })
const peerInfo = new PeerInfo(id)
peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
options.peerInfo = peerInfo
const node = new Node(options)
const node = new Node({
peerId: id,
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0']
},
...options
})
await node.start()

return node
Expand Down
Loading

0 comments on commit c4ede4d

Please sign in to comment.