Skip to content

Commit

Permalink
std: Don't spawn threads in wait_with_output
Browse files Browse the repository at this point in the history
Semantically there's actually no reason for us to spawn threads as part of the
call to `wait_with_output`, and that's generally an incredibly heavyweight
operation for just reading a few bytes (especially when stderr probably rarely
has bytes!). An equivalent operation in terms of what's implemented today would
be to just drain both pipes of all contents and then call `wait` on the child
process itself.

On Unix we can implement this through some convenient use of the `select`
function, whereas on Windows we can make use of overlapped I/O. Note that on
Windows this requires us to use named pipes instead of anonymous pipes, but
they're semantically the same under the hood.
  • Loading branch information
alexcrichton committed Mar 9, 2016
1 parent 6afa32a commit 7c3038f
Show file tree
Hide file tree
Showing 10 changed files with 459 additions and 34 deletions.
36 changes: 20 additions & 16 deletions src/libstd/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ use fmt;
use io;
use path::Path;
use str;
use sys::pipe::AnonPipe;
use sys::pipe::{read2, AnonPipe};
use sys::process as imp;
use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner};
use thread::{self, JoinHandle};

/// Representation of a running or exited child process.
///
Expand Down Expand Up @@ -503,24 +502,29 @@ impl Child {
#[stable(feature = "process", since = "1.0.0")]
pub fn wait_with_output(mut self) -> io::Result<Output> {
drop(self.stdin.take());
fn read<R>(mut input: R) -> JoinHandle<io::Result<Vec<u8>>>
where R: Read + Send + 'static
{
thread::spawn(move || {
let mut ret = Vec::new();
input.read_to_end(&mut ret).map(|_| ret)
})

let (mut stdout, mut stderr) = (Vec::new(), Vec::new());
match (self.stdout.take(), self.stderr.take()) {
(None, None) => {}
(Some(mut out), None) => {
let res = out.read_to_end(&mut stdout);
res.unwrap();
}
(None, Some(mut err)) => {
let res = err.read_to_end(&mut stderr);
res.unwrap();
}
(Some(out), Some(err)) => {
let res = read2(out.inner, &mut stdout, err.inner, &mut stderr);
res.unwrap();
}
}
let stdout = self.stdout.take().map(read);
let stderr = self.stderr.take().map(read);
let status = try!(self.wait());
let stdout = stdout.and_then(|t| t.join().unwrap().ok());
let stderr = stderr.and_then(|t| t.join().unwrap().ok());

let status = try!(self.wait());
Ok(Output {
status: status,
stdout: stdout.unwrap_or(Vec::new()),
stderr: stderr.unwrap_or(Vec::new()),
stdout: stdout,
stderr: stderr,
})
}
}
Expand Down
17 changes: 16 additions & 1 deletion src/libstd/sys/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![unstable(reason = "not public", issue = "0", feature = "fd")]

use prelude::v1::*;

use io::{self, Read};
Expand Down Expand Up @@ -75,6 +77,20 @@ impl FileDesc {
}
}

pub fn set_nonblocking(&self, nonblocking: bool) {
unsafe {
let previous = libc::fcntl(self.fd, libc::F_GETFL);
debug_assert!(previous != -1);
let new = if nonblocking {
previous | libc::O_NONBLOCK
} else {
previous & !libc::O_NONBLOCK
};
let ret = libc::fcntl(self.fd, libc::F_SETFL, new);
debug_assert!(ret != -1);
}
}

pub fn duplicate(&self) -> io::Result<FileDesc> {
// We want to atomically duplicate this file descriptor and set the
// CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This
Expand Down Expand Up @@ -126,7 +142,6 @@ impl FileDesc {
}
}

