Skip to content

Commit

Permalink
std: Don't spawn threads in wait_with_output
Browse files Browse the repository at this point in the history
Semantically there's actually no reason for us to spawn threads as part of the
call to `wait_with_output`, and that's generally an incredibly heavyweight
operation for just reading a few bytes (especially when stderr probably rarely
has bytes!). An equivalent operation in terms of what's implemented today would
be to just drain both pipes of all contents and then call `wait` on the child
process itself.

On Unix we can implement this through some convenient use of the `select`
function, whereas on Windows we can make use of overlapped I/O. Note that on
Windows this requires us to use named pipes instead of anonymous pipes, but
they're semantically the same under the hood.
  • Loading branch information
alexcrichton committed Feb 12, 2016
1 parent 616ca6a commit 05c188d
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 29 deletions.
36 changes: 20 additions & 16 deletions src/libstd/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ use fmt;
use io;
use path::Path;
use str;
use sys::pipe::AnonPipe;
use sys::pipe::{read2, AnonPipe};
use sys::process as imp;
use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner};
use thread::{self, JoinHandle};

/// Representation of a running or exited child process.
///
Expand Down Expand Up @@ -503,24 +502,29 @@ impl Child {
#[stable(feature = "process", since = "1.0.0")]
pub fn wait_with_output(mut self) -> io::Result<Output> {
drop(self.stdin.take());
fn read<R>(mut input: R) -> JoinHandle<io::Result<Vec<u8>>>
where R: Read + Send + 'static
{
thread::spawn(move || {
let mut ret = Vec::new();
input.read_to_end(&mut ret).map(|_| ret)
})

let (mut stdout, mut stderr) = (Vec::new(), Vec::new());
match (self.stdout.take(), self.stderr.take()) {
(None, None) => {}
(Some(mut out), None) => {
let res = out.read_to_end(&mut stdout);
debug_assert!(res.is_ok());
}
(None, Some(mut err)) => {
let res = err.read_to_end(&mut stderr);
debug_assert!(res.is_ok());
}
(Some(out), Some(err)) => {
let res = read2(out.inner, &mut stdout, err.inner, &mut stderr);
debug_assert!(res.is_ok());
}
}
let stdout = self.stdout.take().map(read);
let stderr = self.stderr.take().map(read);
let status = try!(self.wait());
let stdout = stdout.and_then(|t| t.join().unwrap().ok());
let stderr = stderr.and_then(|t| t.join().unwrap().ok());

let status = try!(self.wait());
Ok(Output {
status: status,
stdout: stdout.unwrap_or(Vec::new()),
stderr: stderr.unwrap_or(Vec::new()),
stdout: stdout,
stderr: stderr,
})
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/libstd/sys/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ impl FileDesc {
}
}

pub fn set_nonblocking(&self, nonblocking: bool) {
unsafe {
let previous = libc::fcntl(self.fd, libc::F_GETFL);
debug_assert!(previous != -1);
let new = if nonblocking {
previous | libc::O_NONBLOCK
} else {
previous & !libc::O_NONBLOCK
};
let ret = libc::fcntl(self.fd, libc::F_SETFL, new);
debug_assert!(ret != -1);
}
}

pub fn duplicate(&self) -> io::Result<FileDesc> {
// We want to atomically duplicate this file descriptor and set the
// CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This
Expand Down
55 changes: 55 additions & 0 deletions src/libstd/sys/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use prelude::v1::*;

use cmp;
use io;
use libc::{self, c_int};
use mem;
use sys::cvt_r;
use sys::fd::FileDesc;

Expand Down Expand Up @@ -68,3 +72,54 @@ impl AnonPipe {
pub fn fd(&self) -> &FileDesc { &self.0 }
pub fn into_fd(self) -> FileDesc { self.0 }
}

pub fn read2(p1: AnonPipe,
v1: &mut Vec<u8>,
p2: AnonPipe,
v2: &mut Vec<u8>) -> io::Result<()> {
// Set both pipes into nonblocking mode as we're gonna be reading from both
// in the `select` loop below, and we wouldn't want one to block the other!
let p1 = p1.into_fd();
let p2 = p2.into_fd();
p1.set_nonblocking(true);
p2.set_nonblocking(true);

let max = cmp::max(p1.raw(), p2.raw());
loop {
// wait for either pipe to become readable using `select`
try!(cvt_r(|| unsafe {
let mut read: libc::fd_set = mem::zeroed();
libc::FD_SET(p1.raw(), &mut read);
libc::FD_SET(p2.raw(), &mut read);
libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _,
0 as *mut _)
}));

// Read as much as we can from each pipe, ignoring EWOULDBLOCK or
// EAGAIN. If we hit EOF, then this will happen because the underlying
// reader will return Ok(0), in which case we'll see `Ok` ourselves. In
// this case we flip the other fd back into blocking mode and read
// whatever's leftover on that file descriptor.
let read = |fd: &FileDesc, dst: &mut Vec<u8>| {
match fd.read_to_end(dst) {
Ok(_) => Ok(true),
Err(e) => {
if e.raw_os_error() == Some(libc::EWOULDBLOCK) ||
e.raw_os_error() == Some(libc::EAGAIN) {
Ok(false)
} else {
Err(e)
}
}
}
};
if try!(read(&p1, v1)) {
p2.set_nonblocking(false);
return p2.read_to_end(v2);
}
if try!(read(&p2, v2)) {
p1.set_nonblocking(false);
return p1.read_to_end(v1);
}
}
}
2 changes: 1 addition & 1 deletion src/libstd/sys/unix/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ mod tests {
cmd.stdin(Stdio::MakePipe);
cmd.stdout(Stdio::MakePipe);

let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null));
let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true));
let stdin_write = pipes.stdin.take().unwrap();
let stdout_read = pipes.stdout.take().unwrap();

