Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File: Try doing a non-blocking read before punting to the threadpool #3518

Merged
merged 5 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ full = [
"time",
]

fs = []
fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
Expand Down Expand Up @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
libc = { version = "0.2.87", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
libc = { version = "0.2.87" }
nix = { version = "0.19.0" }

[target.'cfg(windows)'.dependencies.winapi]
Expand Down
109 changes: 107 additions & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,18 @@ impl AsyncRead for File {
loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let buf = buf_cell.as_mut().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
}

if let Some(x) = read_nowait::try_nonblocking_read(me.std.as_ref(), dst) {
return Ready(x);
}

let mut buf = buf_cell.take().unwrap();
buf.ensure_capacity_for(dst);
let std = me.std.clone();

Expand Down Expand Up @@ -756,3 +760,104 @@ impl Inner {
}
}
}

#[cfg(all(target_os = "linux", not(test)))]
mod read_nowait {
use crate::io::ReadBuf;
use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t};
use std::{
os::unix::prelude::AsRawFd,
sync::atomic::{AtomicBool, Ordering},
};

static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);

pub(crate) fn try_nonblocking_read(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
return None;
}
let out = preadv2_safe(file, dst, -1, RWF_NOWAIT);
if let Err(err) = &out {
match err.raw_os_error() {
Some(libc::ENOSYS) => {
NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
return None;
}
wmanley marked this conversation as resolved.
Show resolved Hide resolved
Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None,
_ => {}
}
}
Some(out)
}

fn preadv2_safe(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
offset: off_t,
flags: c_int,
) -> std::io::Result<()> {
unsafe {
/* We have to defend against buffer overflows manually here. The slice API makes
* this fairly straightforward. */
let unfilled = dst.unfilled_mut();
let iov = iovec {
iov_base: unfilled.as_mut_ptr() as *mut c_void,
iov_len: unfilled.len(),
};
/* We take a File object rather than an fd as reading from a sensitive fd may confuse
* other unsafe code that assumes that only they have access to that fd. */
let bytes_read = preadv2(file.as_raw_fd(), &iov as *const iovec, 1, offset, flags);
wmanley marked this conversation as resolved.
Show resolved Hide resolved
if bytes_read < 0 {
Err(std::io::Error::last_os_error())
} else {
/* preadv2 returns the number of bytes read, e.g. the number of bytes that have
* written into `unfilled`. So it's safe to assume that the data is now
* initialised */
dst.assume_init(dst.filled().len() + bytes_read as usize);
dst.advance(bytes_read as usize);
wmanley marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}
}
}

fn pos_to_lohi(offset: off_t) -> (c_long, c_long) {
// 64-bit offset is split over high and low 32-bits on 32-bit architectures.
// 64-bit architectures still have high and low arguments, but only the low
// one is inspected. See pos_from_hilo in linux/fs/read_write.c.
const HALF_LONG_BITS: usize = core::mem::size_of::<c_long>() * 8 / 2;
(
offset as c_long,
// We want to shift this off_t value by size_of::<c_long>(). We can't do
// it in one shift because if they're both 64-bits we'd be doing u64 >> 64
// which is implementation defined. Instead do it in two halves:
((offset >> HALF_LONG_BITS) >> HALF_LONG_BITS) as c_long,
)
}

const RWF_NOWAIT: c_int = 0x00000008;
wmanley marked this conversation as resolved.
Show resolved Hide resolved
unsafe fn preadv2(
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
flags: c_int,
) -> ssize_t {
let (lo, hi) = pos_to_lohi(offset);
libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, lo, hi, flags) as ssize_t
}
}

#[cfg(any(not(target_os = "linux"), test))]
mod read_nowait {
use crate::io::ReadBuf;

pub(crate) fn try_nonblocking_read(
_file: &crate::fs::sys::File,
_dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
None
}
}
21 changes: 21 additions & 0 deletions tokio/tests/fs_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ async fn basic_read() {

assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);

// Drop the data from the cache to stimulate uncached codepath on Linux (see preadv2 in
// file.rs)
#[cfg(target_os = "linux")]
{
use std::os::unix::io::AsRawFd;
nix::unistd::fsync(tempfile.as_raw_fd()).unwrap();
nix::fcntl::posix_fadvise(
tempfile.as_raw_fd(),
0,
0,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
)
.unwrap();
}

let mut file = File::open(tempfile.path()).await.unwrap();
let n = file.read(&mut buf).await.unwrap();

assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);
}

#[tokio::test]
Expand Down