Skip to content

Commit

Permalink
File: Try doing a non-blocking read before punting to the threadpool
Browse files Browse the repository at this point in the history
...on Linux.

If the data is already available in cache this will avoid cross-thread
interaction and remove a copy.  It should help with latency too as reads
that can be satisfied now won't need to wait in queue until other fs
operations are complete.
  • Loading branch information
wmanley committed Mar 21, 2021
1 parent 63395f0 commit 755a704
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
6 changes: 3 additions & 3 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ full = [
"time",
]

fs = []
fs = ["libc"]
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
Expand Down Expand Up @@ -103,11 +103,11 @@ parking_lot = { version = "0.11.0", optional = true }
tracing = { version = "0.1.21", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
libc = { version = "0.2.42", optional = true }
libc = { version = "0.2.87", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }

[target.'cfg(unix)'.dev-dependencies]
libc = { version = "0.2.42" }
libc = { version = "0.2.87" }
nix = { version = "0.19.0" }

[target.'cfg(windows)'.dependencies.winapi]
Expand Down
79 changes: 79 additions & 0 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,10 @@ impl AsyncRead for File {
return Ready(Ok(()));
}

if let Some(x) = read_nowait::try_nonblocking_read(me.std.as_ref(), dst) {
return Ready(x);
}

buf.ensure_capacity_for(dst);
let std = me.std.clone();

Expand Down Expand Up @@ -756,3 +760,78 @@ impl Inner {
}
}
}

#[cfg(all(target_os = "linux", not(test)))]
mod read_nowait {
use crate::io::ReadBuf;
use libc::{c_int, c_void, iovec, off_t, preadv2};
use std::{
os::unix::prelude::AsRawFd,
sync::atomic::{AtomicBool, Ordering},
};

static NONBLOCKING_READ_SUPPORTED: AtomicBool = AtomicBool::new(true);

pub(crate) fn try_nonblocking_read(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
if !NONBLOCKING_READ_SUPPORTED.load(Ordering::Relaxed) {
return None;
}
let out = preadv2_safe(file, dst, -1, libc::RWF_NOWAIT);
if let Err(err) = &out {
match err.raw_os_error() {
Some(libc::ENOSYS) => {
NONBLOCKING_READ_SUPPORTED.store(false, Ordering::Relaxed);
return None;
}
Some(libc::ENOTSUP) | Some(libc::EAGAIN) => return None,
_ => {}
}
}
Some(out)
}

fn preadv2_safe(
file: &crate::fs::sys::File,
dst: &mut ReadBuf<'_>,
offset: off_t,
flags: c_int,
) -> std::io::Result<()> {
unsafe {
/* We're manually have to defend against buffer overflows here. The slice API makes
* this fairly streightforward. */
let unfilled = dst.unfilled_mut();
let iov = iovec {
iov_base: unfilled.as_mut_ptr() as *mut c_void,
iov_len: unfilled.len(),
};
/* We take a File object rather than an fd as reading from a sensitive fd may confuse
* other unsafe code that assumes that only they have access to that fd. */
let bytes_read = preadv2(file.as_raw_fd(), &iov as *const iovec, 1, offset, flags);
if bytes_read < 0 {
Err(std::io::Error::last_os_error())
} else {
/* preadv2 returns the number of bytes read, e.g. the number of bytes that have
* written into `unfilled`. So it's safe to assume that the data is now
* initialised */
dst.assume_init(dst.filled().len() + bytes_read as usize);
dst.advance(bytes_read as usize);
Ok(())
}
}
}
}

#[cfg(any(not(target_os = "linux"), test))]
mod read_nowait {
use crate::io::ReadBuf;

pub(crate) fn try_nonblocking_read(
_file: &crate::fs::sys::File,
_dst: &mut ReadBuf<'_>,
) -> Option<std::io::Result<()>> {
None
}
}

0 comments on commit 755a704

Please sign in to comment.