From a00f1181acb439920f61fb398c49d1225c4cd1ba Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 1 Jun 2023 18:12:42 +0300 Subject: [PATCH] feat(transport): Add WebTransport for WASM environments Use `web-sys::WebTransport` and provide a `Transport`. --- Cargo.lock | 70 +++++- Cargo.toml | 2 + libp2p/Cargo.toml | 5 + libp2p/src/lib.rs | 5 + transports/websys-webtransport/CHANGELOG.md | 5 + transports/websys-webtransport/Cargo.toml | 48 +++++ .../src/cached_js_promise.rs | 48 +++++ .../websys-webtransport/src/connection.rs | 184 ++++++++++++++++ transports/websys-webtransport/src/error.rs | 41 ++++ transports/websys-webtransport/src/lib.rs | 14 ++ transports/websys-webtransport/src/stream.rs | 190 ++++++++++++++++ .../websys-webtransport/src/transport.rs | 99 +++++++++ transports/websys-webtransport/src/utils.rs | 202 ++++++++++++++++++ 13 files changed, 909 insertions(+), 4 deletions(-) create mode 100644 transports/websys-webtransport/CHANGELOG.md create mode 100644 transports/websys-webtransport/Cargo.toml create mode 100644 transports/websys-webtransport/src/cached_js_promise.rs create mode 100644 transports/websys-webtransport/src/connection.rs create mode 100644 transports/websys-webtransport/src/error.rs create mode 100644 transports/websys-webtransport/src/lib.rs create mode 100644 transports/websys-webtransport/src/stream.rs create mode 100644 transports/websys-webtransport/src/transport.rs create mode 100644 transports/websys-webtransport/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index c7e766309e1e..e1a977c5539d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -880,6 +880,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "const-oid" version = "0.9.2" @@ -2415,6 +2425,7 @@ dependencies = [ "libp2p-wasm-ext", "libp2p-webrtc", "libp2p-websocket", + "libp2p-websys-webtransport", "libp2p-yamux", "multiaddr", "pin-project", @@ -3196,6 +3207,26 @@ dependencies = [ "webpki-roots 0.23.0", ] +[[package]] +name = "libp2p-websys-webtransport" +version = "0.43.0" +dependencies = [ + "futures", + "getrandom 0.2.9", + "js-sys", + "libp2p-core", + "libp2p-identity", + "libp2p-noise", + "log", + "multibase", + "send_wrapper 0.6.0", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test", + "web-sys", +] + [[package]] name = "libp2p-yamux" version = "0.44.0" @@ -3406,8 +3437,6 @@ dependencies = [ [[package]] name = "multiaddr" version = "0.17.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b36f567c7099511fa8612bbbb52dda2419ce0bdbacf31714e3a5ffdb766d3bd" dependencies = [ "arrayref", "byteorder", @@ -4542,6 +4571,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -4625,6 +4660,9 @@ name = "send_wrapper" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" +dependencies = [ + "futures-core", +] [[package]] name = "serde" @@ -5502,11 +5540,35 @@ version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +[[package]] +name = "wasm-bindgen-test" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e636f3a428ff62b3742ebc3c70e254dfe12b8c2b469d688ea59cdd4abcf502" +dependencies = [ + "console_error_panic_hook", + "js-sys", + "scoped-tls", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test-macro", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f18c1fad2f7c4958e7bcce014fa212f59a65d5e3721d0f77e6c0b27ede936ba3" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "web-sys" -version = "0.3.60" +version = "0.3.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 8400238368de..06d7e9917364 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "transports/wasm-ext", "transports/webrtc", "transports/websocket", + "transports/websys-webtransport", ] resolver = "2" @@ -94,6 +95,7 @@ libp2p-uds = { version = "0.39.0", path = "transports/uds" } libp2p-wasm-ext = { version = "0.40.0", path = "transports/wasm-ext" } libp2p-webrtc = { version = "0.5.0-alpha", path = "transports/webrtc" } libp2p-websocket = { version = "0.42.0", path = "transports/websocket" } +libp2p-websys-webtransport = { version = "0.43.0", path = "transports/websys-webtransport" } libp2p-yamux = { version = "0.44.0", path = "muxers/yamux" } multistream-select = { version = "0.13.0", path = "misc/multistream-select" } quick-protobuf-codec = { version = "0.2.0", path = "misc/quick-protobuf-codec" } diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 7cedac929c6c..17422a9047a7 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -46,6 +46,7 @@ full = [ "wasm-ext-websocket", "webrtc", "websocket", + "websys-webtransport", "yamux", ] @@ -83,6 +84,7 @@ wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] webrtc = ["dep:libp2p-webrtc", "libp2p-webrtc?/pem"] websocket = ["dep:libp2p-websocket"] +websys-webtransport = ["dep:libp2p-websys-webtransport"] yamux = ["dep:libp2p-yamux"] [dependencies] @@ -128,6 +130,9 @@ libp2p-uds = { workspace = true, optional = true } libp2p-webrtc = { workspace = true, optional = true } libp2p-websocket = { workspace = true, optional = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +libp2p-websys-webtransport = { workspace = true, optional = true } + [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } async-trait = "0.1" diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 2a359509dc12..33697517853a 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -144,6 +144,11 @@ pub mod webrtc { #[cfg(not(target_arch = "wasm32"))] #[doc(inline)] pub use libp2p_websocket as websocket; +#[cfg(feature = "websys-webtransport")] +#[cfg(target_arch = "wasm32")] +#[cfg_attr(docsrs, doc(cfg(feature = "websys-webtransport")))] +#[doc(inline)] +pub use libp2p_websys_webtransport as websys_webtransport; #[cfg(feature = "yamux")] #[doc(inline)] pub use libp2p_yamux as yamux; diff --git a/transports/websys-webtransport/CHANGELOG.md b/transports/websys-webtransport/CHANGELOG.md new file mode 100644 index 000000000000..8b1ebc98140e --- /dev/null +++ b/transports/websys-webtransport/CHANGELOG.md @@ -0,0 +1,5 @@ +## 0.43.0 - unreleased + +* Initial implementation of WebTranport trasnport that uses web-sys. [PR 4015] + +[PR 4015]: https://github.com/libp2p/rust-libp2p/pull/4015 diff --git a/transports/websys-webtransport/Cargo.toml b/transports/websys-webtransport/Cargo.toml new file mode 100644 index 000000000000..5f43ea172d25 --- /dev/null +++ b/transports/websys-webtransport/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "libp2p-websys-webtransport" +edition = "2021" +rust-version = { workspace = true } +description = "WebTransport for libp2p under WASM environment" +version = "0.43.0" +authors = [ + "Yiannis Marangos ", + "oblique ", +] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +futures = "0.3.28" +js-sys = "0.3.63" +libp2p-core = { workspace = true } +libp2p-identity = { workspace = true } +libp2p-noise = { workspace = true } +log = "0.4.18" +send_wrapper = { version = "0.6.0", features = ["futures"] } +thiserror = "1.0.4" +wasm-bindgen = "0.2.86" +wasm-bindgen-futures = "0.4.36" +web-sys = { version = "0.3.63", features = [ + "ReadableStreamDefaultReader", + "WebTransport", + "WebTransportBidirectionalStream", + "WebTransportHash", + "WebTransportOptions", + "WebTransportReceiveStream", + "WebTransportSendStream", + "WritableStreamDefaultWriter", +] } + +[dev-dependencies] +getrandom = { version = "0.2.9", features = ["js"] } +multibase = "0.9.1" +wasm-bindgen-test = "0.3.36" + +# Passing arguments to the docsrs builder in order to properly document cfg's. +# More information: https://docs.rs/about/builds#cross-compiling +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs", "--cfg", "web_sys_unstable_apis"] +rustc-args = ["--cfg", "docsrs", "--cfg", "web_sys_unstable_apis"] diff --git a/transports/websys-webtransport/src/cached_js_promise.rs b/transports/websys-webtransport/src/cached_js_promise.rs new file mode 100644 index 000000000000..137e830ec2e9 --- /dev/null +++ b/transports/websys-webtransport/src/cached_js_promise.rs @@ -0,0 +1,48 @@ +use futures::FutureExt; +use js_sys::Promise; +use send_wrapper::SendWrapper; +use std::task::{ready, Context, Poll}; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; + +pub struct CachedJsPromise { + cached: Option>, +} + +impl CachedJsPromise { + pub fn new() -> Self { + CachedJsPromise { cached: None } + } + + pub fn maybe_init_and_poll( + &mut self, + cx: &mut Context, + init: F, + ) -> Poll> + where + F: FnOnce() -> Promise, + { + if self.cached.is_none() { + self.cached = Some(SendWrapper::new(JsFuture::from(init()))); + } + + self.poll(cx) + } + + pub fn poll(&mut self, cx: &mut Context) -> Poll> { + let val = ready!(self + .cached + .as_mut() + .expect("CachedJsPromise not initialized") + .poll_unpin(cx)); + + // Future finished, drop it + self.cached.take(); + + Poll::Ready(val) + } + + pub fn is_active(&self) -> bool { + self.cached.is_some() + } +} diff --git a/transports/websys-webtransport/src/connection.rs b/transports/websys-webtransport/src/connection.rs new file mode 100644 index 000000000000..6db8e5c63dec --- /dev/null +++ b/transports/websys-webtransport/src/connection.rs @@ -0,0 +1,184 @@ +use js_sys::{Array, Uint8Array}; +use libp2p_core::multihash::Multihash; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent, StreamMuxerExt}; +use libp2p_core::{OutboundUpgrade, UpgradeInfo}; +use libp2p_identity::{Keypair, PeerId}; +use send_wrapper::SendWrapper; +use std::collections::HashSet; +use std::future::poll_fn; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + ReadableStreamDefaultReader, WebTransport, WebTransportBidirectionalStream, WebTransportHash, + WebTransportOptions, +}; + +use crate::cached_js_promise::CachedJsPromise; +use crate::utils::{parse_reader_response, to_js_type, WebTransportEndpoint}; +use crate::{Error, Stream}; + +pub struct Connection { + session: SendWrapper, + create_stream_promise: CachedJsPromise, + incoming_stream_promise: CachedJsPromise, + incoming_streams_reader: SendWrapper, + closed: bool, +} + +impl Connection { + pub(crate) fn new(endpoint: &WebTransportEndpoint) -> Result { + let url = format!( + "https://{}:{}/.well-known/libp2p-webtransport?type=noise", + endpoint.host, endpoint.port + ); + + let session = if endpoint.certhashes.is_empty() { + WebTransport::new(&url).map_err(Error::from_js_value)? + } else { + let opts = build_webtransport_opts(&endpoint.certhashes); + WebTransport::new_with_options(&url, &opts).map_err(Error::from_js_value)? + }; + + let stream = session.incoming_bidirectional_streams(); + let incoming_streams_reader = + to_js_type::(stream.get_reader())?; + + Ok(Connection { + session: SendWrapper::new(session), + create_stream_promise: CachedJsPromise::new(), + incoming_stream_promise: CachedJsPromise::new(), + incoming_streams_reader: SendWrapper::new(incoming_streams_reader), + closed: false, + }) + } + + pub(crate) async fn authenticate( + &mut self, + keypair: &Keypair, + certhashes: HashSet, + ) -> Result { + self.ready().await?; + let stream = self.create_stream().await?; + + let noise = libp2p_noise::Config::new(keypair)?.with_webtransport_certhashes(certhashes); + + // We do not use `upgrade::apply_outbound` function because it uses + // `multistream_select` protocol, which is not used by WebTransport spec. + let info = noise.protocol_info().next().unwrap(); + let (peer_id, _io) = noise.upgrade_outbound(stream, info).await?; + + Ok(peer_id) + } + + async fn ready(&mut self) -> Result<(), Error> { + let fut = SendWrapper::new(JsFuture::from(self.session.ready())); + fut.await.map_err(Error::from_js_value)?; + Ok(()) + } + + async fn create_stream(&mut self) -> Result { + poll_fn(|cx| self.poll_outbound_unpin(cx)).await + } + + fn poll_create_bidirectional_stream( + &mut self, + cx: &mut Context, + ) -> Poll> { + // Create bidirectional stream + let val = ready!(self + .create_stream_promise + .maybe_init_and_poll(cx, || self.session.create_bidirectional_stream())) + .map_err(Error::from_js_value)?; + + let bidi_stream = to_js_type::(val)?; + let stream = Stream::new(bidi_stream)?; + + Poll::Ready(Ok(stream)) + } + + fn poll_incoming_bidirectional_streams( + &mut self, + cx: &mut Context, + ) -> Poll> { + // Read the next incoming stream from the JS channel + let val = ready!(self + .incoming_stream_promise + .maybe_init_and_poll(cx, || self.incoming_streams_reader.read())) + .map_err(Error::from_js_value)?; + + let val = parse_reader_response(&val) + .map_err(Error::from_js_value)? + .ok_or_else(|| Error::JsError("incoming_bidirectional_streams closed".to_string()))?; + + let bidi_stream = to_js_type::(val)?; + let stream = Stream::new(bidi_stream)?; + + Poll::Ready(Ok(stream)) + } + + fn close_session(&mut self) { + if !self.closed { + self.session.close(); + self.closed = true; + } + } +} + +impl StreamMuxer for Connection { + type Substream = Stream; + type Error = Error; + + fn poll_inbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.poll_incoming_bidirectional_streams(cx) + } + + fn poll_outbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.poll_create_bidirectional_stream(cx) + } + + fn poll_close( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + self.close_session(); + Poll::Ready(Ok(())) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.close_session(); + } +} + +fn build_webtransport_opts(certhashes: &HashSet) -> WebTransportOptions { + let mut opts = WebTransportOptions::new(); + let hashes = Array::new(); + + for hash in certhashes { + let digest = Uint8Array::from(hash.digest()); + + let mut jshash = WebTransportHash::new(); + jshash.algorithm("sha-256").value(&digest); + + hashes.push(&jshash); + } + + opts.server_certificate_hashes(&hashes); + + opts +} diff --git a/transports/websys-webtransport/src/error.rs b/transports/websys-webtransport/src/error.rs new file mode 100644 index 000000000000..a6898337aa0b --- /dev/null +++ b/transports/websys-webtransport/src/error.rs @@ -0,0 +1,41 @@ +use libp2p_core::transport::TransportError; +use wasm_bindgen::{JsCast, JsValue}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Listening on WebTransport is not possible from within a browser")] + ListeningNotPossible, + + #[error("Invalid multiaddr: {0}")] + InvalidMultiaddr(&'static str), + + #[error("Noise authentication failed")] + Noise(#[from] libp2p_noise::Error), + + #[error("JavaScript error: {0}")] + JsError(String), + + #[error("JavaScript typecasting failed")] + JsCastFailed, +} + +impl Error { + pub(crate) fn from_js_value(value: JsValue) -> Self { + let s = if value.is_instance_of::() { + js_sys::Error::from(value) + .to_string() + .as_string() + .unwrap_or_else(|| "Unknown error".to_string()) + } else { + "Unknown error".to_string() + }; + + Error::JsError(s) + } +} + +impl From for TransportError { + fn from(value: Error) -> Self { + TransportError::Other(value) + } +} diff --git a/transports/websys-webtransport/src/lib.rs b/transports/websys-webtransport/src/lib.rs new file mode 100644 index 000000000000..5eee9a911e07 --- /dev/null +++ b/transports/websys-webtransport/src/lib.rs @@ -0,0 +1,14 @@ +mod cached_js_promise; +mod connection; +mod error; +mod stream; +mod transport; +mod utils; + +pub use self::connection::Connection; +pub use self::error::Error; +pub use self::stream::Stream; +pub use self::transport::{Config, Transport}; + +#[cfg(test)] +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); diff --git a/transports/websys-webtransport/src/stream.rs b/transports/websys-webtransport/src/stream.rs new file mode 100644 index 000000000000..dae9dbf58ee1 --- /dev/null +++ b/transports/websys-webtransport/src/stream.rs @@ -0,0 +1,190 @@ +use futures::{AsyncRead, AsyncWrite}; +use js_sys::Uint8Array; +use send_wrapper::SendWrapper; +use std::io; +use std::pin::Pin; +use std::task::ready; +use std::task::{Context, Poll}; +use web_sys::{ + ReadableStreamDefaultReader, WebTransportBidirectionalStream, WritableStreamDefaultWriter, +}; + +use crate::cached_js_promise::CachedJsPromise; +use crate::utils::{detach_promise, parse_reader_response, to_io_error, to_js_type}; +use crate::Error; + +pub struct Stream { + reader: SendWrapper, + reader_read_promise: CachedJsPromise, + read_leftovers: Option>, + writer: SendWrapper, + writer_state: StreamState, + writer_ready_promise: CachedJsPromise, + writer_closed_promise: CachedJsPromise, +} + +#[derive(PartialEq, Eq)] +enum StreamState { + Open, + Closing, + Closed, +} + +impl Stream { + pub(crate) fn new(bidi_stream: WebTransportBidirectionalStream) -> Result { + let recv_stream = bidi_stream.readable(); + let send_stream = bidi_stream.writable(); + + let reader = to_js_type::(recv_stream.get_reader())?; + let writer = send_stream.get_writer().map_err(Error::from_js_value)?; + + Ok(Stream { + reader: SendWrapper::new(reader), + reader_read_promise: CachedJsPromise::new(), + read_leftovers: None, + writer: SendWrapper::new(writer), + writer_state: StreamState::Open, + writer_ready_promise: CachedJsPromise::new(), + writer_closed_promise: CachedJsPromise::new(), + }) + } + + fn poll_writer_ready(&mut self, cx: &mut Context) -> Poll> { + if self.writer_state != StreamState::Open { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + + let desired_size = self + .writer + .desired_size() + .map_err(to_io_error)? + .map(|n| n.trunc() as i64) + .unwrap_or(0); + + // We need to poll if the queue is full or if the promise was already activated + if desired_size <= 0 || self.writer_ready_promise.is_active() { + ready!(self + .writer_ready_promise + .maybe_init_and_poll(cx, || self.writer.ready())) + .map_err(to_io_error)?; + } + + Poll::Ready(Ok(())) + } + + fn poll_writer_close(&mut self, cx: &mut Context) -> Poll> { + match self.writer_state { + StreamState::Open => { + self.writer_state = StreamState::Closing; + + // Initiate close + detach_promise(self.writer.close()); + + // Assume closed on error + let _ = ready!(self + .writer_closed_promise + .maybe_init_and_poll(cx, || self.writer.closed())); + + self.writer_state = StreamState::Closed; + } + StreamState::Closing => { + // Assume closed on error + let _ = ready!(self.writer_closed_promise.poll(cx)); + self.writer_state = StreamState::Closed; + } + StreamState::Closed => {} + } + + Poll::Ready(Ok(())) + } + + fn poll_reader_read(&mut self, cx: &mut Context) -> Poll>> { + let val = ready!(self + .reader_read_promise + .maybe_init_and_poll(cx, || self.reader.read())) + .map_err(to_io_error)?; + + let val = parse_reader_response(&val) + .map_err(to_io_error)? + .map(Uint8Array::from); + + Poll::Ready(Ok(val)) + } +} + +impl Drop for Stream { + fn drop(&mut self) { + // On drop we abort any writes and cancel any ongoing reads + detach_promise(self.writer.abort()); + detach_promise(self.reader.cancel()); + } +} + +impl AsyncRead for Stream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.get_mut(); + + // If we have leftovers from a previous read, then use them. + // Otherwise read new data. + let data = match this.read_leftovers.take() { + Some(data) => data.take(), + None => { + match ready!(this.poll_reader_read(cx))? { + Some(data) => data, + // EOF + None => return Poll::Ready(Ok(0)), + } + } + }; + + if data.byte_length() == 0 { + return Poll::Ready(Ok(0)); + } + + let out_len = data.byte_length().min(buf.len() as u32); + data.slice(0, out_len).copy_to(&mut buf[..out_len as usize]); + + let leftovers = data.slice(out_len, data.byte_length()); + + if leftovers.byte_length() > 0 { + this.read_leftovers = Some(SendWrapper::new(leftovers)); + } + + Poll::Ready(Ok(out_len as usize)) + } +} + +impl AsyncWrite for Stream { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let this = self.get_mut(); + + ready!(this.poll_writer_ready(cx))?; + + let data = Uint8Array::new_with_length(buf.len() as u32); + data.copy_from(buf); + + detach_promise(this.writer.write_with_chunk(&data)); + + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + + if this.writer_state == StreamState::Open { + // Writer has queue size of 1, so as soon it is ready, it means the + // messages were flushed. + this.poll_writer_ready(cx) + } else { + Poll::Ready(Ok(())) + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.get_mut().poll_writer_close(cx) + } +} diff --git a/transports/websys-webtransport/src/transport.rs b/transports/websys-webtransport/src/transport.rs new file mode 100644 index 000000000000..8111e35d94ca --- /dev/null +++ b/transports/websys-webtransport/src/transport.rs @@ -0,0 +1,99 @@ +use futures::future::FutureExt; +use libp2p_core::multiaddr::Multiaddr; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::{ + Boxed, ListenerId, Transport as TransportTrait, TransportError, TransportEvent, +}; +use libp2p_identity::{Keypair, PeerId}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::utils::parse_multiaddr; +use crate::Connection; +use crate::Error; + +pub struct Config { + keypair: Keypair, +} + +pub struct Transport { + config: Config, +} + +impl Config { + pub fn new(keypair: &Keypair) -> Self { + Config { + keypair: keypair.to_owned(), + } + } +} + +impl Transport { + pub fn new(config: Config) -> Transport { + Transport { config } + } +} + +impl Transport { + pub fn boxed(self) -> Boxed<(PeerId, StreamMuxerBox)> { + self.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer))) + .boxed() + } +} + +impl TransportTrait for Transport { + type Output = (PeerId, Connection); + type Error = Error; + type ListenerUpgrade = Pin> + Send>>; + type Dial = Pin> + Send>>; + + fn listen_on( + &mut self, + _id: ListenerId, + _addr: Multiaddr, + ) -> Result<(), TransportError> { + Err(Error::ListeningNotPossible.into()) + } + + fn remove_listener(&mut self, _id: ListenerId) -> bool { + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let endpoint = parse_multiaddr(&addr).map_err(|e| match e { + e @ Error::InvalidMultiaddr(_) => { + log::error!("{}", e); + TransportError::MultiaddrNotSupported(addr) + } + e => e.into(), + })?; + + let mut session = Connection::new(&endpoint)?; + let keypair = self.config.keypair.clone(); + + Ok(async move { + let peer_id = session.authenticate(&keypair, endpoint.certhashes).await?; + Ok((peer_id, session)) + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn address_translation(&self, _listen: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } +} diff --git a/transports/websys-webtransport/src/utils.rs b/transports/websys-webtransport/src/utils.rs new file mode 100644 index 000000000000..870d355b1d8e --- /dev/null +++ b/transports/websys-webtransport/src/utils.rs @@ -0,0 +1,202 @@ +use js_sys::{Promise, Reflect}; +use libp2p_core::multiaddr::{Multiaddr, Protocol}; +use libp2p_core::multihash::Multihash; +use libp2p_identity::PeerId; +use send_wrapper::SendWrapper; +use std::collections::HashSet; +use std::io; +use wasm_bindgen::{JsCast, JsValue}; + +use crate::Error; + +pub struct WebTransportEndpoint { + pub host: String, + pub port: u16, + pub certhashes: HashSet, + pub remote_peer: Option, +} + +pub fn parse_multiaddr(addr: &Multiaddr) -> Result { + let mut host = None; + let mut port = None; + let mut found_quic = false; + let mut found_webtransport = false; + let mut certhashes = HashSet::new(); + let mut remote_peer = None; + + for proto in addr.iter() { + // TODO: Add ip6, dns4, dns6, dnsaddr + match proto { + Protocol::Ip4(addr) => { + if host.is_some() { + return Err(Error::InvalidMultiaddr("More than one host definitions")); + } + + host = Some(addr.to_string()); + } + Protocol::Udp(p) => { + if port.is_some() { + return Err(Error::InvalidMultiaddr("More than one port definitions")); + } + + port = Some(p); + } + Protocol::Quic | Protocol::QuicV1 => { + if host.is_none() || port.is_none() { + return Err(Error::InvalidMultiaddr( + "No host and port definition before /quic/webtransport", + )); + } + + found_quic = true; + } + Protocol::WebTransport => { + if !found_quic { + return Err(Error::InvalidMultiaddr( + "/quic is not found before /webtransport", + )); + } + + found_webtransport = true; + } + Protocol::Certhash(hash) => { + if !found_webtransport { + return Err(Error::InvalidMultiaddr( + "/certhashes must be after /quic/found_webtransport", + )); + } + + certhashes.insert(hash); + } + Protocol::P2p(peer) => { + if remote_peer.is_some() { + return Err(Error::InvalidMultiaddr("More than one peer definitions")); + } + + let peer = PeerId::from_multihash(peer) + .map_err(|_| Error::InvalidMultiaddr("Invalid peer ID"))?; + + remote_peer = Some(peer); + } + _ => {} + } + } + + if !found_quic || !found_webtransport { + return Err(Error::InvalidMultiaddr( + "Not a /quic/webtransport multiaddr", + )); + } + + let host = host.ok_or_else(|| Error::InvalidMultiaddr("Host is not defined"))?; + let port = port.ok_or_else(|| Error::InvalidMultiaddr("Port is not defined"))?; + + Ok(WebTransportEndpoint { + host, + port, + certhashes, + remote_peer, + }) +} + +pub fn detach_promise(promise: Promise) { + type Closure = wasm_bindgen::closure::Closure; + static mut DO_NOTHING: Option> = None; + + // Allocate Closure only once and reuse it + let do_nothing = unsafe { + if DO_NOTHING.is_none() { + let cb = Closure::new(|_| {}); + DO_NOTHING = Some(SendWrapper::new(cb)); + } + + DO_NOTHING.as_deref().unwrap() + }; + + // Avoid having "floating" promises and ignore any errors. + // After `catch` we are allowed to drop the promise without. + // + // Ref: https://github.com/typescript-eslint/typescript-eslint/blob/391a6702c0a9b5b3874a7a27047f2a721f090fb6/packages/eslint-plugin/docs/rules/no-floating-promises.md + let _ = promise.catch(do_nothing); +} + +pub fn to_js_type(value: impl Into) -> Result +where + T: JsCast + From, +{ + let value = value.into(); + + if value.is_instance_of::() { + Err(Error::from_js_value(value)) + } else if value.is_instance_of::() { + Ok(T::from(value)) + } else { + Err(Error::JsCastFailed) + } +} + +pub fn to_io_error(value: JsValue) -> io::Error { + io::Error::new(io::ErrorKind::Other, Error::from_js_value(value)) +} + +pub fn parse_reader_response(resp: &JsValue) -> Result, JsValue> { + let value = Reflect::get(resp, &JsValue::from_str("value"))?; + let done = Reflect::get(resp, &JsValue::from_str("done"))? + .as_bool() + .unwrap_or_default(); + + if value.is_undefined() || done { + Ok(None) + } else { + Ok(Some(value)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use wasm_bindgen_test::wasm_bindgen_test; + + fn multihash_from_str(s: &str) -> Multihash { + let (_base, bytes) = multibase::decode(s).unwrap(); + Multihash::from_bytes(&bytes).unwrap() + } + + #[wasm_bindgen_test] + fn valid_webtransport_multiaddr() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/udp/44874/quic-v1/webtransport/certhash/uEiCaDd1Ca1A8IVJ3hsIxIyi11cwxaDKqzVrBkGJbKZU5ng/certhash/uEiDv-VGW8oXxui_G_Kqp-87YjvET-Hr2qYAMYPePJDcsjQ/p2p/12D3KooWR7EfNv5SLtgjMRjUwR8AvNu3hP4fLrtSa9fmHHXKYWNG").unwrap(); + let endpoint = parse_multiaddr(&addr).unwrap(); + + assert_eq!(endpoint.host, "127.0.0.1"); + assert_eq!(endpoint.port, 44874); + assert_eq!(endpoint.certhashes.len(), 2); + + assert!(endpoint.certhashes.contains(&multihash_from_str( + "uEiCaDd1Ca1A8IVJ3hsIxIyi11cwxaDKqzVrBkGJbKZU5ng" + ))); + + assert!(endpoint.certhashes.contains(&multihash_from_str( + "uEiDv-VGW8oXxui_G_Kqp-87YjvET-Hr2qYAMYPePJDcsjQ" + ))); + + assert_eq!( + endpoint.remote_peer.unwrap(), + PeerId::from_str("12D3KooWR7EfNv5SLtgjMRjUwR8AvNu3hP4fLrtSa9fmHHXKYWNG").unwrap() + ); + } + + #[wasm_bindgen_test] + fn valid_webtransport_multiaddr_without_certhashes() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/udp/44874/quic-v1/webtransport/p2p/12D3KooWR7EfNv5SLtgjMRjUwR8AvNu3hP4fLrtSa9fmHHXKYWNG").unwrap(); + let endpoint = parse_multiaddr(&addr).unwrap(); + + assert_eq!(endpoint.host, "127.0.0.1"); + assert_eq!(endpoint.port, 44874); + assert_eq!(endpoint.certhashes.len(), 0); + assert_eq!( + endpoint.remote_peer.unwrap(), + PeerId::from_str("12D3KooWR7EfNv5SLtgjMRjUwR8AvNu3hP4fLrtSa9fmHHXKYWNG").unwrap() + ); + } +}