Skip to content

Commit

Permalink
File: Try doing a non-blocking read before punting to the threadpool
Browse files Browse the repository at this point in the history
...on Linux.

If the data is already available in cache this will avoid cross-thread
interaction and remove a copy.  It should help with latency too as reads
that can be satisfied now won't need to wait in queue until other fs
operations are complete.
  • Loading branch information
wmanley committed Mar 26, 2021
1 parent 724ba34 commit ef5c74c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 5 deletions.
6 changes: 3 additions & 3 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ full = [
"time",
]

fs = []
fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
Expand Down Expand Up @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
libc = { version = "0.2.87", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
libc = { version = "0.2.87" }
nix = { version = "0.19.0" }

[target.'cfg(windows)'.dependencies.winapi]
Expand Down
83 changes: 81 additions & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,18 @@ impl AsyncRead for File {
loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let buf = buf_cell.as_mut().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
}

if let Some(x) = read_nowait::try_nonblocking_read(me.std.as_ref(), dst) {
return Ready(x);
}

let mut buf = buf_cell.take().unwrap();
buf.ensure_capacity_for(dst);
let std = me.std.clone();

Expand Down Expand Up @@ -756,3 +760,78 @@ impl Inner {
}
}
}

#[cfg(all(target_os = "linux", not(test)))]
mod read_nowait {
use crate::io::ReadBuf;
use libc::{c_int, c_void, iovec, off_t, preadv2};
use std::{
os::unix::prelude::AsRawFd,
sync::atomic::{AtomicBool, Ordering},
};

static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);

pub(crate) fn try_nonblocking_read(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
return None;
}
let out = preadv2_safe(file, dst, -1, libc::RWF_NOWAIT);
if let Err(err) = &out {
match err.raw_os_error() {
Some(libc::ENOSYS) => {
NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
return None;
}
Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None,
_ => {}
}
}
Some(out)
}

fn preadv2_safe(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
offset: off_t,
flags: c_int,
) -> std::io::Result<()> {
unsafe {
/* We have to defend against buffer overflows manually here. The slice API makes
* this fairly straightforward. */
let unfilled = dst.unfilled_mut();
let iov = iovec {
iov_base: unfilled.as_mut_ptr() as *mut c_void,
iov_len: unfilled.len(),
};
/* We take a File object rather than an fd as reading from a sensitive fd may confuse
* other unsafe code that assumes that only they have access to that fd. */
let bytes_read = preadv2(file.as_raw_fd(), &iov as *const iovec, 1, offset, flags);
if bytes_read < 0 {
Err(std::io::Error::last_os_error())
} else {
/* preadv2 returns the number of bytes read, e.g. the number of bytes that have
* written into `unfilled`. So it's safe to assume that the data is now
* initialised */
dst.assume_init(dst.filled().len() + bytes_read as usize);
dst.advance(bytes_read as usize);
Ok(())
}
}
}
}

#[cfg(any(not(target_os = "linux"), test))]
mod read_nowait {
use crate::io::ReadBuf;

pub(crate) fn try_nonblocking_read(
_file: &crate::fs::sys::File,
_dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
None
}
}

0 comments on commit ef5c74c

Please sign in to comment.