Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable UDP GRO #1350

Merged
merged 4 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ impl UdpSocket {
debug_assert!(!bufs.is_empty());
let mut buf = ReadBuf::new(&mut bufs[0]);
let addr = ready!(self.io.poll_recv_from(cx, &mut buf))?;
let len = buf.filled().len();
meta[0] = RecvMeta {
len: buf.filled().len(),
len,
stride: len,
addr,
ecn: None,
dst_ip: None,
Expand All @@ -95,6 +97,7 @@ impl UdpSocket {
pub fn udp_state() -> super::UdpState {
super::UdpState {
max_gso_segments: std::sync::atomic::AtomicUsize::new(1),
gro_segments: 1,
}
}

Expand Down
12 changes: 12 additions & 0 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
#[derive(Debug)]
pub struct UdpState {
max_gso_segments: AtomicUsize,
gro_segments: usize,
}

impl UdpState {
Expand All @@ -53,6 +54,15 @@ impl UdpState {
pub fn max_gso_segments(&self) -> usize {
self.max_gso_segments.load(Ordering::Relaxed)
}

/// The number of segments to read when GRO is enabled. Used as a factor to
/// compute the receive buffer size.
///
/// Returns 1 if the platform doesn't support GRO.
#[inline]
pub fn gro_segments(&self) -> usize {
self.gro_segments
}
}

impl Default for UdpState {
Expand All @@ -65,6 +75,7 @@ impl Default for UdpState {
pub struct RecvMeta {
pub addr: SocketAddr,
pub len: usize,
pub stride: usize,
pub ecn: Option<EcnCodepoint>,
/// The destination IP address which was encoded in this datagram
pub dst_ip: Option<IpAddr>,
Expand All @@ -76,6 +87,7 @@ impl Default for RecvMeta {
Self {
addr: SocketAddr::new(Ipv6Addr::UNSPECIFIED.into(), 0),
len: 0,
stride: 0,
ecn: None,
dst_ip: None,
}
Expand Down
56 changes: 56 additions & 0 deletions quinn-udp/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct UdpSocket {
impl UdpSocket {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;

init(&socket)?;
let now = Instant::now();
Ok(UdpSocket {
Expand Down Expand Up @@ -134,6 +135,18 @@ fn init(io: &std::net::UdpSocket) -> io::Result<()> {
}
#[cfg(target_os = "linux")]
{
// opportunistically try to enable GRO. See gro::gro_segments().
let on: libc::c_int = 1;
unsafe {
libc::setsockopt(
io.as_raw_fd(),
libc::SOL_UDP,
libc::UDP_GRO,
&on as *const _ as _,
mem::size_of_val(&on) as _,
)
};

if addr.is_ipv4() {
let rc = unsafe {
libc::setsockopt(
Expand Down Expand Up @@ -407,6 +420,7 @@ fn recv(
pub fn udp_state() -> UdpState {
UdpState {
max_gso_segments: AtomicUsize::new(gso::max_gso_segments()),
gro_segments: gro::gro_segments(),
}
}

Expand Down Expand Up @@ -500,6 +514,7 @@ fn decode_recv(
let name = unsafe { name.assume_init() };
let mut ecn_bits = 0;
let mut dst_ip = None;
let mut stride = len;

let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
for cmsg in cmsg_iter {
Expand Down Expand Up @@ -527,6 +542,10 @@ fn decode_recv(
let pktinfo = cmsg::decode::<libc::in6_pktinfo>(cmsg);
dst_ip = Some(IpAddr::V6(ptr::read(&pktinfo.ipi6_addr as *const _ as _)));
},
#[cfg(target_os = "linux")]
(libc::SOL_UDP, libc::UDP_GRO) => unsafe {
stride = cmsg::decode::<libc::c_int>(cmsg) as usize;
},
_ => {}
}
}
Expand All @@ -539,6 +558,7 @@ fn decode_recv(

RecvMeta {
len,
stride,
addr,
ecn: EcnCodepoint::from_bits(ecn_bits),
dst_ip,
Expand Down Expand Up @@ -602,3 +622,39 @@ mod gso {
panic!("Setting a segment size is not supported on current platform");
}
}

#[cfg(target_os = "linux")]
mod gro {
use super::*;

pub fn gro_segments() -> usize {
let socket = match std::net::UdpSocket::bind("[::]:0") {
Ok(socket) => socket,
Err(_) => return 1,
};

let on: libc::c_int = 1;
let rc = unsafe {
libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_UDP,
libc::UDP_GRO,
&on as *const _ as _,
mem::size_of_val(&on) as _,
)
};

if rc != -1 {
10
} else {
1
}
}
}

#[cfg(not(target_os = "linux"))]
mod gro {
pub fn gro_segments() -> usize {
1
}
}
58 changes: 34 additions & 24 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
time::Instant,
};

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use proto::{
self as proto, ClientConfig, ConnectError, ConnectionHandle, DatagramEvent, ServerConfig,
};
Expand Down Expand Up @@ -346,27 +346,32 @@ impl EndpointInner {
Poll::Ready(Ok(msgs)) => {
self.recv_limiter.record_work(msgs);
for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) {
let data = buf[0..meta.len].into();
match self
.inner
.handle(now, meta.addr, meta.dst_ip, meta.ecn, data)
{
Some((handle, DatagramEvent::NewConnection(conn))) => {
let conn =
self.connections
.insert(handle, conn, self.udp_state.clone());
self.incoming.push_back(conn);
let mut data: BytesMut = buf[0..meta.len].into();
while !data.is_empty() {
let buf = data.split_to(meta.stride.min(data.len()));
match self
.inner
.handle(now, meta.addr, meta.dst_ip, meta.ecn, buf)
{
Some((handle, DatagramEvent::NewConnection(conn))) => {
let conn = self.connections.insert(
handle,
conn,
self.udp_state.clone(),
);
self.incoming.push_back(conn);
}
Some((handle, DatagramEvent::ConnectionEvent(event))) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
let _ = self
.connections
.senders
.get_mut(&handle)
.unwrap()
.send(ConnectionEvent::Proto(event));
}
None => {}
}
Some((handle, DatagramEvent::ConnectionEvent(event))) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
let _ = self
.connections
.senders
.get_mut(&handle)
.unwrap()
.send(ConnectionEvent::Proto(event));
}
None => {}
}
}
}
Expand Down Expand Up @@ -565,12 +570,17 @@ pub(crate) struct EndpointRef(Arc<Mutex<EndpointInner>>);

impl EndpointRef {
pub(crate) fn new(socket: UdpSocket, inner: proto::Endpoint, ipv6: bool) -> Self {
let recv_buf =
vec![0; inner.config().get_max_udp_payload_size().min(64 * 1024) as usize * BATCH_SIZE];
let udp_state = Arc::new(UdpState::new());
let recv_buf = vec![
0;
inner.config().get_max_udp_payload_size().min(64 * 1024) as usize
* udp_state.gro_segments()
* BATCH_SIZE
];
let (sender, events) = mpsc::unbounded_channel();
Self(Arc::new(Mutex::new(EndpointInner {
socket,
udp_state: Arc::new(UdpState::new()),
udp_state,
inner,
ipv6,
events,
Expand Down