Skip to content

Commit

Permalink
Add back preadv2 optimization for try_acquire on Linux
Browse files Browse the repository at this point in the history
Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Apr 22, 2024
1 parent 66782f5 commit c65cd8e
Showing 1 changed file with 103 additions and 0 deletions.
103 changes: 103 additions & 0 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,33 @@ impl Client {
pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
let mut buf = [0];

// On Linux, we can use preadv2 to do non-blocking read,
// even if `O_NONBLOCK` is not set.
//
// TODO: musl libc supports preadv2 since 1.2.5, but `libc` crate
// hasn't yet added it.
#[cfg(target_os = "linux")]
{
let read = self.read().as_raw_fd();
loop {
match linux::non_blocking_read(read, &mut buf) {
Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
Ok(_) => {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"early EOF on jobserver pipe",
))
}

Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None),
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) if e.kind() == io::ErrorKind::Unsupported => break,

Err(err) => return Err(err),
}
}
}

let (mut fifo, is_non_blocking) = match self {
Self::Fifo {
file,
Expand Down Expand Up @@ -368,6 +395,82 @@ impl Client {
}
}

// This should be available for all linux targets,
// though only [`non_blocking_read`] currently uses it so adding gnu cfg.
#[cfg(target_os = "linux")]
mod linux {
use super::*;

use libc::{iovec, off_t, ssize_t, syscall, SYS_preadv2};

fn cvt_ssize(t: ssize_t) -> io::Result<ssize_t> {
if t == -1 {
Err(io::Error::last_os_error())
} else {
Ok(t)
}
}

unsafe fn preadv2(
fd: c_int,
iov: *const iovec,
iovcnt: c_int,
offset: off_t,
flags: c_int,
) -> ssize_t {
#[cfg(all(target_arch = "x86_64", target_pointer_width = "64"))]
let res = syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0, flags);

#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))]
let res = syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0, flags);

#[cfg(not(target_arch = "x86_64"))]
let res = syscall(
SYS_preadv2,
fd,
iov,
iovcnt,
offset as libc::c_long,
((offset as u64) >> 32) as libc::c_long,
flags,
);

res.try_into().unwrap()
}

pub fn non_blocking_read(fd: c_int, buf: &mut [u8]) -> io::Result<usize> {
static IS_NONBLOCKING_READ_UNSUPPORTED: AtomicBool = AtomicBool::new(false);

if IS_NONBLOCKING_READ_UNSUPPORTED.load(Ordering::Relaxed) {
return Err(io::ErrorKind::Unsupported.into());
}

match cvt_ssize(unsafe {
preadv2(
fd,
&iovec {
iov_base: buf.as_ptr() as *mut _,
iov_len: buf.len(),
},
1,
-1,
libc::RWF_NOWAIT,
)
}) {
Ok(cnt) => Ok(cnt.try_into().unwrap()),
Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => {
IS_NONBLOCKING_READ_UNSUPPORTED.store(true, Ordering::Relaxed);
Err(io::ErrorKind::Unsupported.into())
}
Err(err) if err.kind() == io::ErrorKind::Unsupported => {
IS_NONBLOCKING_READ_UNSUPPORTED.store(true, Ordering::Relaxed);
Err(err)
}
Err(err) => Err(err),
}
}
}

#[derive(Debug)]
pub struct Helper {
thread: JoinHandle<()>,
Expand Down

0 comments on commit c65cd8e

Please sign in to comment.