#[unstable(reason = "not public", issue = "0", feature = "fd_read")]
impl<'a> Read for &'a FileDesc {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(**self).read(buf)
Expand Down
55 changes: 55 additions & 0 deletions src/libstd/sys/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use prelude::v1::*;

use cmp;
use io;
use libc::{self, c_int};
use mem;
use sys::cvt_r;
use sys::fd::FileDesc;

Expand Down Expand Up @@ -68,3 +72,54 @@ impl AnonPipe {
pub fn fd(&self) -> &FileDesc { &self.0 }
pub fn into_fd(self) -> FileDesc { self.0 }
}

pub fn read2(p1: AnonPipe,
v1: &mut Vec<u8>,
p2: AnonPipe,
v2: &mut Vec<u8>) -> io::Result<()> {
// Set both pipes into nonblocking mode as we're gonna be reading from both
// in the `select` loop below, and we wouldn't want one to block the other!
let p1 = p1.into_fd();
let p2 = p2.into_fd();
p1.set_nonblocking(true);
p2.set_nonblocking(true);

let max = cmp::max(p1.raw(), p2.raw());
loop {
// wait for either pipe to become readable using `select`
try!(cvt_r(|| unsafe {
let mut read: libc::fd_set = mem::zeroed();
libc::FD_SET(p1.raw(), &mut read);
libc::FD_SET(p2.raw(), &mut read);
libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _,
0 as *mut _)
}));

// Read as much as we can from each pipe, ignoring EWOULDBLOCK or
// EAGAIN. If we hit EOF, then this will happen because the underlying
// reader will return Ok(0), in which case we'll see `Ok` ourselves. In
// this case we flip the other fd back into blocking mode and read
// whatever's leftover on that file descriptor.
let read = |fd: &FileDesc, dst: &mut Vec<u8>| {
match fd.read_to_end(dst) {
Ok(_) => Ok(true),
Err(e) => {
if e.raw_os_error() == Some(libc::EWOULDBLOCK) ||
e.raw_os_error() == Some(libc::EAGAIN) {
Ok(false)
} else {
Err(e)
}
}
}
};
if try!(read(&p1, v1)) {
p2.set_nonblocking(false);
return p2.read_to_end(v2).map(|_| ());
}
if try!(read(&p2, v2)) {
p1.set_nonblocking(false);
return p1.read_to_end(v1).map(|_| ());
}
}
}
2 changes: 1 addition & 1 deletion src/libstd/sys/unix/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ mod tests {
cmd.stdin(Stdio::MakePipe);
cmd.stdout(Stdio::MakePipe);

let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null));
let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true));
let stdin_write = pipes.stdin.take().unwrap();
let stdout_read = pipes.stdout.take().unwrap();

Expand Down
38 changes: 34 additions & 4 deletions src/libstd/sys/windows/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#![allow(bad_style)]
#![cfg_attr(test, allow(dead_code))]
#![unstable(issue = "0", feature = "windows_c")]

use os::raw::{c_int, c_uint, c_ulong, c_long, c_longlong, c_ushort,};
use os::raw::{c_char, c_ulonglong};
Expand Down Expand Up @@ -181,13 +182,15 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3;
pub const ERROR_ACCESS_DENIED: DWORD = 5;
pub const ERROR_INVALID_HANDLE: DWORD = 6;
pub const ERROR_NO_MORE_FILES: DWORD = 18;
pub const ERROR_HANDLE_EOF: DWORD = 38;
pub const ERROR_BROKEN_PIPE: DWORD = 109;
pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120;
pub const ERROR_INSUFFICIENT_BUFFER: DWORD = 122;
pub const ERROR_ALREADY_EXISTS: DWORD = 183;
pub const ERROR_NO_DATA: DWORD = 232;
pub const ERROR_ENVVAR_NOT_FOUND: DWORD = 203;
pub const ERROR_OPERATION_ABORTED: DWORD = 995;
pub const ERROR_IO_PENDING: DWORD = 997;
pub const ERROR_TIMEOUT: DWORD = 0x5B4;

pub const INVALID_HANDLE_VALUE: HANDLE = !0 as HANDLE;
Expand Down Expand Up @@ -292,6 +295,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING |
EXCEPTION_TARGET_UNWIND |
EXCEPTION_COLLIDED_UNWIND;

pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001;
pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000;
pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000;
pub const PIPE_WAIT: DWORD = 0x00000000;
pub const PIPE_TYPE_BYTE: DWORD = 0x00000000;
pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008;
pub const PIPE_READMODE_BYTE: DWORD = 0x00000000;

#[repr(C)]
#[cfg(target_arch = "x86")]
pub struct WSADATA {
Expand Down Expand Up @@ -913,10 +924,6 @@ extern "system" {
nOutBufferSize: DWORD,
lpBytesReturned: LPDWORD,
lpOverlapped: LPOVERLAPPED) -> BOOL;
pub fn CreatePipe(hReadPipe: LPHANDLE,
hWritePipe: LPHANDLE,
lpPipeAttributes: LPSECURITY_ATTRIBUTES,
nSize: DWORD) -> BOOL;
pub fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES,
dwStackSize: SIZE_T,
lpStartAddress: extern "system" fn(*mut c_void)
Expand Down Expand Up @@ -1129,6 +1136,29 @@ extern "system" {
OriginalContext: *const CONTEXT,
HistoryTable: *const UNWIND_HISTORY_TABLE);
pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME);

pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES,
bManualReset: BOOL,
bInitialState: BOOL,
lpName: LPCWSTR) -> HANDLE;
pub fn WaitForMultipleObjects(nCount: DWORD,
lpHandles: *const HANDLE,
bWaitAll: BOOL,
dwMilliseconds: DWORD) -> DWORD;
pub fn CreateNamedPipeW(lpName: LPCWSTR,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
lpSecurityAttributes: LPSECURITY_ATTRIBUTES)
-> HANDLE;
pub fn CancelIo(handle: HANDLE) -> BOOL;
pub fn GetOverlappedResult(hFile: HANDLE,
lpOverlapped: LPOVERLAPPED,
lpNumberOfBytesTransferred: LPDWORD,
bWait: BOOL) -> BOOL;
}

// Functions that aren't available on Windows XP, but we still use them and just
Expand Down
70 changes: 69 additions & 1 deletion src/libstd/sys/windows/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![unstable(issue = "0", feature = "windows_handle")]

use prelude::v1::*;

use cmp;
Expand Down Expand Up @@ -42,6 +44,20 @@ impl Handle {
Handle(RawHandle::new(handle))
}

pub fn new_event(manual: bool, init: bool) -> io::Result<Handle> {
unsafe {
let event = c::CreateEventW(0 as *mut _,
manual as c::BOOL,
init as c::BOOL,
0 as *const _);
if event.is_null() {
Err(io::Error::last_os_error())
} else {
Ok(Handle::new(event))
}
}
}

pub fn into_raw(self) -> c::HANDLE {
let ret = self.raw();
mem::forget(self);
Expand Down Expand Up @@ -90,6 +106,59 @@ impl RawHandle {
}
}

pub unsafe fn read_overlapped(&self,
buf: &mut [u8],
overlapped: *mut c::OVERLAPPED)
-> io::Result<Option<usize>> {
let len = cmp::min(buf.len(), <c::DWORD>::max_value() as usize) as c::DWORD;
let mut amt = 0;
let res = cvt({
c::ReadFile(self.0, buf.as_ptr() as c::LPVOID,
len, &mut amt, overlapped)
});
match res {
Ok(_) => Ok(Some(amt as usize)),
Err(e) => {
if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) {
Ok(None)
} else if e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) {
Ok(Some(0))
} else {
Err(e)
}
}
}
}

pub fn overlapped_result(&self,
overlapped: *mut c::OVERLAPPED,
wait: bool) -> io::Result<usize> {
unsafe {
let mut bytes = 0;
let wait = if wait {c::TRUE} else {c::FALSE};
let res = cvt({
c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait)
});
match res {
Ok(_) => Ok(bytes as usize),
Err(e) => {
if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) ||
e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) {
Ok(0)
} else {
Err(e)
}
}
}
}
}

pub fn cancel_io(&self) -> io::Result<()> {
unsafe {
cvt(c::CancelIo(self.raw())).map(|_| ())
}
}

pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
let mut me = self;
(&mut me).read_to_end(buf)
Expand Down Expand Up @@ -120,7 +189,6 @@ impl RawHandle {
}
}

#[unstable(reason = "not public", issue = "0", feature = "fd_read")]
impl<'a> Read for &'a RawHandle {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(**self).read(buf)
Expand Down
2 changes: 2 additions & 0 deletions src/libstd/sys/windows/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![unstable(issue = "0", feature = "windows_net")]

use prelude::v1::*;

use cmp;
Expand Down
Loading

1 comment on commit 7c3038f

@cheako
Copy link

@cheako cheako commented on 7c3038f Mar 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: Sorry, I misunderstood... I thought the streams were being combined.

This is an incorrect implementation on Unix. You're supposed to create a single pipe pair and dup(A kernel syscall) the write end. Typically, you'll be calling dup2 to setup FDs 1 and 2, and that's how bash does this for 2>&1, just calling dup2(1,2) during the redirection phase.

On Windows I'm unsure if this is correct or nice, do you do any handling of line buffering? Appending lines is supposed to be an atomic operation, so a partially read line from stdout *should not intermingle with data from stderr... and it looks like there would be a single buffer to hold two partial lines, not good. Though I haven't run Windows in over two decades so I don't really care if you fix that.

  • I hope I don't need to explain why this is how POSIX defined this behavior.

Please sign in to comment.