Expand Down
32 changes: 32 additions & 0 deletions src/libstd/sys/windows/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3;
pub const ERROR_ACCESS_DENIED: DWORD = 5;
pub const ERROR_INVALID_HANDLE: DWORD = 6;
pub const ERROR_NO_MORE_FILES: DWORD = 18;
pub const ERROR_HANDLE_EOF: DWORD = 38;
pub const ERROR_BROKEN_PIPE: DWORD = 109;
pub const ERROR_DISK_FULL: DWORD = 112;
pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120;
Expand Down Expand Up @@ -361,6 +362,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING |
EXCEPTION_TARGET_UNWIND |
EXCEPTION_COLLIDED_UNWIND;

pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001;
pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000;
pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000;
pub const PIPE_WAIT: DWORD = 0x00000000;
pub const PIPE_TYPE_BYTE: DWORD = 0x00000000;
pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008;
pub const PIPE_READMODE_BYTE: DWORD = 0x00000000;

#[repr(C)]
#[cfg(target_arch = "x86")]
pub struct WSADATA {
Expand Down Expand Up @@ -1261,6 +1270,29 @@ extern "system" {
OriginalContext: *const CONTEXT,
HistoryTable: *const UNWIND_HISTORY_TABLE);
pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME);

pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCWSTR) -> HANDLE;
pub fn WaitForMultipleObjects(nCount: DWORD,
lpHandles: *const HANDLE,
bWaitAll: BOOL,
dwMilliseconds: DWORD) -> DWORD;
pub fn CreateNamedPipeW(lpName: LPCWSTR,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
lpSecurityAttributes: LPSECURITY_ATTRIBUTES)
-> HANDLE;
pub fn CancelIo(handle: HANDLE) -> BOOL;
pub fn GetOverlappedResult(hFile: HANDLE,
lpOverlapped: LPOVERLAPPED,
lpNumberOfBytesTransferred: LPDWORD,
bWait: BOOL) -> BOOL;
}

// Functions that aren't available on Windows XP, but we still use them and just
Expand Down
53 changes: 51 additions & 2 deletions src/libstd/sys/windows/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl RawHandle {
let mut read = 0;
let res = cvt(unsafe {
c::ReadFile(self.0, buf.as_ptr() as c::LPVOID,
buf.len() as c::DWORD, &mut read,
ptr::null_mut())
buf.len() as c::DWORD, &mut read,
0 as *mut _)
});

match res {
Expand All @@ -86,6 +86,55 @@ impl RawHandle {
}
}

pub unsafe fn read_overlapped(&self,
buf: &mut [u8],
overlapped: *mut c::OVERLAPPED)
-> io::Result<bool> {
let res = cvt({
c::ReadFile(self.0, buf.as_ptr() as c::LPVOID,
buf.len() as c::DWORD, 0 as *mut _,
overlapped)
});
match res {
Ok(_) => Ok(true),
Err(e) => {
if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) {
Ok(false)
} else {
Err(e)
}
}
}
}

pub fn overlapped_result(&self,
overlapped: *mut c::OVERLAPPED,
wait: bool) -> io::Result<usize> {
unsafe {
let mut bytes = 0;
let wait = if wait {c::TRUE} else {c::FALSE};
let res = cvt({
c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait)
});
match res {
Ok(_) => Ok(bytes as usize),
Err(e) => {
if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) {
Ok(0)
} else {
Err(e)
}
}
}
}
}

pub fn cancel_io(&self) -> io::Result<()> {
unsafe {
cvt(c::CancelIo(self.raw())).map(|_| ())
}
}

pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
let mut me = self;
(&mut me).read_to_end(buf)
Expand Down
Loading

0 comments on commit 05c188d

Please sign in to comment.