Skip to content

Commit

Permalink
fix: make connection securing abortable
Browse files Browse the repository at this point in the history
To allow doing things like having a single `AbortSignal` that can
be used as a timeout for incoming connection establishment, allow
passing it as an option to the `ConnectionEncrypter` `secureOutbound`
and `secureInbound` methods.

Previously we'd wrap the stream to be secured in an `AbortableSource`,
however this has some [serious performance implications](ChainSafe/js-libp2p-gossipsub#361)
and it's generally better to just use a signal to cancel an ongoing
operation instead of racing every chunk that comes out of the source.
  • Loading branch information
achingbrain committed Aug 14, 2024
1 parent 0edbfe7 commit 786ba3c
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 61 deletions.
41 changes: 13 additions & 28 deletions packages/connection-encrypter-plaintext/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { UnexpectedPeerError, InvalidCryptoExchangeError, serviceCapabilities }
import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id'
import { pbStream } from 'it-protobuf-stream'
import { Exchange, KeyType } from './pb/proto.js'
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId } from '@libp2p/interface'
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, SecureConnectionOptions } from '@libp2p/interface'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand All @@ -34,22 +34,12 @@ export interface PlaintextComponents {
logger: ComponentLogger
}

export interface PlaintextInit {
/**
* The peer id exchange must complete within this many milliseconds
* (default: 1000)
*/
timeout?: number
}

class Plaintext implements ConnectionEncrypter {
public protocol: string = PROTOCOL
private readonly log: Logger
private readonly timeout: number

constructor (components: PlaintextComponents, init: PlaintextInit = {}) {
constructor (components: PlaintextComponents) {
this.log = components.logger.forComponent('libp2p:plaintext')
this.timeout = init.timeout ?? 1000
}

readonly [Symbol.toStringTag] = '@libp2p/plaintext'
Expand All @@ -58,19 +48,18 @@ class Plaintext implements ConnectionEncrypter {
'@libp2p/connection-encryption'
]

async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, remoteId)
async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, options)
}

async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, remoteId)
async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, options)
}

/**
* Encrypt connection
*/
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
const signal = AbortSignal.timeout(this.timeout)
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
const pb = pbStream(conn).pb(Exchange)

let type = KeyType.RSA
Expand All @@ -81,7 +70,7 @@ class Plaintext implements ConnectionEncrypter {
type = KeyType.Secp256k1
}

this.log('write pubkey exchange to peer %p', remoteId)
this.log('write pubkey exchange to peer %p', options?.remotePeer)

const [
, response
Expand All @@ -93,13 +82,9 @@ class Plaintext implements ConnectionEncrypter {
Type: type,
Data: localId.publicKey ?? new Uint8Array(0)
}
}, {
signal
}),
}, options),
// Get the Exchange message
pb.read({
signal
})
pb.read(options)
])

let peerId
Expand All @@ -126,7 +111,7 @@ class Plaintext implements ConnectionEncrypter {
throw new InvalidCryptoExchangeError('Remote did not provide its public key')
}

if (remoteId != null && !peerId.equals(remoteId)) {
if (options?.remotePeer != null && !peerId.equals(options?.remotePeer)) {
throw new UnexpectedPeerError()
}

Expand All @@ -139,6 +124,6 @@ class Plaintext implements ConnectionEncrypter {
}
}

