From 4e80022234a83405742b7e816c5f90cf6123851b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alessandro=C2=A0Decina?= Date: Wed, 4 May 2022 08:22:52 +0000 Subject: [PATCH 1/4] WIP: enable GRO --- quinn-udp/src/lib.rs | 2 ++ quinn-udp/src/unix.rs | 21 +++++++++++++++++ quinn/src/endpoint.rs | 52 +++++++++++++++++++++++++------------------ 3 files changed, 53 insertions(+), 22 deletions(-) diff --git a/quinn-udp/src/lib.rs b/quinn-udp/src/lib.rs index fd232b023..e4ef45b41 100644 --- a/quinn-udp/src/lib.rs +++ b/quinn-udp/src/lib.rs @@ -68,6 +68,7 @@ pub struct RecvMeta { pub ecn: Option, /// The destination IP address which was encoded in this datagram pub dst_ip: Option, + pub gso_size: Option, } impl Default for RecvMeta { @@ -78,6 +79,7 @@ impl Default for RecvMeta { len: 0, ecn: None, dst_ip: None, + gso_size: None, } } } diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index a3746ac1f..01395bc25 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -52,6 +52,7 @@ pub struct UdpSocket { impl UdpSocket { pub fn from_std(socket: std::net::UdpSocket) -> io::Result { socket.set_nonblocking(true)?; + init(&socket)?; let now = Instant::now(); Ok(UdpSocket { @@ -134,6 +135,20 @@ fn init(io: &std::net::UdpSocket) -> io::Result<()> { } #[cfg(target_os = "linux")] { + let on: libc::c_int = 1; + let rc = unsafe { + libc::setsockopt( + io.as_raw_fd(), + libc::SOL_UDP, + libc::UDP_GRO, + &on as *const _ as _, + mem::size_of_val(&on) as _, + ) + }; + if rc == -1 { + return Err(io::Error::last_os_error()); + } + if addr.is_ipv4() { let rc = unsafe { libc::setsockopt( @@ -500,6 +515,7 @@ fn decode_recv( let name = unsafe { name.assume_init() }; let mut ecn_bits = 0; let mut dst_ip = None; + let mut gso_size = None; let cmsg_iter = unsafe { cmsg::Iter::new(hdr) }; for cmsg in cmsg_iter { @@ -527,6 +543,10 @@ fn decode_recv( let pktinfo = cmsg::decode::(cmsg); dst_ip = Some(IpAddr::V6(ptr::read(&pktinfo.ipi6_addr as *const _ as _))); }, + #[cfg(target_os = "linux")] + (libc::SOL_UDP, libc::UDP_GRO) => unsafe { + gso_size = Some(cmsg::decode::(cmsg) as usize); + }, _ => {} } } @@ -542,6 +562,7 @@ fn decode_recv( addr, ecn: EcnCodepoint::from_bits(ecn_bits), dst_ip, + gso_size, } } diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 698b706e7..5b0f5d8c7 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -12,7 +12,7 @@ use std::{ time::Instant, }; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use proto::{ self as proto, ClientConfig, ConnectError, ConnectionHandle, DatagramEvent, ServerConfig, }; @@ -346,27 +346,31 @@ impl EndpointInner { Poll::Ready(Ok(msgs)) => { self.recv_limiter.record_work(msgs); for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) { - let data = buf[0..meta.len].into(); - match self - .inner - .handle(now, meta.addr, meta.dst_ip, meta.ecn, data) - { - Some((handle, DatagramEvent::NewConnection(conn))) => { - let conn = - self.connections - .insert(handle, conn, self.udp_state.clone()); - self.incoming.push_back(conn); + for buf in buf[0..meta.len].chunks(meta.gso_size.unwrap_or(meta.len)) { + let data: BytesMut = buf.into(); + match self + .inner + .handle(now, meta.addr, meta.dst_ip, meta.ecn, data) + { + Some((handle, DatagramEvent::NewConnection(conn))) => { + let conn = self.connections.insert( + handle, + conn, + self.udp_state.clone(), + ); + self.incoming.push_back(conn); + } + Some((handle, DatagramEvent::ConnectionEvent(event))) => { + // Ignoring errors from dropped connections that haven't yet been cleaned up + let _ = self + .connections + .senders + .get_mut(&handle) + .unwrap() + .send(ConnectionEvent::Proto(event)); + } + None => {} } - Some((handle, DatagramEvent::ConnectionEvent(event))) => { - // Ignoring errors from dropped connections that haven't yet been cleaned up - let _ = self - .connections - .senders - .get_mut(&handle) - .unwrap() - .send(ConnectionEvent::Proto(event)); - } - None => {} } } } @@ -565,8 +569,12 @@ pub(crate) struct EndpointRef(Arc>); impl EndpointRef { pub(crate) fn new(socket: UdpSocket, inner: proto::Endpoint, ipv6: bool) -> Self { + // FIXME: don't hardcode the GRO size let recv_buf = - vec![0; inner.config().get_max_udp_payload_size().min(64 * 1024) as usize * BATCH_SIZE]; + vec![ + 0; + inner.config().get_max_udp_payload_size().min(64 * 1024) as usize * 10 * BATCH_SIZE + ]; let (sender, events) = mpsc::unbounded_channel(); Self(Arc::new(Mutex::new(EndpointInner { socket, From 71c568935d28815ba8f29f09a1115b4bf4309244 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alessandro=C2=A0Decina?= Date: Thu, 5 May 2022 06:29:26 +0000 Subject: [PATCH 2/4] Allocate once --- quinn/src/endpoint.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 5b0f5d8c7..79d95db6d 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -346,11 +346,13 @@ impl EndpointInner { Poll::Ready(Ok(msgs)) => { self.recv_limiter.record_work(msgs); for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) { - for buf in buf[0..meta.len].chunks(meta.gso_size.unwrap_or(meta.len)) { - let data: BytesMut = buf.into(); + let mut data: BytesMut = buf[0..meta.len].into(); + while !data.is_empty() { + let buf = + data.split_to(meta.gso_size.unwrap_or(meta.len).min(data.len())); match self .inner - .handle(now, meta.addr, meta.dst_ip, meta.ecn, data) + .handle(now, meta.addr, meta.dst_ip, meta.ecn, buf) { Some((handle, DatagramEvent::NewConnection(conn))) => { let conn = self.connections.insert( From df41f9960e8d576d114e8b6aa84d66b230f01b17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alessandro=C2=A0Decina?= Date: Thu, 5 May 2022 06:32:25 +0000 Subject: [PATCH 3/4] Rename gso_size to stride --- quinn-udp/src/fallback.rs | 4 +++- quinn-udp/src/lib.rs | 4 ++-- quinn-udp/src/unix.rs | 6 +++--- quinn/src/endpoint.rs | 3 +-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/quinn-udp/src/fallback.rs b/quinn-udp/src/fallback.rs index e3855c6b8..00fea1ef4 100644 --- a/quinn-udp/src/fallback.rs +++ b/quinn-udp/src/fallback.rs @@ -77,8 +77,10 @@ impl UdpSocket { debug_assert!(!bufs.is_empty()); let mut buf = ReadBuf::new(&mut bufs[0]); let addr = ready!(self.io.poll_recv_from(cx, &mut buf))?; + let len = buf.filled().len(); meta[0] = RecvMeta { - len: buf.filled().len(), + len, + stride: len, addr, ecn: None, dst_ip: None, diff --git a/quinn-udp/src/lib.rs b/quinn-udp/src/lib.rs index e4ef45b41..058e6ac20 100644 --- a/quinn-udp/src/lib.rs +++ b/quinn-udp/src/lib.rs @@ -65,10 +65,10 @@ impl Default for UdpState { pub struct RecvMeta { pub addr: SocketAddr, pub len: usize, + pub stride: usize, pub ecn: Option, /// The destination IP address which was encoded in this datagram pub dst_ip: Option, - pub gso_size: Option, } impl Default for RecvMeta { @@ -77,9 +77,9 @@ impl Default for RecvMeta { Self { addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0), len: 0, + stride: 0, ecn: None, dst_ip: None, - gso_size: None, } } } diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 01395bc25..1e3331799 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -515,7 +515,7 @@ fn decode_recv( let name = unsafe { name.assume_init() }; let mut ecn_bits = 0; let mut dst_ip = None; - let mut gso_size = None; + let mut stride = len; let cmsg_iter = unsafe { cmsg::Iter::new(hdr) }; for cmsg in cmsg_iter { @@ -545,7 +545,7 @@ fn decode_recv( }, #[cfg(target_os = "linux")] (libc::SOL_UDP, libc::UDP_GRO) => unsafe { - gso_size = Some(cmsg::decode::(cmsg) as usize); + stride = cmsg::decode::(cmsg) as usize; }, _ => {} } @@ -559,10 +559,10 @@ fn decode_recv( RecvMeta { len, + stride, addr, ecn: EcnCodepoint::from_bits(ecn_bits), dst_ip, - gso_size, } } diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 79d95db6d..7a50490e2 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -348,8 +348,7 @@ impl EndpointInner { for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) { let mut data: BytesMut = buf[0..meta.len].into(); while !data.is_empty() { - let buf = - data.split_to(meta.gso_size.unwrap_or(meta.len).min(data.len())); + let buf = data.split_to(meta.stride.min(data.len())); match self .inner .handle(now, meta.addr, meta.dst_ip, meta.ecn, buf) From 58945c6277b095d04a25d5685ae852ec43054c31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alessandro=C2=A0Decina?= Date: Thu, 5 May 2022 10:48:28 +0000 Subject: [PATCH 4/4] UdpState::gro_segments --- quinn-udp/src/fallback.rs | 1 + quinn-udp/src/lib.rs | 10 +++++++++ quinn-udp/src/unix.rs | 43 +++++++++++++++++++++++++++++++++++---- quinn/src/endpoint.rs | 15 +++++++------- 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/quinn-udp/src/fallback.rs b/quinn-udp/src/fallback.rs index 00fea1ef4..14556142a 100644 --- a/quinn-udp/src/fallback.rs +++ b/quinn-udp/src/fallback.rs @@ -97,6 +97,7 @@ impl UdpSocket { pub fn udp_state() -> super::UdpState { super::UdpState { max_gso_segments: std::sync::atomic::AtomicUsize::new(1), + gro_segments: 1, } } diff --git a/quinn-udp/src/lib.rs b/quinn-udp/src/lib.rs index 058e6ac20..05071102a 100644 --- a/quinn-udp/src/lib.rs +++ b/quinn-udp/src/lib.rs @@ -37,6 +37,7 @@ pub const BATCH_SIZE: usize = imp::BATCH_SIZE; #[derive(Debug)] pub struct UdpState { max_gso_segments: AtomicUsize, + gro_segments: usize, } impl UdpState { @@ -53,6 +54,15 @@ impl UdpState { pub fn max_gso_segments(&self) -> usize { self.max_gso_segments.load(Ordering::Relaxed) } + + /// The number of segments to read when GRO is enabled. Used as a factor to + /// compute the receive buffer size. + /// + /// Returns 1 if the platform doesn't support GRO. + #[inline] + pub fn gro_segments(&self) -> usize { + self.gro_segments + } } impl Default for UdpState { diff --git a/quinn-udp/src/unix.rs b/quinn-udp/src/unix.rs index 1e3331799..1244dce8d 100644 --- a/quinn-udp/src/unix.rs +++ b/quinn-udp/src/unix.rs @@ -135,8 +135,9 @@ fn init(io: &std::net::UdpSocket) -> io::Result<()> { } #[cfg(target_os = "linux")] { + // opportunistically try to enable GRO. See gro::gro_segments(). let on: libc::c_int = 1; - let rc = unsafe { + unsafe { libc::setsockopt( io.as_raw_fd(), libc::SOL_UDP, @@ -145,9 +146,6 @@ fn init(io: &std::net::UdpSocket) -> io::Result<()> { mem::size_of_val(&on) as _, ) }; - if rc == -1 { - return Err(io::Error::last_os_error()); - } if addr.is_ipv4() { let rc = unsafe { @@ -422,6 +420,7 @@ fn recv( pub fn udp_state() -> UdpState { UdpState { max_gso_segments: AtomicUsize::new(gso::max_gso_segments()), + gro_segments: gro::gro_segments(), } } @@ -623,3 +622,39 @@ mod gso { panic!("Setting a segment size is not supported on current platform"); } } + +#[cfg(target_os = "linux")] +mod gro { + use super::*; + + pub fn gro_segments() -> usize { + let socket = match std::net::UdpSocket::bind("[::]:0") { + Ok(socket) => socket, + Err(_) => return 1, + }; + + let on: libc::c_int = 1; + let rc = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_UDP, + libc::UDP_GRO, + &on as *const _ as _, + mem::size_of_val(&on) as _, + ) + }; + + if rc != -1 { + 10 + } else { + 1 + } + } +} + +#[cfg(not(target_os = "linux"))] +mod gro { + pub fn gro_segments() -> usize { + 1 + } +} diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index 7a50490e2..98d817376 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -570,16 +570,17 @@ pub(crate) struct EndpointRef(Arc>); impl EndpointRef { pub(crate) fn new(socket: UdpSocket, inner: proto::Endpoint, ipv6: bool) -> Self { - // FIXME: don't hardcode the GRO size - let recv_buf = - vec![ - 0; - inner.config().get_max_udp_payload_size().min(64 * 1024) as usize * 10 * BATCH_SIZE - ]; + let udp_state = Arc::new(UdpState::new()); + let recv_buf = vec![ + 0; + inner.config().get_max_udp_payload_size().min(64 * 1024) as usize + * udp_state.gro_segments() + * BATCH_SIZE + ]; let (sender, events) = mpsc::unbounded_channel(); Self(Arc::new(Mutex::new(EndpointInner { socket, - udp_state: Arc::new(UdpState::new()), + udp_state, inner, ipv6, events,