Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
chore: update interfaces to new version (#327)
Browse files Browse the repository at this point in the history
Removes dialer, lets connection manager manage connections.
  • Loading branch information
achingbrain authored May 3, 2022
1 parent 526e65e commit 388042b
Show file tree
Hide file tree
Showing 16 changed files with 140 additions and 186 deletions.
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
/** @type {import('aegir').PartialOptions} */
export default {
build: {
bundlesizeMax: '300KB'
bundlesizeMax: '160KB'
}
}
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,14 @@
"release": "aegir release"
},
"dependencies": {
"@libp2p/crypto": "^0.22.10",
"@libp2p/interfaces": "^1.3.21",
"@libp2p/logger": "^1.1.3",
"@libp2p/peer-id": "^1.1.9",
"@libp2p/record": "^1.0.3",
"@libp2p/crypto": "^0.22.11",
"@libp2p/interfaces": "^1.3.29",
"@libp2p/logger": "^1.1.4",
"@libp2p/peer-id": "^1.1.10",
"@libp2p/record": "^1.0.4",
"@libp2p/topology": "^1.1.7",
"@multiformats/multiaddr": "^10.1.5",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0",
"datastore-core": "^7.0.0",
"err-code": "^3.0.1",
Expand Down Expand Up @@ -168,9 +169,9 @@
"varint": "^6.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.1.21",
"@libp2p/interface-compliance-tests": "^1.1.31",
"@libp2p/peer-id-factory": "^1.0.9",
"@libp2p/peer-store": "^1.0.8",
"@libp2p/peer-store": "^1.0.11",
"@types/lodash.random": "^3.2.6",
"@types/lodash.range": "^3.2.6",
"@types/node": "^16.11.26",
Expand Down
1 change: 1 addition & 0 deletions src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ export class ContentFetching implements Initializable {
const msg = new Message(MESSAGE_TYPE.PUT_VALUE, key, 0)
msg.record = Libp2pRecord.deserialize(record)

this.log('send put to %p', event.peer.id)
for await (const putEvent of this.network.sendRequest(event.peer.id, msg, options)) {
events.push(putEvent)

Expand Down
3 changes: 1 addition & 2 deletions src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
}

async onPeerConnect (peerData: PeerInfo) {
this.log('peer %p connected', peerData.id)
this.log('peer %p connected with protocols %s', peerData.id, peerData.protocols)

if (this.lan) {
peerData = removePublicAddresses(peerData)
Expand Down Expand Up @@ -242,7 +242,6 @@ export class KadDHT extends EventEmitter<PeerDiscoveryEvents> implements DHT, In
} else {
this.log('enabling server mode')
this.clientMode = false

await this.components.getRegistrar().handle(this.protocol, this.rpc.onIncomingStream.bind(this.rpc))
}
}
Expand Down
48 changes: 35 additions & 13 deletions src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import type { Logger } from '@libp2p/logger'
import type { Duplex } from 'it-stream-types'
import type { PeerInfo } from '@libp2p/interfaces/peer-info'
import { Components, Initializable } from '@libp2p/interfaces/components'
import type { Stream } from '@libp2p/interfaces/connection'
import { abortableDuplex } from 'abortable-iterator'

export interface NetworkInit {
protocol: string
Expand Down Expand Up @@ -87,15 +89,17 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
}

this.log('sending %s to %p', msg.type, to)
yield dialingPeerEvent({ peer: to })
yield sendingQueryEvent({ to, type: msg.type })

try {
yield dialingPeerEvent({ peer: to })

const { stream } = await this.components.getDialer().dialProtocol(to, this.protocol, options)
let stream: Stream | undefined

yield sendingQueryEvent({ to, type: msg.type })
try {
const connection = await this.components.getConnectionManager().openConnection(to, options)
const streamData = await connection.newStream(this.protocol)
stream = streamData.stream

const response = await this._writeReadMessage(stream, msg.serialize())
const response = await this._writeReadMessage(stream, msg.serialize(), options)

yield peerResponseEvent({
from: to,
Expand All @@ -106,6 +110,10 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
})
} catch (err: any) {
yield queryErrorEvent({ from: to, error: err })
} finally {
if (stream != null) {
stream.close()
}
}
}

Expand All @@ -118,26 +126,36 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
}

this.log('sending %s to %p', msg.type, to)

yield dialingPeerEvent({ peer: to })

const { stream } = await this.components.getDialer().dialProtocol(to, this.protocol, options)

yield sendingQueryEvent({ to, type: msg.type })

let stream: Stream | undefined

try {
await this._writeMessage(stream, msg.serialize())
const connection = await this.components.getConnectionManager().openConnection(to, options)
const data = await connection.newStream(this.protocol)
stream = data.stream

await this._writeMessage(stream, msg.serialize(), options)

yield peerResponseEvent({ from: to, messageType: msg.type })
} catch (err: any) {
yield queryErrorEvent({ from: to, error: err })
} finally {
if (stream != null) {
stream.close()
}
}
}

