diff --git a/Cargo.lock b/Cargo.lock index c39664be2584..4abd1345be73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,6 +874,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "const-oid" version = "0.9.2" @@ -1757,7 +1767,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" dependencies = [ "gloo-timers", - "send_wrapper", + "send_wrapper 0.4.0", ] [[package]] @@ -3127,11 +3137,18 @@ name = "libp2p-wasm-ext" version = "0.40.0" dependencies = [ "futures", + "getrandom 0.2.9", "js-sys", "libp2p-core", - "parity-send-wrapper", + "libp2p-identity", + "libp2p-noise", + "multibase", + "send_wrapper 0.6.0", + "thiserror", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-bindgen-test", + "web-sys", ] [[package]] @@ -3405,8 +3422,6 @@ dependencies = [ [[package]] name = "multiaddr" version = "0.17.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b36f567c7099511fa8612bbbb52dda2419ce0bdbacf31714e3a5ffdb766d3bd" dependencies = [ "arrayref", "byteorder", @@ -3705,12 +3720,6 @@ dependencies = [ "libm", ] -[[package]] -name = "parity-send-wrapper" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa9777aa91b8ad9dd5aaa04a9b6bcb02c7f1deb952fca5a66034d5e63afc5c6f" - [[package]] name = "parking" version = "2.0.0" @@ -4547,6 +4556,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -4625,6 +4640,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" +dependencies = [ + "futures-core", +] + [[package]] name = "serde" version = "1.0.163" @@ -5511,11 +5535,35 @@ version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +[[package]] +name = "wasm-bindgen-test" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e636f3a428ff62b3742ebc3c70e254dfe12b8c2b469d688ea59cdd4abcf502" +dependencies = [ + "console_error_panic_hook", + "js-sys", + "scoped-tls", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test-macro", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f18c1fad2f7c4958e7bcce014fa212f59a65d5e3721d0f77e6c0b27ede936ba3" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "web-sys" -version = "0.3.60" +version = "0.3.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 7cedac929c6c..a3ef1e74ee6c 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -44,6 +44,7 @@ full = [ "wasm-bindgen", "wasm-ext", "wasm-ext-websocket", + "wasm-ext-webtransport", "webrtc", "websocket", "yamux", @@ -81,6 +82,7 @@ uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "libp2p-swarm/wasm-bindgen", "libp2p-gossipsub?/wasm-bindgen"] wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] +wasm-ext-webtransport = ["wasm-ext", "libp2p-wasm-ext?/webtransport"] webrtc = ["dep:libp2p-webrtc", "libp2p-webrtc?/pem"] websocket = ["dep:libp2p-websocket"] yamux = ["dep:libp2p-yamux"] diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index 54ada77b68bc..d9c6f88b6853 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -11,17 +11,40 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +thiserror = { version = "1.0.4", optional = true } futures = "0.3.28" js-sys = "0.3.63" +web-sys = { version = "0.3.63", optional = true } libp2p-core = { workspace = true } -parity-send-wrapper = "0.1.0" +libp2p-identity = { workspace = true, optional = true } +libp2p-noise = { workspace = true, optional = true } +send_wrapper = { version = "0.6.0", features = ["futures"] } wasm-bindgen = "0.2.86" wasm-bindgen-futures = "0.4.36" [features] websocket = [] +webtransport = [ + "thiserror", + "libp2p-identity", + "libp2p-noise", + "dep:web-sys", + "web-sys?/ReadableStreamDefaultReader", + "web-sys?/WritableStreamDefaultWriter", + "web-sys?/WebTransport", + "web-sys?/WebTransportHash", + "web-sys?/WebTransportOptions", + "web-sys?/WebTransportBidirectionalStream", + "web-sys?/WebTransportReceiveStream", + "web-sys?/WebTransportSendStream", +] -# Passing arguments to the docsrs builder in order to properly document cfg's. +[dev-dependencies] +multibase = "0.9.1" +wasm-bindgen-test = "0.3.36" +getrandom = { version = "0.2.9", features = ["js"] } + +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 91236ca8758f..2cece6eeac20 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -40,11 +40,17 @@ use libp2p_core::{ transport::{ListenerId, TransportError, TransportEvent}, Multiaddr, Transport, }; -use parity_send_wrapper::SendWrapper; +use send_wrapper::SendWrapper; use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use wasm_bindgen::{prelude::*, JsCast}; use wasm_bindgen_futures::JsFuture; +#[cfg(feature = "webtransport")] +pub mod webtransport; + +#[cfg(test)] +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + /// Contains the definition that one must match on the JavaScript side. pub mod ffi { use wasm_bindgen::prelude::*; diff --git a/transports/wasm-ext/src/webtransport.rs b/transports/wasm-ext/src/webtransport.rs new file mode 100644 index 000000000000..b900a13f43c3 --- /dev/null +++ b/transports/wasm-ext/src/webtransport.rs @@ -0,0 +1,11 @@ +mod cached_js_promise; +mod connection; +mod error; +mod substream; +mod transport; +mod utils; + +pub use self::connection::Connection; +pub use self::error::Error; +pub use self::substream::Substream; +pub use self::transport::{Config, Transport}; diff --git a/transports/wasm-ext/src/webtransport/cached_js_promise.rs b/transports/wasm-ext/src/webtransport/cached_js_promise.rs new file mode 100644 index 000000000000..137e830ec2e9 --- /dev/null +++ b/transports/wasm-ext/src/webtransport/cached_js_promise.rs @@ -0,0 +1,48 @@ +use futures::FutureExt; +use js_sys::Promise; +use send_wrapper::SendWrapper; +use std::task::{ready, Context, Poll}; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; + +pub struct CachedJsPromise { + cached: Option>, +} + +impl CachedJsPromise { + pub fn new() -> Self { + CachedJsPromise { cached: None } + } + + pub fn maybe_init_and_poll( + &mut self, + cx: &mut Context, + init: F, + ) -> Poll> + where + F: FnOnce() -> Promise, + { + if self.cached.is_none() { + self.cached = Some(SendWrapper::new(JsFuture::from(init()))); + } + + self.poll(cx) + } + + pub fn poll(&mut self, cx: &mut Context) -> Poll> { + let val = ready!(self + .cached + .as_mut() + .expect("CachedJsPromise not initialized") + .poll_unpin(cx)); + + // Future finished, drop it + self.cached.take(); + + Poll::Ready(val) + } + + pub fn is_active(&self) -> bool { + self.cached.is_some() + } +} diff --git a/transports/wasm-ext/src/webtransport/connection.rs b/transports/wasm-ext/src/webtransport/connection.rs new file mode 100644 index 000000000000..4540d51c25c5 --- /dev/null +++ b/transports/wasm-ext/src/webtransport/connection.rs @@ -0,0 +1,177 @@ +use js_sys::{Array, Uint8Array}; +use libp2p_core::multihash::Multihash; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent, StreamMuxerExt}; +use libp2p_core::{OutboundUpgrade, UpgradeInfo}; +use libp2p_identity::{Keypair, PeerId}; +use send_wrapper::SendWrapper; +use std::future::poll_fn; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + ReadableStreamDefaultReader, WebTransport, WebTransportBidirectionalStream, WebTransportHash, + WebTransportOptions, +}; + +use crate::webtransport::cached_js_promise::CachedJsPromise; +use crate::webtransport::utils::{parse_reader_response, to_js_type, WebTransportEndpoint}; +use crate::webtransport::{Error, Substream}; + +pub struct Connection { + session: SendWrapper, + create_substream_promise: CachedJsPromise, + incoming_substream_promise: CachedJsPromise, + incoming_substreams_reader: SendWrapper, + closed: bool, +} + +impl Connection { + pub(crate) fn new(endpoint: &WebTransportEndpoint) -> Result { + let url = format!( + "https://{}:{}/.well-known/libp2p-webtransport?type=noise", + endpoint.host, endpoint.port + ); + + let session = if endpoint.certhashes.is_empty() { + WebTransport::new(&url).map_err(Error::from)? + } else { + let opts = build_webtransport_opts(&endpoint.certhashes); + WebTransport::new_with_options(&url, &opts).map_err(Error::from)? + }; + + let stream = session.incoming_bidirectional_streams(); + let incoming_substreams_reader = + to_js_type::(stream.get_reader())?; + + Ok(Connection { + session: SendWrapper::new(session), + create_substream_promise: CachedJsPromise::new(), + incoming_substream_promise: CachedJsPromise::new(), + incoming_substreams_reader: SendWrapper::new(incoming_substreams_reader), + closed: false, + }) + } + + pub(crate) async fn authenticate( + &mut self, + keypair: &Keypair, + certhashes: &[Multihash], + ) -> Result { + self.ready().await?; + let stream = self.create_substream().await?; + + let noise = + libp2p_noise::Config::new(keypair)?.with_webtransport_certhashes(certhashes.to_vec()); + + // We do not use `upgrade::apply_outbound` function because it uses + // `multistream_select` protocol, which is not used by WebTransport spec. + let info = noise.protocol_info().next().unwrap(); + let (peer_id, _io) = noise.upgrade_outbound(stream, info).await?; + + Ok(peer_id) + } + + async fn ready(&mut self) -> Result<(), Error> { + let fut = SendWrapper::new(JsFuture::from(self.session.ready())); + fut.await?; + Ok(()) + } + + async fn create_substream(&mut self) -> Result { + poll_fn(|cx| self.poll_outbound_unpin(cx)).await + } + + fn poll_create_bidirectional_stream( + &mut self, + cx: &mut Context, + ) -> Poll> { + self.create_substream_promise + .maybe_init_and_poll(cx, || self.session.create_bidirectional_stream()) + } + + fn poll_incoming_bidirectional_streams( + &mut self, + cx: &mut Context, + ) -> Poll> { + self.incoming_substream_promise + .maybe_init_and_poll(cx, || self.incoming_substreams_reader.read()) + } + + fn close(&mut self) { + if !self.closed { + self.session.close(); + self.closed = true; + } + } +} + +impl StreamMuxer for Connection { + type Substream = Substream; + type Error = Error; + + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let val = ready!(this.poll_incoming_bidirectional_streams(cx))?; + let val = parse_reader_response(&val)? + .ok_or_else(|| Error::JsError("incoming_bidirectional_streams closed".to_string()))?; + + let bidi_stream = to_js_type::(val)?; + let substream = Substream::new(bidi_stream)?; + + Poll::Ready(Ok(substream)) + } + + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + let val = ready!(this.poll_create_bidirectional_stream(cx))?; + let bidi_stream = to_js_type::(val)?; + let substream = Substream::new(bidi_stream)?; + + Poll::Ready(Ok(substream)) + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + self.get_mut().close(); + Poll::Ready(Ok(())) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.close(); + } +} + +fn build_webtransport_opts(certhashes: &[Multihash]) -> WebTransportOptions { + let mut opts = WebTransportOptions::new(); + let hashes = Array::new(); + + for hash in certhashes { + let digest = Uint8Array::from(hash.digest()); + + let mut jshash = WebTransportHash::new(); + jshash.algorithm("sha-256").value(&digest); + + hashes.push(&jshash); + } + + opts.server_certificate_hashes(&hashes); + + opts +} diff --git a/transports/wasm-ext/src/webtransport/error.rs b/transports/wasm-ext/src/webtransport/error.rs new file mode 100644 index 000000000000..c5535b001fab --- /dev/null +++ b/transports/wasm-ext/src/webtransport/error.rs @@ -0,0 +1,38 @@ +use libp2p_core::transport::TransportError; +use wasm_bindgen::{JsCast, JsValue}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Listening on WebTransport is not possible from within a browser")] + ListeningNotPossible, + + #[error("Invalid multiaddr: {0}")] + InvalidMultiaddr(&'static str), + + #[error("Noise error: {0}")] + Noise(#[from] libp2p_noise::Error), + + #[error("JavaScript error: {0}")] + JsError(String), +} + +impl From for Error { + fn from(value: JsValue) -> Self { + let s = if value.is_instance_of::() { + js_sys::Error::from(value) + .to_string() + .as_string() + .unwrap_or_else(|| "Unknown error".to_string()) + } else { + "Unknown error".to_string() + }; + + Error::JsError(s) + } +} + +impl From for TransportError { + fn from(value: Error) -> Self { + TransportError::Other(value) + } +} diff --git a/transports/wasm-ext/src/webtransport/substream.rs b/transports/wasm-ext/src/webtransport/substream.rs new file mode 100644 index 000000000000..584ff9226e84 --- /dev/null +++ b/transports/wasm-ext/src/webtransport/substream.rs @@ -0,0 +1,190 @@ +use futures::{AsyncRead, AsyncWrite}; +use js_sys::Uint8Array; +use send_wrapper::SendWrapper; +use std::io; +use std::pin::Pin; +use std::task::ready; +use std::task::{Context, Poll}; +use web_sys::{ + ReadableStreamDefaultReader, WebTransportBidirectionalStream, WritableStreamDefaultWriter, +}; + +use crate::webtransport::cached_js_promise::CachedJsPromise; +use crate::webtransport::utils::{detach_promise, parse_reader_response, to_io_error, to_js_type}; +use crate::webtransport::Error; + +pub struct Substream { + reader: SendWrapper, + reader_read_promise: CachedJsPromise, + read_leftovers: Option>, + writer: SendWrapper, + writer_state: StreamState, + writer_ready_promise: CachedJsPromise, + writer_closed_promise: CachedJsPromise, +} + +#[derive(PartialEq, Eq)] +enum StreamState { + Open, + Closing, + Closed, +} + +impl Substream { + pub(crate) fn new(bidi_stream: WebTransportBidirectionalStream) -> Result { + let recv_stream = bidi_stream.readable(); + let send_stream = bidi_stream.writable(); + + let reader = to_js_type::(recv_stream.get_reader())?; + let writer = send_stream.get_writer()?; + + Ok(Substream { + reader: SendWrapper::new(reader), + reader_read_promise: CachedJsPromise::new(), + read_leftovers: None, + writer: SendWrapper::new(writer), + writer_state: StreamState::Open, + writer_ready_promise: CachedJsPromise::new(), + writer_closed_promise: CachedJsPromise::new(), + }) + } + + fn poll_writer_ready(&mut self, cx: &mut Context) -> Poll> { + if self.writer_state != StreamState::Open { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + + let desired_size = self + .writer + .desired_size() + .map_err(to_io_error)? + .map(|n| n.trunc() as i64) + .unwrap_or(0); + + // We need to poll if the queue is full or if the promise was already activated + if desired_size <= 0 || self.writer_ready_promise.is_active() { + ready!(self + .writer_ready_promise + .maybe_init_and_poll(cx, || self.writer.ready())) + .map_err(to_io_error)?; + } + + Poll::Ready(Ok(())) + } + + fn poll_writer_close(&mut self, cx: &mut Context) -> Poll> { + match self.writer_state { + StreamState::Open => { + self.writer_state = StreamState::Closing; + + // Initiate close + detach_promise(self.writer.close()); + + // Assume closed on error + let _ = ready!(self + .writer_closed_promise + .maybe_init_and_poll(cx, || self.writer.closed())); + + self.writer_state = StreamState::Closed; + } + StreamState::Closing => { + // Assume closed on error + let _ = ready!(self.writer_closed_promise.poll(cx)); + self.writer_state = StreamState::Closed; + } + StreamState::Closed => {} + } + + Poll::Ready(Ok(())) + } + + fn poll_reader_read(&mut self, cx: &mut Context) -> Poll>> { + let val = ready!(self + .reader_read_promise + .maybe_init_and_poll(cx, || self.reader.read())) + .map_err(to_io_error)?; + + let val = parse_reader_response(&val) + .map_err(to_io_error)? + .map(Uint8Array::from); + + Poll::Ready(Ok(val)) + } +} + +impl Drop for Substream { + fn drop(&mut self) { + // On drop we abort any writes and cancel any ongoing reads + detach_promise(self.writer.abort()); + detach_promise(self.reader.cancel()); + } +} + +impl AsyncRead for Substream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.get_mut(); + + // If we have leftovers from a previous read, then use them. + // Otherwise read new data. + let data = match this.read_leftovers.take() { + Some(data) => data.take(), + None => { + match ready!(this.poll_reader_read(cx))? { + Some(data) => data, + // EOF + None => return Poll::Ready(Ok(0)), + } + } + }; + + if data.byte_length() == 0 { + return Poll::Ready(Ok(0)); + } + + let out_len = data.byte_length().min(buf.len() as u32); + data.slice(0, out_len).copy_to(&mut buf[..out_len as usize]); + + let leftovers = data.slice(out_len, data.byte_length()); + + if leftovers.byte_length() > 0 { + this.read_leftovers = Some(SendWrapper::new(leftovers)); + } + + Poll::Ready(Ok(out_len as usize)) + } +} + +impl AsyncWrite for Substream { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let this = self.get_mut(); + + ready!(this.poll_writer_ready(cx))?; + + let data = Uint8Array::new_with_length(buf.len() as u32); + data.copy_from(buf); + + detach_promise(this.writer.write_with_chunk(&data)); + + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + + if this.writer_state == StreamState::Open { + // Writer has queue size of 1, so as soon it is ready, it means the + // messages were flushed + this.poll_writer_ready(cx) + } else { + Poll::Ready(Ok(())) + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.get_mut().poll_writer_close(cx) + } +} diff --git a/transports/wasm-ext/src/webtransport/transport.rs b/transports/wasm-ext/src/webtransport/transport.rs new file mode 100644 index 000000000000..03ff9b2e5fe8 --- /dev/null +++ b/transports/wasm-ext/src/webtransport/transport.rs @@ -0,0 +1,100 @@ +use futures::future::FutureExt; +use libp2p_core::address_translation; +use libp2p_core::multiaddr::Multiaddr; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::{ + Boxed, ListenerId, Transport as TransportTrait, TransportError, TransportEvent, +}; +use libp2p_identity::{Keypair, PeerId}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::webtransport::utils::parse_multiaddr; +use crate::webtransport::Connection; +use crate::webtransport::Error; + +pub struct Config { + keypair: Keypair, +} + +pub struct Transport { + config: Config, +} + +impl Config { + pub fn new(keypair: &Keypair) -> Self { + Config { + keypair: keypair.to_owned(), + } + } +} + +impl Transport { + pub fn new(config: Config) -> Transport { + Transport { config } + } +} + +impl Transport { + pub fn boxed(self) -> Boxed<(PeerId, StreamMuxerBox)> { + self.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer))) + .boxed() + } +} + +impl TransportTrait for Transport { + type Output = (PeerId, Connection); + type Error = Error; + type ListenerUpgrade = Pin> + Send>>; + type Dial = Pin> + Send>>; + + fn listen_on( + &mut self, + _id: ListenerId, + _addr: Multiaddr, + ) -> Result<(), TransportError> { + Err(Error::ListeningNotPossible.into()) + } + + fn remove_listener(&mut self, _id: ListenerId) -> bool { + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let endpoint = parse_multiaddr(&addr).map_err(|e| match e { + Error::InvalidMultiaddr(_) => { + // TODO: Log error + TransportError::MultiaddrNotSupported(addr) + } + e => e.into(), + })?; + + let mut session = Connection::new(&endpoint)?; + let keypair = self.config.keypair.clone(); + + Ok(async move { + let peer_id = session.authenticate(&keypair, &endpoint.certhashes).await?; + Ok((peer_id, session)) + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.dial(addr) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option { + address_translation(listen, observed) + } +} diff --git a/transports/wasm-ext/src/webtransport/utils.rs b/transports/wasm-ext/src/webtransport/utils.rs new file mode 100644 index 000000000000..136c29c5489c --- /dev/null +++ b/transports/wasm-ext/src/webtransport/utils.rs @@ -0,0 +1,191 @@ +use js_sys::{Promise, Reflect}; +use libp2p_core::multiaddr::{Multiaddr, Protocol}; +use libp2p_core::multihash::Multihash; +use libp2p_identity::PeerId; +use send_wrapper::SendWrapper; +use std::io; +use wasm_bindgen::{JsCast, JsValue}; + +use crate::webtransport::Error; + +pub struct WebTransportEndpoint { + pub host: String, + pub port: u16, + pub certhashes: Vec, + pub remote_peer: Option, +} + +pub fn parse_multiaddr(addr: &Multiaddr) -> Result { + let mut host = None; + let mut port = None; + let mut found_quic = false; + let mut found_webtransport = false; + let mut certhashes = Vec::new(); + let mut remote_peer = None; + + for proto in addr.iter() { + // TODO: Add ip6, dns4, dns6, dnsaddr + match proto { + Protocol::Ip4(addr) => { + if host.is_some() { + return Err(Error::InvalidMultiaddr("More than one host definitions")); + } + + host = Some(addr.to_string()); + } + Protocol::Udp(p) => { + if port.is_some() { + return Err(Error::InvalidMultiaddr("More than one port definitions")); + } + + port = Some(p); + } + Protocol::Quic | Protocol::QuicV1 => { + if host.is_none() || port.is_none() { + return Err(Error::InvalidMultiaddr( + "No host and port definition before /quic/webtransport", + )); + } + + found_quic = true; + } + Protocol::WebTransport => { + if !found_quic { + return Err(Error::InvalidMultiaddr( + "/quic is not found before /webtransport", + )); + } + + found_webtransport = true; + } + Protocol::Certhash(hash) => { + if !found_webtransport { + return Err(Error::InvalidMultiaddr( + "/certhashes must be after /quic/found_webtransport", + )); + } + + certhashes.push(hash); + } + Protocol::P2p(peer) => { + if remote_peer.is_some() { + return Err(Error::InvalidMultiaddr("More than one peer definitions")); + } + + let peer = PeerId::from_multihash(peer) + .map_err(|_| Error::InvalidMultiaddr("Invalid peer ID"))?; + + remote_peer = Some(peer); + } + _ => {} + } + } + + if !found_quic || !found_webtransport { + return Err(Error::InvalidMultiaddr( + "Not a /quic/webtransport multiaddr", + )); + } + + if certhashes.is_empty() { + return Err(Error::InvalidMultiaddr("No /certhashes found")); + } + + let host = host.ok_or_else(|| Error::InvalidMultiaddr("Host is not defined"))?; + let port = port.ok_or_else(|| Error::InvalidMultiaddr("Port is not defined"))?; + + Ok(WebTransportEndpoint { + host, + port, + certhashes, + remote_peer, + }) +} + +pub fn detach_promise(promise: Promise) { + type Closure = wasm_bindgen::closure::Closure; + static mut DO_NOTHING: Option> = None; + + // Allocate Closure only once and reuse it + let do_nothing = unsafe { + if DO_NOTHING.is_none() { + let cb = Closure::new(|_| {}); + DO_NOTHING = Some(SendWrapper::new(cb)); + } + + DO_NOTHING.as_deref().unwrap() + }; + + // Avoid having "floating" promises and ignore any errors. + // After `catch` we are allowed to drop the promise without. + // + // Ref: https://github.com/typescript-eslint/typescript-eslint/blob/391a6702c0a9b5b3874a7a27047f2a721f090fb6/packages/eslint-plugin/docs/rules/no-floating-promises.md + let _ = promise.catch(do_nothing); +} + +pub fn to_js_type(value: impl Into) -> Result +where + T: JsCast + From, +{ + let value = value.into(); + + if value.is_instance_of::() { + Err(value) + } else if value.is_instance_of::() { + Ok(T::from(value)) + } else { + // TODO: write better error message + Err(js_sys::TypeError::new("Invalid type").into()) + } +} + +pub fn to_io_error(value: JsValue) -> io::Error { + io::Error::new(io::ErrorKind::Other, Error::from(value)) +} + +pub fn parse_reader_response(resp: &JsValue) -> Result, JsValue> { + let value = Reflect::get(resp, &JsValue::from_str("value"))?; + let done = Reflect::get(resp, &JsValue::from_str("done"))? + .as_bool() + .unwrap_or_default(); + + if value.is_undefined() || done { + Ok(None) + } else { + Ok(Some(value)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use wasm_bindgen_test::wasm_bindgen_test; + + fn multihash_from_str(s: &str) -> Multihash { + let (_base, bytes) = multibase::decode(s).unwrap(); + Multihash::from_bytes(&bytes).unwrap() + } + + #[wasm_bindgen_test] + fn valid_webtransport_multiaddr() { + let addr = Multiaddr::from_str("/ip4/127.0.0.1/udp/44874/quic-v1/webtransport/certhash/uEiCaDd1Ca1A8IVJ3hsIxIyi11cwxaDKqzVrBkGJbKZU5ng/certhash/uEiDv-VGW8oXxui_G_Kqp-87YjvET-Hr2qYAMYPePJDcsjQ/p2p/12D3KooWR7EfNv5SLtgjMRjUwR8AvNu3hP4fLrtSa9fmHHXKYWNG").unwrap(); + let endpoint = parse_multiaddr(&addr).unwrap(); + + assert_eq!(endpoint.host, "127.0.0.1"); + assert_eq!(endpoint.port, 44874); + assert_eq!(endpoint.certhashes.len(), 2); + assert_eq!( + endpoint.certhashes[0], + multihash_from_str("uEiCaDd1Ca1A8IVJ3hsIxIyi11cwxaDKqzVrBkGJbKZU5ng") + ); + assert_eq!( + endpoint.certhashes[1], + multihash_from_str("uEiDv-VGW8oXxui_G_Kqp-87YjvET-Hr2qYAMYPePJDcsjQ") + ); + assert_eq!( + endpoint.remote_peer.unwrap(), + PeerId::from_str("12D3KooWR7EfNv5SLtgjMRjUwR8AvNu3hP4fLrtSa9fmHHXKYWNG").unwrap() + ); + } +}