Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File: Try doing a non-blocking read before punting to the threadpool #3518

Merged
merged 5 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ full = [
"time",
]

fs = []
fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
Expand Down Expand Up @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
libc = { version = "0.2.87", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
libc = { version = "0.2.87" }
nix = { version = "0.19.0" }

[target.'cfg(windows)'.dependencies.winapi]
Expand Down
191 changes: 189 additions & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,18 @@ impl AsyncRead for File {
loop {
match inner.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let buf = buf_cell.as_mut().unwrap();

if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
}

if let Some(x) = try_nonblocking_read(me.std.as_ref(), dst) {
return Ready(x);
}

let mut buf = buf_cell.take().unwrap();
buf.ensure_capacity_for(dst);
let std = me.std.clone();

Expand Down Expand Up @@ -756,3 +760,186 @@ impl Inner {
}
}
}

#[cfg(all(target_os = "linux", not(test)))]
pub(crate) fn try_nonblocking_read(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
use std::sync::atomic::{AtomicBool, Ordering};

static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);
if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
return None;
}
let out = preadv2::preadv2_safe(file, dst, -1, preadv2::RWF_NOWAIT);
if let Err(err) = &out {
match err.raw_os_error() {
Some(libc::ENOSYS) => {
NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
return None;
}
Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None,
_ => {}
}
}
Some(out)
}

#[cfg(any(not(target_os = "linux"), test))]
pub(crate) fn try_nonblocking_read(
_file: &crate::fs::sys::File,
_dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
None
}

#[cfg(target_os = "linux")]
mod preadv2 {
use libc::{c_int, c_long, c_void, iovec, off_t, ssize_t};
use std::os::unix::prelude::AsRawFd;

use crate::io::ReadBuf;

pub(crate) fn preadv2_safe(
file: &std::fs::File,
dst: &mut ReadBuf<'_>,
offset: off_t,
flags: c_int,
) -> std::io::Result<()> {
unsafe {
/* We have to defend against buffer overflows manually here. The slice API makes
* this fairly straightforward. */
let unfilled = dst.unfilled_mut();
let mut iov = iovec {
iov_base: unfilled.as_mut_ptr() as *mut c_void,
iov_len: unfilled.len(),
};
/* We take a File object rather than an fd as reading from a sensitive fd may confuse
* other unsafe code that assumes that only they have access to that fd. */
let bytes_read = preadv2(
file.as_raw_fd(),
&mut iov as *mut iovec as *const iovec,
1,
offset,
flags,
);
if bytes_read < 0 {
Err(std::io::Error::last_os_error())
} else {
/* preadv2 returns the number of bytes read, e.g. the number of bytes that have
* written into `unfilled`. So it's safe to assume that the data is now
* initialised */
dst.assume_init(bytes_read as usize);
dst.advance(bytes_read as usize);
Ok(())
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_preadv2_safe() {
use std::io::{Seek, Write};
use std::mem::MaybeUninit;
use tempfile::tempdir;

let tmp = tempdir().unwrap();
let filename = tmp.path().join("file");
const MESSAGE: &[u8] = b"Hello this is a test";
{
let mut f = std::fs::File::create(&filename).unwrap();
f.write_all(MESSAGE).unwrap();
}
let f = std::fs::File::open(&filename).unwrap();

let mut buf = [MaybeUninit::<u8>::new(0); 50];
let mut br = ReadBuf::uninit(&mut buf);

// Basic use:
preadv2_safe(&f, &mut br, 0, 0).unwrap();
assert_eq!(br.initialized().len(), MESSAGE.len());
assert_eq!(br.filled(), MESSAGE);

// Here we check that offset works, but also that appending to a non-empty buffer
// behaves correctly WRT initialisation.
preadv2_safe(&f, &mut br, 5, 0).unwrap();
assert_eq!(br.initialized().len(), MESSAGE.len() * 2 - 5);
assert_eq!(br.filled(), b"Hello this is a test this is a test".as_ref());

// offset of -1 means use the current cursor. This has not been advanced by the
// previous reads because we specified an offset there.
preadv2_safe(&f, &mut br, -1, 0).unwrap();
assert_eq!(br.remaining(), 0);
assert_eq!(
br.filled(),
b"Hello this is a test this is a testHello this is a".as_ref()
);

// but the offset should have been advanced by that read
br.clear();
preadv2_safe(&f, &mut br, -1, 0).unwrap();
assert_eq!(br.filled(), b" test");

// This should be in cache, so RWF_NOWAIT should work, but it not being in cache
// (EAGAIN) or not supported by the underlying filesystem (ENOTSUP) is fine too.
br.clear();
match preadv2_safe(&f, &mut br, 0, RWF_NOWAIT) {
Ok(()) => assert_eq!(br.filled(), MESSAGE),
Err(e) => assert!(matches!(
e.raw_os_error(),
Some(libc::ENOTSUP) | Some(libc::EAGAIN)
)),
}

// Test handling large offsets
{
// I hope the underlying filesystem supports sparse files
let mut w = std::fs::OpenOptions::new()
.write(true)
.open(&filename)
.unwrap();
w.set_len(0x1_0000_0000).unwrap();
w.seek(std::io::SeekFrom::Start(0x1_0000_0000)).unwrap();
w.write_all(b"This is a Large File").unwrap();
}

br.clear();
preadv2_safe(&f, &mut br, 0x1_0000_0008, 0).unwrap();
assert_eq!(br.filled(), b"a Large File");
}
}

fn pos_to_lohi(offset: off_t) -> (c_long, c_long) {
// 64-bit offset is split over high and low 32-bits on 32-bit architectures.
// 64-bit architectures still have high and low arguments, but only the low
// one is inspected. See pos_from_hilo in linux/fs/read_write.c.
const HALF_LONG_BITS: usize = core::mem::size_of::<c_long>() * 8 / 2;
(
offset as c_long,
// We want to shift this off_t value by size_of::<c_long>(). We can't do
// it in one shift because if they're both 64-bits we'd be doing u64 >> 64
// which is implementation defined. Instead do it in two halves:
((offset >> HALF_LONG_BITS) >> HALF_LONG_BITS) as c_long,
)
}

pub(crate) const RWF_NOWAIT: c_int = 0x00000008;
unsafe fn preadv2(
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
flags: c_int,
) -> ssize_t {
// Call via libc::syscall rather than libc::preadv2. preadv2 is only supported by glibc
// and only since v2.26. By using syscall we don't need to worry about compatiblity with
// old glibc versions and it will work on Android and musl too. The downside is that you
// can't use `LD_PRELOAD` tricks any more to intercept these calls.
let (lo, hi) = pos_to_lohi(offset);
libc::syscall(libc::SYS_preadv2, fd, iov, iovcnt, lo, hi, flags) as ssize_t
}
}
21 changes: 21 additions & 0 deletions tokio/tests/fs_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ async fn basic_read() {

assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);

// Drop the data from the cache to stimulate uncached codepath on Linux (see preadv2 in
// file.rs)
#[cfg(target_os = "linux")]
{
use std::os::unix::io::AsRawFd;
nix::unistd::fsync(tempfile.as_raw_fd()).unwrap();
nix::fcntl::posix_fadvise(
tempfile.as_raw_fd(),
0,
0,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
)
.unwrap();
}

let mut file = File::open(tempfile.path()).await.unwrap();
let n = file.read(&mut buf).await.unwrap();

assert_eq!(n, HELLO.len());
assert_eq!(&buf[..n], HELLO);
}

#[tokio::test]
Expand Down