This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

refactor: API changes and switch to async await #55

Merged
merged 14 commits into from
Sep 16, 2019

Conversation

alanshaw
Member

@alanshaw alanshaw commented May 1, 2019

For implementation of this API, see libp2p/js-libp2p-mplex#94

New API

const muxer = new Muxer([options])

Create a new duplex stream that can be piped together with a connection in order to allow multiplexed communications.

e.g.

const Muxer = require('your-muxer-module')
const pipe = require('it-pipe')

// Create a duplex muxer
const muxer = new Muxer()

// Use the muxer in a pipeline
pipe(conn, muxer, conn) // conn is duplex connection to another peer

options is an optional Object that may have the following properties:

  • onStream - A function called when receiving a new stream from the remote. e.g.
    // Receive a new stream on the muxed connection
    const onStream = stream => {
      // Read from this stream and write back to it (echo server)
      pipe(
        stream,
        source => (async function * () {
          for await (const data of source) yield data
        })(),
        stream
      )
    }
    const muxer = new Muxer({ onStream })
    // ...
    Note: The onStream function can be passed in place of the options object. i.e.
    new Muxer(stream => { /* ... */ })
  • signal - An AbortSignal which can be used to abort the muxer, including all of its multiplexed connections. e.g.
    const controller = new AbortController()
    const muxer = new Muxer({ signal: controller.signal })
    
    pipe(conn, muxer, conn)
    
    controller.abort()
  • maxMsgSize - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB)
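
As a rough illustration of how an implementation might accept either an options object or a bare `onStream` function, here is a minimal sketch. `normalizeOptions` and `DEFAULT_MAX_MSG_SIZE` are hypothetical names, not part of this interface, and the "1MB" default is assumed here to mean 1024 * 1024 bytes.

```js
// Hypothetical helper, not part of the interface: illustrates the
// "function in place of the options object" shorthand described above.
const DEFAULT_MAX_MSG_SIZE = 1024 * 1024 // assumption: "1MB" read as 1 MiB

function normalizeOptions (options) {
  // A bare function is treated as the onStream handler
  if (typeof options === 'function') {
    options = { onStream: options }
  }
  // Caller-supplied values override the default
  return { maxMsgSize: DEFAULT_MAX_MSG_SIZE, ...options }
}
```

A constructor could call this once up front, so the rest of the muxer only ever deals with a plain object.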

muxer.onStream

Use this property as an alternative to passing onStream as an option to the Muxer constructor.

const muxer = new Muxer()
// ...later
muxer.onStream = stream => { /* ... */ }

const stream = muxer.newStream([options])

Initiate a new stream with the remote. Returns a duplex stream.

e.g.

// Create a new stream on the muxed connection
const stream = muxer.newStream()

// Use this new stream like any other duplex stream:
pipe([1, 2, 3], stream, consume)
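
`consume` is not defined in these examples. One way to read it is as a sink that drains its source. A minimal sketch under that assumption (the name and the returned array are illustrative, not part of the interface):

```js
// Hypothetical sink: drains a sync or async iterable and returns what it saw.
async function consume (source) {
  const chunks = []
  for await (const chunk of source) {
    chunks.push(chunk)
  }
  return chunks
}
```

Since `it-pipe` returns whatever the final sink returns, `await pipe(...)` with this sink would resolve once the source has ended.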

Notable changes

Buffer list

Data received by multiplexed streams is expected to be instances of BufferList, not Buffer. This is to avoid unnecessary (and slow) buffer copies in some cases.

No events

Muxer implementations are not event emitters, nor are any of the streams they receive or create.

No distinction between listener/dialer

Just create a new Muxer. If you don't want to listen then don't pass an onStream. If you don't want to dial then don't call newStream.

Does not automatically pipe itself to anything

Muxer instances are duplex streams. If you want to use them, you now do the piping yourself. This is in contrast to js-libp2p-mplex or pull-mplex where you pass a Connection which is automatically piped together with the muxer for you.

This is a simplification to give the user of the muxer more power over what happens when errors occur in the stream. i.e. the user can catch an error and re-establish the connection for example.
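
To make the "catch an error and re-establish the connection" idea concrete, here is a minimal sketch. It assumes the pipeline promise rejects when the underlying stream errors. `pipeWithRetry`, `dial`, `createMuxer` and `maxAttempts` are hypothetical names introduced for illustration, and `pipe` is any function with the same call shape as `it-pipe`.

```js
// Hypothetical helper: `dial` returns a fresh duplex connection,
// `createMuxer` returns a fresh muxer, `pipe` has it-pipe's call shape.
async function pipeWithRetry ({ dial, createMuxer, pipe, maxAttempts = 3 }) {
  let lastError
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const conn = await dial()
      const muxer = createMuxer()
      // The caller owns the piping, so the caller sees the failure
      return await pipe(conn, muxer, conn)
    } catch (err) {
      lastError = err // remember the failure and try a fresh connection
    }
  }
  throw lastError
}
```

A real caller would likely add a backoff between attempts and decide which errors are worth retrying. Neither is shown here.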

Lazy by default

It's not documented anywhere, but the lazy option seen in js-libp2p-mplex or pull-mplex is no longer applicable. I do not know if it was ever used (it is false by default)!

Setting lazy: true in js-libp2p-mplex/pull-mplex simply means that the NEW_STREAM message is sent to the other side automatically before you start to send your data, not immediately when the stream is created. FYI, NEW_STREAM instructs the other side to open a new multiplexed stream so it can start receiving the data you want to send.

So, like this (in pull-mplex/js-libp2p-mplex):

const s = muxer.newStream({ lazy: false })
// NEW_STREAM is now sent to the other side
// Later...
pull(pull.values([1, 2, 3]), s, pull.onEnd(() => console.log('done')))

// VS

const s = muxer.newStream({ lazy: true })
// Later...
pull(pull.values([1, 2, 3]), s, pull.onEnd(() => console.log('done')))
// NEW_STREAM is now sent to the other side automatically before 1

Same code with the new muxer API:

const s = muxer.newStream()
// Later...
await pipe([1, 2, 3], s, consume)
// NEW_STREAM is now sent to the other side automatically before 1
console.log('done')

That is to say that in this new API the streams are "lazy" by default and only send the NEW_STREAM message to the other side when the stream is hooked up to a pipeline and data is about to be sent. So, if you don't want to open the stream on the other side, don't pipe any data into the stream.
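
The lazy behaviour can be sketched roughly as follows. This is not the implementation in js-libp2p-mplex. `createLazyStream` and `send` are hypothetical names, only the sink half of the duplex is shown, and `MESSAGE` and `CLOSE` are placeholder message names (only `NEW_STREAM` comes from the text above).

```js
// Hypothetical sketch of a lazily-opened stream. `send` stands in for
// whatever writes a framed message to the underlying connection.
function createLazyStream (send) {
  return {
    // Nothing is sent when the stream is created. NEW_STREAM only goes out
    // once a pipeline calls sink, i.e. when data is about to be written.
    sink: async source => {
      send({ type: 'NEW_STREAM' })
      for await (const data of source) {
        send({ type: 'MESSAGE', data })
      }
      send({ type: 'CLOSE' }) // source ended, close our side
    }
  }
}
```

With this shape, creating a stream and never piping into it sends nothing to the other side.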

There's no real reason to not be lazy. There's no use case where we will open a new muxed stream and start to receive data without sending something first. i.e. you would never do this:

const s = muxer.newStream()
await pipe(s, consume)

...and you shouldn't do this anyway because it'll leak resources. The other side will "close" the stream (the source) when it has sent everything and the stream will be left half open because nothing has closed the sink side of the duplex.

If you REALLY needed to do this you'd do the following:

const s = muxer.newStream()
await pipe([], s, consume)

// OR

const s = muxer.newStream()
await pipe(s, consume)
// Not ideal because although this would close the sink side it'll cause a RESET
// message to be sent to the other side.
s.abort()

@ghost ghost assigned alanshaw May 1, 2019
@ghost ghost added the in progress label May 1, 2019
Contributor

@jacobheun jacobheun left a comment


A few things, but overall 👍 on the API. This looks great.

```js
new Mplex(stream => { /* ... */ })
```
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
Suggested change
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g.
* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of its multiplexed connections. e.g.


Since AbortSignal will now be supported, we should add tests for it here.

})
closeAndWait(streams[0])

streams.slice(1).forEach(async stream => {

Only 1 of these is actually going to send anything, right? As soon as the first iteration happens, the onStream function will get called and the 2nd expect.mark() will happen, ending the test immediately.

Since the streams will likely open lazily, shouldn't we start the infinite streams for all of them, then await the close on the first stream? If we don't actually open the other streams first we may miss closures happening.


@dirkmc dirkmc left a comment


This is so much better with async & iterables 👍

Member

@vasco-santos vasco-santos left a comment


Nice work @alanshaw ! I feel this is getting great

Alan Shaw and others added 7 commits July 25, 2019 17:00
Notable API changes:

Data received by multiplexed streams are expected to be instances of [`BufferList`](https://www.npmjs.com/package/bl) not [`Buffer`](https://www.npmjs.com/package/buffer). This is to avoid unnecessary (and slow) buffer copies in some cases.

Muxer implementations are not event emitters, nor are any of the streams they receive or create.

Just create a `new Muxer`. If you don't want to listen then don't pass an `onStream`. If you don't want to dial then don't call `newStream`.

Muxer instances are [duplex streams](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it). If you want to use them, you now do the piping yourself. This is in contrast to `js-libp2p-mplex` or `pull-mplex` where you pass a `Connection` which is automatically piped together with the muxer for you.

This is a simplification to give the user of the muxer more power over what happens when errors occur in the stream. i.e. the user can catch an error and re-establish the connection for example.

It's not documented anywhere, but the `lazy` option seen in `js-libp2p-mplex` or `pull-mplex` is no longer applicable. I do not know if it was ever used (it is `false` by default)!

Setting `lazy: true` in `js-libp2p-mplex`/`pull-mplex` simply means that the `NEW_STREAM` message is sent to the other side automatically before you start to send your data, not immediately when the stream is created. FYI, `NEW_STREAM` instructs the _other side_ to open a new multiplexed stream so it can start receiving the data you want to send.

So, like this (in `pull-mplex`/`js-libp2p-mplex`):

```js
const s = muxer.newStream({ lazy: false })
// NEW_STREAM is now sent to the other side
// Later...
pull(pull.values([1, 2, 3]), s, pull.onEnd(() => console.log('done')))

// VS

const s = muxer.newStream({ lazy: true })
// Later...
pull(pull.values([1, 2, 3]), s, pull.onEnd(() => console.log('done')))
// NEW_STREAM is now sent to the other side automatically before 1
```

Same code with the new muxer API:

```js
const s = muxer.newStream()
// Later...
await pipe([1, 2, 3], s, consume)
// NEW_STREAM is now sent to the other side automatically before 1
console.log('done')
```

That is to say that in this new API the streams are "lazy" by default and only send the `NEW_STREAM` message to the other side when the stream is hooked up to a pipeline and data is about to be sent. So, if you don't want to open the stream on the other side, don't pipe any data into the stream.

There's no real reason to _not_ be lazy. There's no use case where we will open a new muxed stream and start to receive data without sending something first. i.e. you would never do this:

```js
const s = muxer.newStream()
await pipe(s, consume)
```

...and you shouldn't do this anyway because it'll leak resources. The other side will "close" the stream (the source) when it has sent everything and the stream will be left half open because nothing has closed the sink side of the duplex.

If you REALLY needed to do this you'd do the following:

```js
const s = muxer.newStream()
await pipe([], s, consume)

// OR

const s = muxer.newStream()
await pipe(s, consume)
// Not ideal because although this would close the sink side it'll cause a RESET
// message to be sent to the other side.
s.abort()
```

License: MIT
Signed-off-by: Alan Shaw <[email protected]>
Co-Authored-By: alanshaw <[email protected]>
Co-Authored-By: Vasco Santos <[email protected]>
@jacobheun jacobheun force-pushed the refactor/async-iterators branch from d8cb660 to e5897e3 Compare July 25, 2019 15:00
@jacobheun
Contributor

@alanshaw fyi, I rebased this with the latest master. Since this is dependent on libp2p-tcp being done and that is still pending, I may migrate this PR to the combined interface repo if we aren't able to resolve libp2p/js-libp2p-tcp#102 and this in the next couple days.

Alan Shaw and others added 5 commits August 15, 2019 12:22
License: MIT
Signed-off-by: Alan Shaw <[email protected]>
License: MIT
Signed-off-by: Alan Shaw <[email protected]>
License: MIT
Signed-off-by: Alan Shaw <[email protected]>
License: MIT
Signed-off-by: Alan Shaw <[email protected]>
@alanshaw alanshaw requested a review from jacobheun September 10, 2019 18:44
Contributor

@jacobheun jacobheun left a comment


This looks good, we just need to finalize tcp and then bump the version here. I will submit a separate PR for the abort signal (#55 (comment)) to avoid adding more to this, as there is already a significant amount of changes.

Member

@vasco-santos vasco-santos left a comment


Awesome!!! 🎉

@jacobheun
Contributor

@jacobheun jacobheun merged commit dd837ba into master Sep 16, 2019
@jacobheun jacobheun deleted the refactor/async-iterators branch September 16, 2019 15:48
@alanshaw
Member Author

🎉🥳

4 participants