Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Client::configure* on unix #100

Merged
merged 5 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ mod test {
client.try_acquire().unwrap().unwrap();
}

#[cfg(not(unix))]
#[cfg(windows)]
#[test]
fn test_try_acquire() {
let client = Client::new(0).unwrap();
Expand Down
224 changes: 131 additions & 93 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::{self, Read, Write};
use std::mem;
use std::mem::MaybeUninit;
use std::os::unix::prelude::*;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::process::Command;
use std::ptr;
use std::sync::{
Expand All @@ -17,17 +17,27 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;

#[derive(Debug)]
pub enum Client {
/// `--jobserver-auth=R,W`
Pipe { read: File, write: File },
/// `--jobserver-auth=fifo:PATH`
Fifo {
file: File,
path: PathBuf,
/// it can only go from false -> true but not the other way around, since that
/// could cause a race condition.
is_non_blocking: AtomicBool,
},
/// This preserves the `--jobserver-auth` type at creation time,
/// so auth type will be passed down to and inherit from sub-Make processes correctly.
///
/// See <https://github.com/rust-lang/jobserver-rs/issues/99> for details.
enum ClientCreationArg {
NobodyXu marked this conversation as resolved.
Show resolved Hide resolved
Fds { read: c_int, write: c_int },
NobodyXu marked this conversation as resolved.
Show resolved Hide resolved
Fifo(Box<Path>),
}

#[derive(Debug)]
pub struct Client {
read: File,
write: File,
creation_arg: ClientCreationArg,
/// It is set to `None` if the pipe is shared with other processes, so it
/// cannot support non-blocking mode.
///
/// If it is set to `Some`, then it can only go from
/// `Some(false)` -> `Some(true)` but not the other way around,
/// since that could cause a race condition.
is_non_blocking: Option<AtomicBool>,
NobodyXu marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug)]
Expand All @@ -43,7 +53,7 @@ impl Client {
// wrong!
const BUFFER: [u8; 128] = [b'|'; 128];

let mut write = client.write();
let mut write = &client.write;

set_nonblocking(write.as_raw_fd(), true)?;

Expand Down Expand Up @@ -111,16 +121,24 @@ impl Client {
FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
})?;
let path = Path::new(path_str);
let file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))?;