export function plaintext (init?: PlaintextInit): (components: PlaintextComponents) => ConnectionEncrypter {
return (components) => new Plaintext(components, init)
export function plaintext (): (components: PlaintextComponents) => ConnectionEncrypter {
return (components) => new Plaintext(components)
}
8 changes: 6 additions & 2 deletions packages/connection-encrypter-plaintext/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ describe('plaintext', () => {

await Promise.all([
encrypter.secureInbound(remotePeer, inbound),
encrypter.secureOutbound(localPeer, outbound, wrongPeer)
encrypter.secureOutbound(localPeer, outbound, {
remotePeer: wrongPeer
})
]).then(() => expect.fail('should have failed'), (err) => {
expect(err).to.exist()
expect(err).to.have.property('code', UnexpectedPeerError.code)
Expand All @@ -68,7 +70,9 @@ describe('plaintext', () => {

await expect(Promise.all([
encrypter.secureInbound(localPeer, inbound),
encrypter.secureOutbound(remotePeer, outbound, localPeer)
encrypter.secureOutbound(remotePeer, outbound, {
remotePeer: localPeer
})
]))
.to.eventually.be.rejected.with.property('code', InvalidCryptoExchangeError.code)
})
Expand Down
12 changes: 2 additions & 10 deletions packages/connection-encrypter-tls/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ export interface TLSComponents {
logger: ComponentLogger
}

export interface TLSInit {
/**
* The peer id exchange must complete within this many milliseconds
* (default: 1000)
*/
timeout?: number
}

export function tls (init?: TLSInit): (components: TLSComponents) => ConnectionEncrypter {
return (components) => new TLS(components, init)
export function tls (): (components: TLSComponents) => ConnectionEncrypter {
return (components) => new TLS(components)
}
28 changes: 11 additions & 17 deletions packages/connection-encrypter-tls/src/tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,17 @@ import { TLSSocket, type TLSSocketOptions, connect } from 'node:tls'
import { CodeError, serviceCapabilities } from '@libp2p/interface'
import { generateCertificate, verifyPeerCertificate, itToStream, streamToIt } from './utils.js'
import { PROTOCOL } from './index.js'
import type { TLSComponents, TLSInit } from './index.js'
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, Logger } from '@libp2p/interface'
import type { TLSComponents } from './index.js'
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PeerId, Logger, SecureConnectionOptions } from '@libp2p/interface'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

export class TLS implements ConnectionEncrypter {
public protocol: string = PROTOCOL
private readonly log: Logger
private readonly timeout: number

constructor (components: TLSComponents, init: TLSInit = {}) {
constructor (components: TLSComponents) {
this.log = components.logger.forComponent('libp2p:tls')
this.timeout = init.timeout ?? 1000
}

readonly [Symbol.toStringTag] = '@libp2p/tls'
Expand All @@ -43,18 +41,18 @@ export class TLS implements ConnectionEncrypter {
'@libp2p/connection-encryption'
]

async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, true, remoteId)
async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, true, options)
}

async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, false, remoteId)
async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(localId, conn, false, options)
}

/**
* Encrypt connection
*/
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, isServer: boolean, remoteId?: PeerId): Promise<SecuredConnection<Stream>> {
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localId: PeerId, conn: Stream, isServer: boolean, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
const opts: TLSSocketOptions = {
...await generateCertificate(localId),
isServer,
Expand All @@ -81,14 +79,14 @@ export class TLS implements ConnectionEncrypter {
}

return new Promise((resolve, reject) => {
const abortTimeout = setTimeout(() => {
options?.signal?.addEventListener('abort', () => {
socket.destroy(new CodeError('Handshake timeout', 'ERR_HANDSHAKE_TIMEOUT'))
}, this.timeout)
})

const verifyRemote = (): void => {
const remote = socket.getPeerCertificate()

verifyPeerCertificate(remote.raw, remoteId, this.log)
verifyPeerCertificate(remote.raw, options?.remotePeer, this.log)
.then(remotePeer => {
this.log('remote certificate ok, remote peer %p', remotePeer)

Expand All @@ -103,14 +101,10 @@ export class TLS implements ConnectionEncrypter {
.catch((err: Error) => {
reject(err)
})
.finally(() => {
clearTimeout(abortTimeout)
})
}

socket.on('error', (err: Error) => {
reject(err)
clearTimeout(abortTimeout)
})
socket.once('secure', (evt) => {
this.log('verifying remote certificate')
Expand Down
8 changes: 6 additions & 2 deletions packages/connection-encrypter-tls/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ describe('tls', () => {

await Promise.all([
encrypter.secureInbound(remotePeer, inbound),
encrypter.secureOutbound(localPeer, outbound, wrongPeer)
encrypter.secureOutbound(localPeer, outbound, {
remotePeer: wrongPeer
})
]).then(() => expect.fail('should have failed'), (err) => {
expect(err).to.exist()
expect(err).to.have.property('code', UnexpectedPeerError.code)
Expand All @@ -68,7 +70,9 @@ describe('tls', () => {

await expect(Promise.all([
encrypter.secureInbound(localPeer, inbound),
encrypter.secureOutbound(remotePeer, outbound, localPeer)
encrypter.secureOutbound(remotePeer, outbound, {
remotePeer: localPeer
})
]))
.to.eventually.be.rejected.with.property('code', InvalidCryptoExchangeError.code)
})
Expand Down
14 changes: 12 additions & 2 deletions packages/interface/src/connection-encrypter/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import type { MultiaddrConnection } from '../connection/index.js'
import type { AbortOptions } from '../index.js'
import type { PeerId } from '../peer-id/index.js'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

/**
* If the remote PeerId is known and passed as an option, the securing operation
* will throw if the remote peer cannot prove it has the private key that
* corresponds to the public key the remote PeerId is derived from.
*/
export interface SecureConnectionOptions extends AbortOptions {
remotePeer?: PeerId
}

/**
* A libp2p connection encrypter module must be compliant to this interface
* to ensure all exchanged data between two peers is encrypted.
Expand All @@ -15,14 +25,14 @@ export interface ConnectionEncrypter<Extension = unknown> {
* pass it for extra verification, otherwise it will be determined during
* the handshake.
*/
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>

/**
* Decrypt incoming data. If the remote PeerId is known,
* pass it for extra verification, otherwise it will be determined during
* the handshake
*/
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, remotePeer?: PeerId): Promise<SecuredConnection<Stream, Extension>>
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (localPeer: PeerId, connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
}

export interface SecuredConnection<Stream = any, Extension = unknown> {
Expand Down

0 comments on commit 786ba3c

Please sign in to comment.