Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: async peer store #272

Merged
merged 2 commits into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ import { create } from 'libp2p-kad-dht'
/**
* @param {Libp2p} libp2p
*/
function addDHT(libp2p) {
async function addDHT(libp2p) {
const customDHT = create({
libp2p,
protocolPrefix: '/custom'
})
customDHT.start()
await customDHT.start()

return customDHT
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"it-filter": "^1.0.3",
"it-last": "^1.0.6",
"it-pair": "^1.0.0",
"libp2p": "^0.35.4",
"libp2p": "libp2p/js-libp2p#feat/async-peerstore",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"p-retry": "^4.2.0",
Expand Down
15 changes: 10 additions & 5 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ContentRouting {
* @param {import('../query/manager').QueryManager} params.queryManager
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../providers').Providers} params.providers
* @param {import('../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
* @param {boolean} params.lan
*/
constructor ({ peerId, network, peerRouting, queryManager, routingTable, providers, peerStore, lan }) {
Expand Down Expand Up @@ -137,10 +137,15 @@ class ContentRouting {

// yield values if we have some, also slice because maybe we got lucky and already have too many?
if (provs.length) {
const providers = provs.slice(0, toFind).map(peerId => ({
id: peerId,
multiaddrs: (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr)
}))
/** @type {{ id: PeerId, multiaddrs: Multiaddr[] }[]} */
const providers = []

for (const peerId of provs.slice(0, toFind)) {
providers.push({
id: peerId,
multiaddrs: ((await this._peerStore.addressBook.get(peerId)) || []).map(address => address.multiaddr)
})
}

yield peerResponseEvent({ from: this._peerId, messageType: MessageType.GET_PROVIDERS, providers })
yield providerEvent({ from: this._peerId, providers: providers })
Expand Down
14 changes: 7 additions & 7 deletions src/dual-kad-dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ class DualKadDHT extends EventEmitter {
/**
* Whether we are in client or server mode
*/
enableServerMode () {
this._wan.enableServerMode()
async enableServerMode () {
await this._wan.enableServerMode()
}

/**
* Whether we are in client or server mode
*/
enableClientMode () {
this._wan.enableClientMode()
async enableClientMode () {
await this._wan.enableClientMode()
}

/**
Expand Down Expand Up @@ -314,7 +314,7 @@ class DualKadDHT extends EventEmitter {
log('getPublicKey %p', peer)

// local check
const peerData = this._libp2p.peerStore.get(peer)
const peerData = await this._libp2p.peerStore.get(peer)

if (peerData && peerData.id.pubKey) {
log('getPublicKey: found local copy')
Expand All @@ -339,8 +339,8 @@ class DualKadDHT extends EventEmitter {

const peerId = new PeerId(peer.id, undefined, pk)
const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr)
this._libp2p.peerStore.addressBook.add(peerId, addrs)
this._libp2p.peerStore.keyBook.set(peerId, pk)
await this._libp2p.peerStore.addressBook.add(peerId, addrs)
await this._libp2p.peerStore.keyBook.set(peerId, pk)

return pk
}
Expand Down
16 changes: 9 additions & 7 deletions src/kad-dht.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,11 @@ class KadDHT extends EventEmitter {

// handle peers being discovered via other peer discovery mechanisms
this._topologyListener.on('peer', async (peerId) => {
const multiaddrs = await this._libp2p.peerStore.addressBook.get(peerId)

const peerData = {
id: peerId,
multiaddrs: (this._libp2p.peerStore.addressBook.get(peerId) || []).map((/** @type {{ multiaddr: Multiaddr }} */ addr) => addr.multiaddr)
multiaddrs: multiaddrs.map(addr => addr.multiaddr)
}

this.onPeerConnect(peerData).catch(err => {
Expand Down Expand Up @@ -332,19 +334,19 @@ class KadDHT extends EventEmitter {
/**
* Whether we are in client or server mode
*/
enableServerMode () {
async enableServerMode () {
this._log('enabling server mode')
this._clientMode = false
this._libp2p.handle(this._protocol, this._rpc.onIncomingStream.bind(this._rpc))
await this._libp2p.handle(this._protocol, this._rpc.onIncomingStream.bind(this._rpc))
}

/**
* Whether we are in client or server mode
*/
enableClientMode () {
async enableClientMode () {
this._log('enabling client mode')
this._clientMode = true
this._libp2p.unhandle(this._protocol)
await this._libp2p.unhandle(this._protocol)
}

/**
Expand All @@ -355,9 +357,9 @@ class KadDHT extends EventEmitter {

// Only respond to queries when not in client mode
if (this._clientMode) {
this.enableClientMode()
await this.enableClientMode()
} else {
this.enableServerMode()
await this.enableServerMode()
}

await Promise.all([
Expand Down
44 changes: 25 additions & 19 deletions src/peer-routing/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PeerRouting {
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
* @param {import('../network').Network} params.network
* @param {import('libp2p-interfaces/src/types').DhtValidators} params.validators
* @param {import('../query/manager').QueryManager} params.queryManager
Expand Down Expand Up @@ -52,11 +52,11 @@ class PeerRouting {

if (p) {
this._log('findPeerLocal found %p in routing table', peer)
peerData = this._peerStore.get(p)
peerData = await this._peerStore.get(p)
}

if (!peerData) {
peerData = this._peerStore.get(peer)
peerData = await this._peerStore.get(peer)
}

if (peerData) {
Expand Down Expand Up @@ -141,7 +141,7 @@ class PeerRouting {
const match = peers.find((p) => p.equals(id))

if (match) {
const peer = this._peerStore.get(id)
const peer = await this._peerStore.get(id)

if (peer) {
this._log('found in peerStore')
Expand Down Expand Up @@ -232,13 +232,15 @@ class PeerRouting {

this._log('found %d peers close to %b', peers.length, key)

yield * peers.peers.map(peer => finalPeerEvent({
from: this._peerId,
peer: {
id: peer,
multiaddrs: (this._peerStore.addressBook.get(peer) || []).map(addr => addr.multiaddr)
}
}))
for (const peer of peers.peers) {
yield finalPeerEvent({
from: this._peerId,
peer: {
id: peer,
multiaddrs: (await (this._peerStore.addressBook.get(peer)) || []).map(addr => addr.multiaddr)
}
})
}
}

/**
Expand Down Expand Up @@ -294,16 +296,20 @@ class PeerRouting {
async getCloserPeersOffline (key, closerThan) {
const id = await utils.convertBuffer(key)
const ids = this._routingTable.closestPeers(id)
const output = ids
.map((p) => {
const peer = this._peerStore.get(p)
const output = []

return {
id: p,
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : []
}
for (const peerId of ids) {
if (peerId.equals(closerThan)) {
continue
}

const peer = await this._peerStore.get(peerId)

output.push({
id: peerId,
multiaddrs: peer ? peer.addresses.map((address) => address.multiaddr) : []
})
.filter((closer) => !closer.id.equals(closerThan))
}

if (output.length) {
this._log('getCloserPeersOffline found %d peer(s) closer to %b than %p', output.length, key, closerThan)
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/handlers/add-provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class AddProviderHandler {
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../providers').Providers} params.providers
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
*/
constructor ({ peerId, providers, peerStore }) {
this._peerId = peerId
Expand Down Expand Up @@ -69,7 +69,7 @@ class AddProviderHandler {

if (!this._peerId.equals(pi.id)) {
// Add known address to peer store
this._peerStore.addressBook.add(pi.id, pi.multiaddrs)
await this._peerStore.addressBook.add(pi.id, pi.multiaddrs)
await this._providers.addProvider(cid, pi.id)
}
})
Expand Down
35 changes: 22 additions & 13 deletions src/rpc/handlers/get-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
* @typedef {import('../../types').PeerData} PeerData
*/

/**
Expand All @@ -24,7 +25,7 @@ class GetProvidersHandler {
* @param {PeerId} params.peerId
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('../../providers').Providers} params.providers
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
* @param {import('../../types').Addressable} params.addressable
* @param {boolean} [params.lan]
*/
Expand Down Expand Up @@ -58,8 +59,8 @@ class GetProvidersHandler {
this._peerRouting.getCloserPeersOffline(msg.key, peerId)
])

const providerPeers = this._getPeers(peers)
const closerPeers = this._getPeers(closer.map(({ id }) => id))
const providerPeers = await this._getPeers(peers)
const closerPeers = await this._getPeers(closer.map(({ id }) => id))
const response = new Message(msg.type, msg.key, msg.clusterLevel)

if (providerPeers.length > 0) {
Expand All @@ -77,22 +78,30 @@ class GetProvidersHandler {
/**
* @param {PeerId} peerId
*/
_getAddresses (peerId) {
return this._peerId.equals(peerId) ? this._addressable.multiaddrs : (this._peerStore.addressBook.get(peerId) || []).map(address => address.multiaddr)
async _getAddresses (peerId) {
return this._peerId.equals(peerId) ? this._addressable.multiaddrs : (await (this._peerStore.addressBook.get(peerId)) || []).map(address => address.multiaddr)
}

/**
* @param {PeerId[]} peerIds
* @returns
*/
_getPeers (peerIds) {
return peerIds
.map((peerId) => ({
async _getPeers (peerIds) {
/** @type {PeerData[]} */
const output = []
const addrFilter = this._lan ? removePublicAddresses : removePrivateAddresses

for (const peerId of peerIds) {
const peer = addrFilter({
id: peerId,
multiaddrs: this._getAddresses(peerId)
}))
.map(this._lan ? removePublicAddresses : removePrivateAddresses)
.filter(({ multiaddrs }) => multiaddrs.length)
multiaddrs: await this._getAddresses(peerId)
})

if (peer.multiaddrs.length) {
output.push(peer)
}
}

return output
}
}

Expand Down
15 changes: 8 additions & 7 deletions src/rpc/handlers/get-value.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const log = utils.logger('libp2p:kad-dht:rpc:handlers:get-value')
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../types').DHTMessageHandler} DHTMessageHandler
* @typedef {import('libp2p-interfaces/src/keys/types').PublicKey} PublicKey
*/

/**
Expand All @@ -22,7 +23,7 @@ class GetValueHandler {
/**
* @param {object} params
* @param {PeerId} params.peerId
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.records
*/
Expand Down Expand Up @@ -53,18 +54,18 @@ class GetValueHandler {
if (utils.isPublicKeyKey(key)) {
log('is public key')
const idFromKey = utils.fromPublicKeyKey(key)
let id
/** @type {PublicKey | undefined} */
let pubKey

if (this._peerId.equals(idFromKey)) {
id = this._peerId
pubKey = this._peerId.pubKey
} else {
const peerData = this._peerStore.get(idFromKey)
id = peerData && peerData.id
pubKey = await this._peerStore.keyBook.get(idFromKey)
}

if (id && id.pubKey) {
if (pubKey != null) {
log('returning found public key')
response.record = new Record(key, id.pubKey.bytes)
response.record = new Record(key, pubKey.bytes)
return response
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const { PutValueHandler } = require('./put-value')
* @param {object} params
* @param {import('peer-id')} params.peerId
* @param {import('../../providers').Providers} params.providers
* @param {import('../../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
* @param {import('../../types').Addressable} params.addressable
* @param {import('../../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.records
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RPC {
* @param {import('../routing-table').RoutingTable} params.routingTable
* @param {import('peer-id')} params.peerId
* @param {import('../providers').Providers} params.providers
* @param {import('../types').PeerStore} params.peerStore
* @param {import('libp2p/src/peer-store/types').PeerStore} params.peerStore
* @param {import('../types').Addressable} params.addressable
* @param {import('../peer-routing').PeerRouting} params.peerRouting
* @param {import('interface-datastore').Datastore} params.records
Expand Down
6 changes: 3 additions & 3 deletions src/topology-listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TopologyListener extends EventEmitter {
* Create a new network
*
* @param {object} params
* @param {import('./types').Registrar} params.registrar
* @param {import('libp2p/src/registrar')} params.registrar
* @param {string} params.protocol
* @param {boolean} params.lan
*/
Expand All @@ -28,7 +28,7 @@ class TopologyListener extends EventEmitter {
/**
* Start the network
*/
start () {
async start () {
if (this._running) {
return
}
Expand All @@ -46,7 +46,7 @@ class TopologyListener extends EventEmitter {
onDisconnect: () => {}
}
})
this._registrarId = this._registrar.register(topology)
this._registrarId = await this._registrar.register(topology)
}

/**
Expand Down
Loading