Ok(Some(Client::Fifo {
file,
path: path.into(),
is_non_blocking: AtomicBool::new(false),

let open_file = || {
// Opening with read write is necessary, since opening with
// read-only or write-only could block the thread until another
// thread opens it with write-only or read-only (or RDWR)
// correspondingly.
OpenOptions::new()
.read(true)
.write(true)
.open(path)
.map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
};

Ok(Some(Client {
read: open_file()?,
write: open_file()?,
creation_arg: ClientCreationArg::Fifo(path.into()),
is_non_blocking: Some(AtomicBool::new(false)),
}))
}

Expand Down Expand Up @@ -148,6 +166,8 @@ impl Client {
return Err(FromEnvErrorInner::NegativeFd(write));
}

let creation_arg = ClientCreationArg::Fds { read, write };

// Ok so we've got two integers that look like file descriptors, but
// for extra sanity checking let's see if they actually look like
// valid files and instances of a pipe if feature enabled before we
Expand All @@ -174,40 +194,36 @@ impl Client {
//
// I tested this on macOS 14 and Linux 6.5.13
#[cfg(target_os = "linux")]
if let Ok(Some(jobserver)) =
Self::from_fifo(&format!("fifo:/dev/fd/{}", read.as_raw_fd()))
{
return Ok(Some(jobserver));
if let (Ok(read), Ok(write)) = (
File::open(format!("/dev/fd/{}", read)),
OpenOptions::new()
.write(true)
.open(format!("/dev/fd/{}", write)),
) {
return Ok(Some(Client {
read,
write,
creation_arg,
is_non_blocking: Some(AtomicBool::new(false)),
}));
}
}
}

Ok(Some(Client::Pipe {
Ok(Some(Client {
read: clone_fd_and_set_cloexec(read)?,
write: clone_fd_and_set_cloexec(write)?,
creation_arg,
is_non_blocking: None,
}))
}

unsafe fn from_fds(read: c_int, write: c_int) -> Client {
Client::Pipe {
Client {
read: File::from_raw_fd(read),
write: File::from_raw_fd(write),
}
}

/// Gets the read end of our jobserver client.
fn read(&self) -> &File {
match self {
Client::Pipe { read, .. } => read,
Client::Fifo { file, .. } => file,
}
}

/// Gets the write end of our jobserver client.
fn write(&self) -> &File {
match self {
Client::Pipe { write, .. } => write,
Client::Fifo { file, .. } => file,
creation_arg: ClientCreationArg::Fds { read, write },
is_non_blocking: None,
}
}

Expand Down Expand Up @@ -245,7 +261,7 @@ impl Client {
// to shut us down, so we otherwise punt all errors upwards.
unsafe {
let mut fd: libc::pollfd = mem::zeroed();
let mut read = self.read();
let mut read = &self.read;
fd.fd = read.as_raw_fd();
fd.events = libc::POLLIN;
loop {
Expand Down Expand Up @@ -284,19 +300,15 @@ impl Client {

pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
let mut buf = [0];
let mut fifo = &self.read;

let (mut fifo, is_non_blocking) = match self {
Self::Fifo {
file,
is_non_blocking,
..
} => (file, is_non_blocking),
_ => return Err(io::ErrorKind::Unsupported.into()),
};

if !is_non_blocking.load(Ordering::Relaxed) {
set_nonblocking(fifo.as_raw_fd(), true)?;
is_non_blocking.store(true, Ordering::Relaxed);
if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
if !is_non_blocking.load(Ordering::Relaxed) {
set_nonblocking(fifo.as_raw_fd(), true)?;
is_non_blocking.store(true, Ordering::Relaxed);
}
} else {
NobodyXu marked this conversation as resolved.
Show resolved Hide resolved
return Err(io::ErrorKind::Unsupported.into());
}

loop {
Expand All @@ -323,7 +335,7 @@ impl Client {
// always quickly release a token). If that turns out to not be the
// case we'll get an error anyway!
let byte = data.map(|d| d.byte).unwrap_or(b'+');
match self.write().write(&[byte])? {
match (&self.write).write(&[byte])? {
1 => Ok(()),
_ => Err(io::Error::new(
io::ErrorKind::Other,
Expand All @@ -333,31 +345,30 @@ impl Client {
}

pub fn string_arg(&self) -> String {
match self {
Client::Pipe { read, write } => format!("{},{}", read.as_raw_fd(), write.as_raw_fd()),
Client::Fifo { path, .. } => format!("fifo:{}", path.to_str().unwrap()),
match &self.creation_arg {
ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
}
}

pub fn available(&self) -> io::Result<usize> {
let mut len = MaybeUninit::<c_int>::uninit();
cvt(unsafe { libc::ioctl(self.read().as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
Ok(unsafe { len.assume_init() } as usize)
}

pub fn configure(&self, cmd: &mut Command) {
match self {
if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we are passing the fds specified in the environment, and we always clone the fds before using them or setting cloexec, I suppose we could remove this function completely? cc @the8472

This should improve the performance dramatically on platform with vfork available.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should happen in a separate PR since we need to consider both the environment and the file descriptors together, basically #64 but also the role of client.configure. It'd be a breaking change of that API. Right now the docs say that only children built with this method should inherit the jobserver.

The goal of this PR is to fix the fd to fifo conversion and I think it's already big enough for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it thanks

// We `File::open`ed it when inheriting from environment,
// so no need to set cloexec for fifo.
Client::Fifo { .. } => return,
Client::Pipe { .. } => {}
};
return;
}
// Here we basically just want to say that in the child process
// we'll configure the read/write file descriptors to *not* be
// cloexec, so they're inherited across the exec and specified as
// integers through `string_arg` above.
let read = self.read().as_raw_fd();
let write = self.write().as_raw_fd();
let read = self.read.as_raw_fd();
let write = self.write.as_raw_fd();
unsafe {
cmd.pre_exec(move || {
set_cloexec(read, false)?;
Expand Down Expand Up @@ -559,55 +570,82 @@ mod test {

use crate::{test::run_named_fifo_try_acquire_tests, Client};

use std::sync::Arc;
use std::{
fs::File,
io::{self, Write},
os::unix::io::AsRawFd,
sync::Arc,
};

fn from_imp_client(imp: ClientImp) -> Client {
Client {
inner: Arc::new(imp),
}
}

#[test]
fn test_try_acquire_named_fifo() {
fn new_client_from_fifo() -> (Client, String) {
let file = tempfile::NamedTempFile::new().unwrap();
let fifo_path = file.path().to_owned();
file.close().unwrap(); // Remove the NamedTempFile to create fifo

nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();

let client = ClientImp::from_fifo(&format!("fifo:{}", fifo_path.to_str().unwrap()))
.unwrap()
.map(from_imp_client)
.unwrap();
let arg = format!("fifo:{}", fifo_path.to_str().unwrap());

run_named_fifo_try_acquire_tests(&client);
(
ClientImp::from_fifo(&arg)
.unwrap()
.map(from_imp_client)
.unwrap(),
arg,
)
}

#[cfg(not(target_os = "linux"))]
#[test]
fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
use std::{
fs::File,
io::{self, Write},
os::unix::io::AsRawFd,
};

fn new_client_from_pipe() -> (Client, String) {
let (read, write) = nix::unistd::pipe().unwrap();
let read = File::from(read);
let mut write = File::from(write);

write.write_all(b"1").unwrap();

let client = unsafe {
ClientImp::from_pipe(&format!("{},{}", read.as_raw_fd(), write.as_raw_fd()), true)
}
.unwrap()
.map(from_imp_client)
.unwrap();
let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd());

(
unsafe { ClientImp::from_pipe(&arg, true) }
.unwrap()
.map(from_imp_client)
.unwrap(),
arg,
)
}

#[test]
fn test_try_acquire_named_fifo() {
run_named_fifo_try_acquire_tests(&new_client_from_fifo().0);
}

#[test]
fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
#[cfg(not(target_os = "linux"))]
assert_eq!(
client.try_acquire().unwrap_err().kind(),
new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
io::ErrorKind::Unsupported
);

#[cfg(target_os = "linux")]
{
let client = new_client_from_pipe().0;
client.acquire().unwrap().drop_without_releasing();
run_named_fifo_try_acquire_tests(&client);
}
}

#[test]
fn test_string_arg() {
let (client, arg) = new_client_from_fifo();
assert_eq!(client.inner.string_arg(), arg);

let (client, arg) = new_client_from_pipe();
assert_eq!(client.inner.string_arg(), arg);
}
}