Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable UDP GRO #1350

Merged
merged 4 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub struct RecvMeta {
pub ecn: Option<EcnCodepoint>,
/// The destination IP address which was encoded in this datagram
pub dst_ip: Option<IpAddr>,
pub gso_size: Option<usize>,
Ralith marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for RecvMeta {
Expand All @@ -78,6 +79,7 @@ impl Default for RecvMeta {
len: 0,
ecn: None,
dst_ip: None,
gso_size: None,
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions quinn-udp/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct UdpSocket {
impl UdpSocket {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;

init(&socket)?;
let now = Instant::now();
Ok(UdpSocket {
Expand Down Expand Up @@ -134,6 +135,20 @@ fn init(io: &std::net::UdpSocket) -> io::Result<()> {
}
#[cfg(target_os = "linux")]
{
let on: libc::c_int = 1;
let rc = unsafe {
libc::setsockopt(
io.as_raw_fd(),
libc::SOL_UDP,
libc::UDP_GRO,
&on as *const _ as _,
mem::size_of_val(&on) as _,
)
};
if rc == -1 {
return Err(io::Error::last_os_error());
}

if addr.is_ipv4() {
let rc = unsafe {
libc::setsockopt(
Expand Down Expand Up @@ -500,6 +515,7 @@ fn decode_recv(
let name = unsafe { name.assume_init() };
let mut ecn_bits = 0;
let mut dst_ip = None;
let mut gso_size = None;

let cmsg_iter = unsafe { cmsg::Iter::new(hdr) };
for cmsg in cmsg_iter {
Expand Down Expand Up @@ -527,6 +543,10 @@ fn decode_recv(
let pktinfo = cmsg::decode::<libc::in6_pktinfo>(cmsg);
dst_ip = Some(IpAddr::V6(ptr::read(&pktinfo.ipi6_addr as *const _ as _)));
},
#[cfg(target_os = "linux")]
(libc::SOL_UDP, libc::UDP_GRO) => unsafe {
gso_size = Some(cmsg::decode::<libc::c_int>(cmsg) as usize);
},
_ => {}
}
}
Expand All @@ -542,6 +562,7 @@ fn decode_recv(
addr,
ecn: EcnCodepoint::from_bits(ecn_bits),
dst_ip,
gso_size,
}
}

Expand Down
52 changes: 30 additions & 22 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
time::Instant,
};

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use proto::{
self as proto, ClientConfig, ConnectError, ConnectionHandle, DatagramEvent, ServerConfig,
};
Expand Down Expand Up @@ -346,27 +346,31 @@ impl EndpointInner {
Poll::Ready(Ok(msgs)) => {
self.recv_limiter.record_work(msgs);
for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) {
let data = buf[0..meta.len].into();
match self
.inner
.handle(now, meta.addr, meta.dst_ip, meta.ecn, data)
{
Some((handle, DatagramEvent::NewConnection(conn))) => {
let conn =
self.connections
.insert(handle, conn, self.udp_state.clone());
self.incoming.push_back(conn);
for buf in buf[0..meta.len].chunks(meta.gso_size.unwrap_or(meta.len)) {
let data: BytesMut = buf.into();
Ralith marked this conversation as resolved.
Show resolved Hide resolved
match self
.inner
.handle(now, meta.addr, meta.dst_ip, meta.ecn, data)
{
Some((handle, DatagramEvent::NewConnection(conn))) => {
let conn = self.connections.insert(
handle,
conn,
self.udp_state.clone(),
);
self.incoming.push_back(conn);
}
Some((handle, DatagramEvent::ConnectionEvent(event))) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
let _ = self
.connections
.senders
.get_mut(&handle)
.unwrap()
.send(ConnectionEvent::Proto(event));
}
None => {}
}
Some((handle, DatagramEvent::ConnectionEvent(event))) => {
// Ignoring errors from dropped connections that haven't yet been cleaned up
let _ = self
.connections
.senders
.get_mut(&handle)
.unwrap()
.send(ConnectionEvent::Proto(event));
}
None => {}
}
}
}
Expand Down Expand Up @@ -565,8 +569,12 @@ pub(crate) struct EndpointRef(Arc<Mutex<EndpointInner>>);

impl EndpointRef {
pub(crate) fn new(socket: UdpSocket, inner: proto::Endpoint, ipv6: bool) -> Self {
// FIXME: don't hardcode the GRO size
let recv_buf =
vec![0; inner.config().get_max_udp_payload_size().min(64 * 1024) as usize * BATCH_SIZE];
vec![
0;
inner.config().get_max_udp_payload_size().min(64 * 1024) as usize * 10 * BATCH_SIZE
];
let (sender, events) = mpsc::unbounded_channel();
Self(Arc::new(Mutex::new(EndpointInner {
socket,
Expand Down