diff --git a/src/unix.rs b/src/unix.rs index 8f3905e..4d5a80e 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -285,6 +285,30 @@ impl Client { pub fn try_acquire(&self) -> io::Result> { let mut buf = [0]; + // On Linux, we can use preadv2 to do non-blocking read, + // even if `O_NONBLOCK` is not set. + #[cfg(any(target_os = "linux", target_os = "android"))] + { + let read = self.read().as_raw_fd(); + loop { + match linux::non_blocking_read(read, &mut buf) { + Ok(1) => return Ok(Some(Acquired { byte: buf[0] })), + Ok(_) => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "early EOF on jobserver pipe", + )) + } + + Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None), + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) if e.kind() == io::ErrorKind::Unsupported => break, + + Err(err) => return Err(err), + } + } + } + let (mut fifo, is_non_blocking) = match self { Self::Fifo { file, @@ -368,6 +392,73 @@ impl Client { } } +#[cfg(any(target_os = "linux", target_os = "android"))] +mod linux { + use super::*; + + use libc::{iovec, off_t, ssize_t, syscall, SYS_preadv2}; + + // TODO: Replace this with libc::RWF_NOWAIT once they have it for musl + // targets + const RWF_NOWAIT: c_int = 0x00000008; + + fn cvt_ssize(t: ssize_t) -> io::Result { + if t == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } + } + + fn preadv2(fd: c_int, iov: &iovec) -> ssize_t { + let iovcnt: c_int = 1; + let offset: off_t = -1; + + if cfg!(all(target_arch = "x86_64", target_pointer_width = "64")) { + unsafe { syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0 as off_t, RWF_NOWAIT) } + } else if cfg!(all(target_arch = "x86_64", target_pointer_width = "32")) { + unsafe { syscall(SYS_preadv2, fd, iov, iovcnt, offset, RWF_NOWAIT) } + } else { + unsafe { + syscall( + SYS_preadv2, + fd, + iov, + iovcnt, + offset as libc::c_long, + ((offset as u64) >> 32) as libc::c_long, + RWF_NOWAIT, + ) + } + } + .try_into() + .unwrap() + } + + pub fn non_blocking_read(fd: c_int, buf: &mut [u8]) -> io::Result { + static IS_NONBLOCKING_READ_UNSUPPORTED: AtomicBool = AtomicBool::new(false); + + if IS_NONBLOCKING_READ_UNSUPPORTED.load(Ordering::Relaxed) { + return Err(io::ErrorKind::Unsupported.into()); + } + + match cvt_ssize(preadv2( + fd, + &iovec { + iov_base: buf.as_ptr() as *mut _, + iov_len: buf.len(), + }, + )) { + Ok(cnt) => Ok(cnt.try_into().unwrap()), + Err(err) if matches!(err.raw_os_error(), Some(libc::EOPNOTSUPP | libc::ENOSYS)) => { + IS_NONBLOCKING_READ_UNSUPPORTED.store(true, Ordering::Relaxed); + Err(io::ErrorKind::Unsupported.into()) + } + Err(err) => Err(err), + } + } +} + #[derive(Debug)] pub struct Helper { thread: JoinHandle<()>,