/**
* Write a message to the given stream
*/
async _writeMessage (stream: Duplex<Uint8Array>, msg: Uint8Array) {
async _writeMessage (stream: Duplex<Uint8Array>, msg: Uint8Array, options: AbortOptions) {
if (options.signal != null) {
stream = abortableDuplex(stream, options.signal)
}

await pipe(
[msg],
lp.encode(),
Expand All @@ -151,7 +169,11 @@ export class Network extends EventEmitter<NetworkEvents> implements Startable, I
* If no response is received after the specified timeout
* this will error out.
*/
async _writeReadMessage (stream: Duplex<Uint8Array>, msg: Uint8Array) {
async _writeReadMessage (stream: Duplex<Uint8Array>, msg: Uint8Array, options: AbortOptions) {
if (options.signal != null) {
stream = abortableDuplex(stream, options.signal)
}

const res = await pipe(
[msg],
lp.encode(),
Expand Down
3 changes: 2 additions & 1 deletion src/routing-table/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ export class RoutingTable implements Startable, Initializable {
timeoutController = new TimeoutController(this.pingTimeout)

this.log('pinging old contact %p', oldContact.peer)
const { stream } = await this.components.getDialer().dialProtocol(oldContact.peer, PROTOCOL_DHT, {
const connection = await this.components.getConnectionManager().openConnection(oldContact.peer, {
signal: timeoutController.signal
})
const { stream } = await connection.newStream(PROTOCOL_DHT)
await stream.close()
responded++
} catch (err: any) {
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class RPC implements Initializable {
source => (async function * () {
for await (const msg of source) {
// handle the message
const desMessage = Message.deserialize(msg.slice())
const desMessage = Message.deserialize(msg)
self.log('incoming %s from %p', desMessage.type, peerId)
const res = await self.handleMessage(peerId, desMessage)

Expand Down
2 changes: 1 addition & 1 deletion src/topology-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class TopologyListener extends EventEmitter<TopologyListenerEvents> imple
// register protocol with topology
const topology = createTopology({
onConnect: (peerId) => {
this.log('observed peer %p with protocol %s', this.protocol, peerId)
this.log('observed peer %p with protocol %s', peerId, this.protocol)
this.dispatchEvent(new CustomEvent('peer', {
detail: peerId
}))
Expand Down
4 changes: 2 additions & 2 deletions test/generate-peers/generate-peers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import {
} from '../../src/utils.js'
import { Components } from '@libp2p/interfaces/components'
import { stubInterface } from 'ts-sinon'
import type { Dialer } from '@libp2p/interfaces/dialer'
import path from 'path'
import { fileURLToPath } from 'url'
import type { ConnectionManager } from '@libp2p/interfaces/connection-manager'

const dirname = path.dirname(fileURLToPath(import.meta.url))

Expand Down Expand Up @@ -54,7 +54,7 @@ describe.skip('generate peers', function () {

const components = new Components({
peerId: id,
dialer: stubInterface<Dialer>()
connectionManager: stubInterface<ConnectionManager>()
})
const table = new RoutingTable({
kBucketSize: 20,
Expand Down
14 changes: 9 additions & 5 deletions test/kad-dht.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,14 @@ describe('KadDHT', () => {
new Array(nDHTs).fill(0).map(async () => await tdht.spawn())
)

const connected: Array<Promise<void>> = []

for (let i = 0; i < dhts.length - 1; i++) {
await tdht.connect(dhts[i], dhts[(i + 1) % dhts.length])
connected.push(tdht.connect(dhts[i], dhts[(i + 1) % dhts.length]))
}

await Promise.all(connected)

const res = await all(filter(dhts[1].getClosestPeers(uint8ArrayFromString('foo')), event => event.name === 'FINAL_PEER'))

expect(res).to.not.be.empty()
Expand Down Expand Up @@ -785,16 +789,16 @@ describe('KadDHT', () => {
tdht.spawn(),
tdht.spawn()
])
const stub = sinon.stub(dhtA.components.getDialer(), 'dialProtocol').rejects(error)

await tdht.connect(dhtA, dhtB)

const stub = sinon.stub(dhtA.components.getConnectionManager(), 'openConnection').rejects(error)

const errors = await all(filter(dhtA.get(uint8ArrayFromString('/v/hello')), event => event.name === 'QUERY_ERROR'))

expect(errors).to.have.lengthOf(3)
expect(errors).to.have.lengthOf(2)
expect(errors).to.have.nested.property('[0].error.code', errCode)
expect(errors).to.have.nested.property('[1].error.code', errCode)
expect(errors).to.have.nested.property('[2].error.code', 'ERR_NOT_FOUND')
expect(errors).to.have.nested.property('[1].error.code', 'ERR_NOT_FOUND')

stub.restore()
})
Expand Down
79 changes: 37 additions & 42 deletions test/network.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { pair } from 'it-pair'
import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'
import pDefer from 'p-defer'
Expand All @@ -11,9 +10,9 @@ import { Message, MESSAGE_TYPE } from '../src/message/index.js'
import { TestDHT } from './utils/test-dht.js'
import { mockStream } from '@libp2p/interface-compliance-tests/mocks'
import type { DualKadDHT } from '../src/dual-kad-dht.js'
import type { Sink } from 'it-stream-types'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Connection } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Sink } from 'it-stream-types'

describe('Network', () => {
let dht: DualKadDHT
Expand All @@ -33,17 +32,6 @@ describe('Network', () => {
it('send and response echo', async () => {
const msg = new Message(MESSAGE_TYPE.PING, uint8ArrayFromString('hello'), 0)

// mock dial
dht.components.getDialer().dialProtocol = async (peer: PeerId | Multiaddr, protocols: string | string[]) => {
const protocol = Array.isArray(protocols) ? protocols[0] : protocols

// {source, sink} streams that are internally connected
return {
stream: mockStream(pair()),
protocol
}
}

const events = await all(dht.lan.network.sendRequest(dht.components.getPeerId(), msg))
const response = events
.filter(event => event.name === 'PEER_RESPONSE')
Expand All @@ -63,36 +51,43 @@ describe('Network', () => {
const msg = new Message(MESSAGE_TYPE.PING, uint8ArrayFromString('hello'), 0)

// mock it
dht.components.getDialer().dialProtocol = async (peer: PeerId | Multiaddr, protocols: string | string[]) => {
const protocol = Array.isArray(protocols) ? protocols[0] : protocols
const msg = new Message(MESSAGE_TYPE.FIND_NODE, uint8ArrayFromString('world'), 0)

const data = await pipe(
[msg.serialize()],
lp.encode(),
async (source) => await all(source)
)

const source = (function * () {
const array = data

yield * array
})()

const sink: Sink<Uint8Array> = async source => {
const res = await pipe(
source,
lp.decode(),
async (source) => await all(source)
)
expect(Message.deserialize(res[0]).type).to.eql(MESSAGE_TYPE.PING)
finish()
dht.components.getConnectionManager().openConnection = async (peer: PeerId) => {
// @ts-expect-error incomplete implementation
const connection: Connection = {
newStream: async (protocols: string | string[]) => {
const protocol = Array.isArray(protocols) ? protocols[0] : protocols
const msg = new Message(MESSAGE_TYPE.FIND_NODE, uint8ArrayFromString('world'), 0)

const data = await pipe(
[msg.serialize()],
lp.encode(),
async (source) => await all(source)
)

const source = (function * () {
const array = data

yield * array
})()

const sink: Sink<Uint8Array> = async source => {
const res = await pipe(
source,
lp.decode(),
async (source) => await all(source)
)
expect(Message.deserialize(res[0]).type).to.eql(MESSAGE_TYPE.PING)
finish()
}

return {
protocol,
stream: mockStream({ source, sink })
}
}
}

return {
protocol,
stream: mockStream({ source, sink })
}
return connection
}

const events = await all(dht.lan.network.sendRequest(dht.components.getPeerId(), msg))
Expand Down
Loading

0 comments on commit 388042b

Please sign in to comment.