Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
feat!: change stream muxer interface (#279)
Browse files Browse the repository at this point in the history
* feat!: change connection encryption interface to uint8arraylist

* feat!: change stream muxer interface

* chore: update types

* chore: linting

* chore: remove unused dep

* chore: types

Co-authored-by: achingbrain <[email protected]>
  • Loading branch information
mpetrunic and achingbrain authored Aug 7, 2022
1 parent 1fa580c commit 1ebe269
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 31 deletions.
10 changes: 5 additions & 5 deletions packages/interface-connection/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export interface StreamStat {
* It may be encrypted and multiplexed depending on the
* configuration of the nodes.
*/
export interface Stream<T extends Uint8Array | Uint8ArrayList = Uint8Array> extends Duplex<T> {
export interface Stream extends Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array> {
/**
* Close a stream for reading and writing
*/
Expand Down Expand Up @@ -120,23 +120,23 @@ export interface Stream<T extends Uint8Array | Uint8ArrayList = Uint8Array> exte
* multiplexed, depending on the configuration of the nodes
* between which the connection is made.
*/
export interface Connection<T extends Uint8Array | Uint8ArrayList = Uint8Array> {
export interface Connection {
id: string
stat: ConnectionStat
remoteAddr: Multiaddr
remotePeer: PeerId
tags: string[]
streams: Array<Stream<T>>
streams: Stream[]

newStream: (multicodecs: string | string[], options?: AbortOptions) => Promise<Stream>
addStream: (stream: Stream<T>) => void
addStream: (stream: Stream) => void
removeStream: (id: string) => void
close: () => Promise<void>
}

export const symbol = Symbol.for('@libp2p/connection')

export function isConnection (other: any): other is Connection<Uint8Array | Uint8ArrayList> {
export function isConnection (other: any): other is Connection {
return other != null && Boolean(other[symbol])
}

Expand Down
6 changes: 3 additions & 3 deletions packages/interface-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ export interface Stats {
push: (counter: string, inc: number) => void
}

export interface TrackStreamOptions <T extends Duplex<Uint8Array>> {
export interface TrackStreamOptions {
/**
* A duplex iterable stream
*/
stream: T
stream: Duplex<{ byteLength: number }, any>

/**
* The id of the remote peer that's connected
Expand Down Expand Up @@ -111,7 +111,7 @@ export interface StreamMetrics {
* When the `PeerId` is known, `Metrics.updatePlaceholder` should be called
* with the placeholder string returned from here, and the known `PeerId`.
*/
trackStream: <T extends Duplex<Uint8Array>> (data: TrackStreamOptions<T>) => T
trackStream: (data: TrackStreamOptions) => void
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-mocks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
"@libp2p/interface-transport": "^1.0.0",
"@libp2p/interfaces": "^3.0.0",
"@libp2p/logger": "^2.0.0",
"@libp2p/multistream-select": "^2.0.0",
"@libp2p/multistream-select": "^3.0.0",
"@libp2p/peer-id": "^1.1.12",
"@libp2p/peer-id-factory": "^1.0.12",
"@multiformats/multiaddr": "^10.2.0",
Expand Down
11 changes: 5 additions & 6 deletions packages/interface-mocks/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import type { Registrar } from '@libp2p/interface-registrar'
import { mockRegistrar } from './registrar.js'
import { Dialer, Listener } from '@libp2p/multistream-select'
import * as mss from '@libp2p/multistream-select'
import { logger } from '@libp2p/logger'
import * as STATUS from '@libp2p/interface-connection/status'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { StreamMuxer } from '@libp2p/interface-stream-muxer'
import type { Components } from '@libp2p/components'
import type { AbortOptions } from '@libp2p/interfaces'
import errCode from 'err-code'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:mock-connection')

Expand Down Expand Up @@ -79,8 +80,7 @@ class MockConnection implements Connection {

const id = `${Math.random()}`
const stream: Stream = this.muxer.newStream(id)
const mss = new Dialer(stream)
const result = await mss.select(protocols, options)
const result = await mss.select(stream, protocols, options)

const streamWithProtocol: Stream = {
...stream,
Expand Down Expand Up @@ -130,9 +130,8 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
const muxer = muxerFactory.createStreamMuxer({
direction: direction,
onIncomingStream: (muxedStream) => {
const mss = new Listener(muxedStream)
try {
mss.handle(registrar.getProtocols())
mss.handle(muxedStream, registrar.getProtocols())
.then(({ stream, protocol }) => {
log('%s: incoming stream opened on %s', direction, protocol)
muxedStream = { ...muxedStream, ...stream }
Expand Down Expand Up @@ -169,7 +168,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
return connection
}

export function mockStream (stream: Duplex<Uint8Array>): Stream {
export function mockStream (stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>): Stream {
return {
...stream,
close: () => {},
Expand Down
10 changes: 5 additions & 5 deletions packages/interface-mocks/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type StreamMessage = DataMessage | ResetMessage | CloseMessage | CreateMessage

class MuxedStream {
public id: string
public input: Pushable<Uint8Array>
public input: Pushable<Uint8ArrayList>
public stream: Stream
public type: 'initiator' | 'recipient'

Expand Down Expand Up @@ -299,7 +299,7 @@ class MockMuxer implements StreamMuxer {
try {
await pipe(
abortableSource(source, this.closeController.signal),
(source) => map(source, buf => uint8ArrayToString(buf)),
(source) => map(source, buf => uint8ArrayToString(buf.subarray())),
ndjson.parse,
async (source) => {
for await (const message of source) {
Expand Down Expand Up @@ -344,7 +344,7 @@ class MockMuxer implements StreamMuxer {
}

if (message.type === 'data') {
muxedStream.input.push(uint8ArrayFromString(message.chunk, 'base64'))
muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64')))
} else if (message.type === 'reset') {
this.log('-> reset stream %s %s', muxedStream.type, muxedStream.stream.id)
muxedStream.stream.reset()
Expand Down Expand Up @@ -422,10 +422,10 @@ class MockMuxerFactory implements StreamMuxerFactory {
void pipe(
mockMuxer.streamInput,
ndjson.stringify,
(source) => map(source, str => uint8ArrayFromString(str)),
(source) => map(source, str => new Uint8ArrayList(uint8ArrayFromString(str))),
async (source) => {
for await (const buf of source) {
mockMuxer.input.push(buf)
mockMuxer.input.push(buf.subarray())
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
"it-stream-types": "^1.0.4",
"p-defer": "^4.0.0",
"p-limit": "^4.0.0",
"uint8arraylist": "^2.1.2",
"uint8arrays": "^3.0.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import type { TestSetup } from '@libp2p/interface-compliance-tests'
import type { Stream } from '@libp2p/interface-connection'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { Source, Duplex } from 'it-stream-types'
import { Uint8ArrayList } from 'uint8arraylist'

async function drainAndClose (stream: Duplex<Uint8Array>) {
async function drainAndClose (stream: Duplex<any>) {
return await pipe([], stream, drain)
}

Expand Down Expand Up @@ -144,7 +145,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
})

it('Open a stream on one side, write, open a stream on the other side', async () => {
const toString = (source: Source<Uint8Array>) => map(source, (u) => uint8ArrayToString(u))
const toString = (source: Source<Uint8ArrayList>) => map(source, (u) => uint8ArrayToString(u.subarray()))
const p = duplexPair<Uint8Array>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
Expand All @@ -169,8 +170,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
const dialerConn = dialer.newStream()
const listenerConn = listener.newStream()

void pipe([uint8ArrayFromString('hey')], dialerConn)
void pipe([uint8ArrayFromString('hello')], listenerConn)
void pipe([new Uint8ArrayList(uint8ArrayFromString('hey'))], dialerConn)
void pipe([new Uint8ArrayList(uint8ArrayFromString('hello'))], listenerConn)

const [
dialerStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { TestSetup } from '@libp2p/interface-compliance-tests'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import pDefer from 'p-defer'
import all from 'it-all'
import { Uint8ArrayList } from 'uint8arraylist'

function randomBuffer () {
return uint8ArrayFromString(Math.random().toString())
Expand All @@ -18,7 +19,7 @@ function randomBuffer () {
const infiniteRandom = {
[Symbol.asyncIterator]: async function * () {
while (true) {
yield randomBuffer()
yield new Uint8ArrayList(randomBuffer())
await delay(50)
}
}
Expand Down Expand Up @@ -220,7 +221,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {

// Pause, and then send some data and close the first stream
await delay(50)
await pipe([randomBuffer()], stream, drain)
await pipe([new Uint8ArrayList(randomBuffer())], stream, drain)
closed = true

// Abort all the other streams later
Expand All @@ -232,7 +233,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
})

it('can close a stream for writing', async () => {
const deferred = pDefer<any>()
const deferred = pDefer<Error>()

const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
Expand All @@ -257,8 +258,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
expect(results).to.eql(data)

try {
await stream.sink([randomBuffer()])
} catch (err) {
await stream.sink([new Uint8ArrayList(randomBuffer())])
} catch (err: any) {
deferred.resolve(err)
}

Expand All @@ -283,7 +284,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const data = [randomBuffer(), randomBuffer()]
const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d))

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import drain from 'it-drain'
import all from 'it-all'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import { Uint8ArrayList } from 'uint8arraylist'

export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMuxer>, nStreams: number, nMsg: number, limit?: number) => {
const [dialerSocket, listenerSocket] = duplexPair<Uint8Array>()

const msg = uint8ArrayFromString('simple msg')
const msg = new Uint8ArrayList(uint8ArrayFromString('simple msg'))

const listener = await createMuxer({
direction: 'inbound',
Expand Down

0 comments on commit 1ebe269

Please sign in to comment.