diff --git a/src/liblibc b/src/liblibc index e19309c8b4e8b..2278a549559c3 160000 --- a/src/liblibc +++ b/src/liblibc @@ -1 +1 @@ -Subproject commit e19309c8b4e8bbd11f4d84dfffd75e3d1ac477fe +Subproject commit 2278a549559c38872b4338cb002ecc2a80d860dc diff --git a/src/libstd/process.rs b/src/libstd/process.rs index fe5e49ecb09bb..5813d82a315a6 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -20,10 +20,9 @@ use fmt; use io; use path::Path; use str; -use sys::pipe::AnonPipe; +use sys::pipe::{read2, AnonPipe}; use sys::process as imp; use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner}; -use thread::{self, JoinHandle}; /// Representation of a running or exited child process. /// @@ -503,24 +502,29 @@ impl Child { #[stable(feature = "process", since = "1.0.0")] pub fn wait_with_output(mut self) -> io::Result { drop(self.stdin.take()); - fn read(mut input: R) -> JoinHandle>> - where R: Read + Send + 'static - { - thread::spawn(move || { - let mut ret = Vec::new(); - input.read_to_end(&mut ret).map(|_| ret) - }) + + let (mut stdout, mut stderr) = (Vec::new(), Vec::new()); + match (self.stdout.take(), self.stderr.take()) { + (None, None) => {} + (Some(mut out), None) => { + let res = out.read_to_end(&mut stdout); + res.unwrap(); + } + (None, Some(mut err)) => { + let res = err.read_to_end(&mut stderr); + res.unwrap(); + } + (Some(out), Some(err)) => { + let res = read2(out.inner, &mut stdout, err.inner, &mut stderr); + res.unwrap(); + } } - let stdout = self.stdout.take().map(read); - let stderr = self.stderr.take().map(read); - let status = try!(self.wait()); - let stdout = stdout.and_then(|t| t.join().unwrap().ok()); - let stderr = stderr.and_then(|t| t.join().unwrap().ok()); + let status = try!(self.wait()); Ok(Output { status: status, - stdout: stdout.unwrap_or(Vec::new()), - stderr: stderr.unwrap_or(Vec::new()), + stdout: stdout, + stderr: stderr, }) } } diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index a00e6c3eb7254..8ec073858fd21 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(reason = "not public", issue = "0", feature = "fd")] + use prelude::v1::*; use io::{self, Read}; @@ -75,6 +77,20 @@ impl FileDesc { } } + pub fn set_nonblocking(&self, nonblocking: bool) { + unsafe { + let previous = libc::fcntl(self.fd, libc::F_GETFL); + debug_assert!(previous != -1); + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + let ret = libc::fcntl(self.fd, libc::F_SETFL, new); + debug_assert!(ret != -1); + } + } + pub fn duplicate(&self) -> io::Result { // We want to atomically duplicate this file descriptor and set the // CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This @@ -126,7 +142,6 @@ impl FileDesc { } } -#[unstable(reason = "not public", issue = "0", feature = "fd_read")] impl<'a> Read for &'a FileDesc { fn read(&mut self, buf: &mut [u8]) -> io::Result { (**self).read(buf) diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index d88193e62273b..e5cb37610011b 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + +use cmp; use io; use libc::{self, c_int}; +use mem; use sys::cvt_r; use sys::fd::FileDesc; @@ -68,3 +72,54 @@ impl AnonPipe { pub fn fd(&self) -> &FileDesc { &self.0 } pub fn into_fd(self) -> FileDesc { self.0 } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + // Set both pipes into nonblocking mode as we're gonna be reading from both + // in the `select` loop below, and we wouldn't want one to block the other! + let p1 = p1.into_fd(); + let p2 = p2.into_fd(); + p1.set_nonblocking(true); + p2.set_nonblocking(true); + + let max = cmp::max(p1.raw(), p2.raw()); + loop { + // wait for either pipe to become readable using `select` + try!(cvt_r(|| unsafe { + let mut read: libc::fd_set = mem::zeroed(); + libc::FD_SET(p1.raw(), &mut read); + libc::FD_SET(p2.raw(), &mut read); + libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _, + 0 as *mut _) + })); + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let read = |fd: &FileDesc, dst: &mut Vec| { + match fd.read_to_end(dst) { + Ok(_) => Ok(true), + Err(e) => { + if e.raw_os_error() == Some(libc::EWOULDBLOCK) || + e.raw_os_error() == Some(libc::EAGAIN) { + Ok(false) + } else { + Err(e) + } + } + } + }; + if try!(read(&p1, v1)) { + p2.set_nonblocking(false); + return p2.read_to_end(v2).map(|_| ()); + } + if try!(read(&p2, v2)) { + p1.set_nonblocking(false); + return p1.read_to_end(v1).map(|_| ()); + } + } +} diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index 5696cb2b52f73..47b0ff42f9322 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -651,7 +651,7 @@ mod tests { cmd.stdin(Stdio::MakePipe); cmd.stdout(Stdio::MakePipe); - let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null)); + let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true)); let stdin_write = pipes.stdin.take().unwrap(); let stdout_read = pipes.stdout.take().unwrap(); diff --git a/src/libstd/sys/windows/c.rs b/src/libstd/sys/windows/c.rs index 472ffdf9e1d93..002ffc7c8685a 100644 --- a/src/libstd/sys/windows/c.rs +++ b/src/libstd/sys/windows/c.rs @@ -12,6 +12,7 @@ #![allow(bad_style)] #![cfg_attr(test, allow(dead_code))] +#![unstable(issue = "0", feature = "windows_c")] use os::raw::{c_int, c_uint, c_ulong, c_long, c_longlong, c_ushort,}; use os::raw::{c_char, c_ulonglong}; @@ -181,6 +182,7 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3; pub const ERROR_ACCESS_DENIED: DWORD = 5; pub const ERROR_INVALID_HANDLE: DWORD = 6; pub const ERROR_NO_MORE_FILES: DWORD = 18; +pub const ERROR_HANDLE_EOF: DWORD = 38; pub const ERROR_BROKEN_PIPE: DWORD = 109; pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120; pub const ERROR_INSUFFICIENT_BUFFER: DWORD = 122; @@ -188,6 +190,7 @@ pub const ERROR_ALREADY_EXISTS: DWORD = 183; pub const ERROR_NO_DATA: DWORD = 232; pub const ERROR_ENVVAR_NOT_FOUND: DWORD = 203; pub const ERROR_OPERATION_ABORTED: DWORD = 995; +pub const ERROR_IO_PENDING: DWORD = 997; pub const ERROR_TIMEOUT: DWORD = 0x5B4; pub const INVALID_HANDLE_VALUE: HANDLE = !0 as HANDLE; @@ -292,6 +295,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING | EXCEPTION_TARGET_UNWIND | EXCEPTION_COLLIDED_UNWIND; +pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001; +pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; +pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000; +pub const PIPE_WAIT: DWORD = 0x00000000; +pub const PIPE_TYPE_BYTE: DWORD = 0x00000000; +pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; +pub const PIPE_READMODE_BYTE: DWORD = 0x00000000; + #[repr(C)] #[cfg(target_arch = "x86")] pub struct WSADATA { @@ -913,10 +924,6 @@ extern "system" { nOutBufferSize: DWORD, lpBytesReturned: LPDWORD, lpOverlapped: LPOVERLAPPED) -> BOOL; - pub fn CreatePipe(hReadPipe: LPHANDLE, - hWritePipe: LPHANDLE, - lpPipeAttributes: LPSECURITY_ATTRIBUTES, - nSize: DWORD) -> BOOL; pub fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: extern "system" fn(*mut c_void) @@ -1129,6 +1136,29 @@ extern "system" { OriginalContext: *const CONTEXT, HistoryTable: *const UNWIND_HISTORY_TABLE); pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME); + + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn WaitForMultipleObjects(nCount: DWORD, + lpHandles: *const HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD) -> DWORD; + pub fn CreateNamedPipeW(lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES) + -> HANDLE; + pub fn CancelIo(handle: HANDLE) -> BOOL; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; } // Functions that aren't available on Windows XP, but we still use them and just diff --git a/src/libstd/sys/windows/handle.rs b/src/libstd/sys/windows/handle.rs index f4b8b2754c5b6..1396d670902bb 100644 --- a/src/libstd/sys/windows/handle.rs +++ b/src/libstd/sys/windows/handle.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_handle")] + use prelude::v1::*; use cmp; @@ -42,6 +44,20 @@ impl Handle { Handle(RawHandle::new(handle)) } + pub fn new_event(manual: bool, init: bool) -> io::Result { + unsafe { + let event = c::CreateEventW(0 as *mut _, + manual as c::BOOL, + init as c::BOOL, + 0 as *const _); + if event.is_null() { + Err(io::Error::last_os_error()) + } else { + Ok(Handle::new(event)) + } + } + } + pub fn into_raw(self) -> c::HANDLE { let ret = self.raw(); mem::forget(self); @@ -90,6 +106,59 @@ impl RawHandle { } } + pub unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut c::OVERLAPPED) + -> io::Result> { + let len = cmp::min(buf.len(), ::max_value() as usize) as c::DWORD; + let mut amt = 0; + let res = cvt({ + c::ReadFile(self.0, buf.as_ptr() as c::LPVOID, + len, &mut amt, overlapped) + }); + match res { + Ok(_) => Ok(Some(amt as usize)), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) { + Ok(None) + } else if e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) { + Ok(Some(0)) + } else { + Err(e) + } + } + } + } + + pub fn overlapped_result(&self, + overlapped: *mut c::OVERLAPPED, + wait: bool) -> io::Result { + unsafe { + let mut bytes = 0; + let wait = if wait {c::TRUE} else {c::FALSE}; + let res = cvt({ + c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait) + }); + match res { + Ok(_) => Ok(bytes as usize), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) || + e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) { + Ok(0) + } else { + Err(e) + } + } + } + } + } + + pub fn cancel_io(&self) -> io::Result<()> { + unsafe { + cvt(c::CancelIo(self.raw())).map(|_| ()) + } + } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { let mut me = self; (&mut me).read_to_end(buf) @@ -120,7 +189,6 @@ impl RawHandle { } } -#[unstable(reason = "not public", issue = "0", feature = "fd_read")] impl<'a> Read for &'a RawHandle { fn read(&mut self, buf: &mut [u8]) -> io::Result { (**self).read(buf) diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index 01e3a6cd8ed8f..bb3c79c5a84fd 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_net")] + use prelude::v1::*; use cmp; diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index 8c3171d2470bc..fbe38d76e9571 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -9,11 +9,16 @@ // except according to those terms. use prelude::v1::*; +use os::windows::prelude::*; +use ffi::OsStr; +use path::Path; use io; -use ptr; -use sys::cvt; +use mem; +use rand::{self, Rng}; +use slice; use sys::c; +use sys::fs::{File, OpenOptions}; use sys::handle::Handle; //////////////////////////////////////////////////////////////////////////////// @@ -25,14 +30,76 @@ pub struct AnonPipe { } pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> { - let mut reader = c::INVALID_HANDLE_VALUE; - let mut writer = c::INVALID_HANDLE_VALUE; - try!(cvt(unsafe { - c::CreatePipe(&mut reader, &mut writer, ptr::null_mut(), 0) - })); - let reader = Handle::new(reader); - let writer = Handle::new(writer); - Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer })) + // Note that we specifically do *not* use `CreatePipe` here because + // unfortunately the anonymous pipes returned do not support overlapped + // operations. + // + // Instead, we create a "hopefully unique" name and create a named pipe + // which has overlapped operations enabled. + // + // Once we do this, we connect do it as usual via `CreateFileW`, and then we + // return those reader/writer halves. + unsafe { + let reader; + let mut name; + let mut tries = 0; + loop { + tries += 1; + let key: u64 = rand::thread_rng().gen(); + name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}", + c::GetCurrentProcessId(), + key); + let wide_name = OsStr::new(&name) + .encode_wide() + .chain(Some(0)) + .collect::>(); + + let handle = c::CreateNamedPipeW(wide_name.as_ptr(), + c::PIPE_ACCESS_INBOUND | + c::FILE_FLAG_FIRST_PIPE_INSTANCE | + c::FILE_FLAG_OVERLAPPED, + c::PIPE_TYPE_BYTE | + c::PIPE_READMODE_BYTE | + c::PIPE_WAIT | + c::PIPE_REJECT_REMOTE_CLIENTS, + 1, + 4096, + 4096, + 0, + 0 as *mut _); + + // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're + // also just doing a best effort at selecting a unique name. If + // ERROR_ACCESS_DENIED is returned then it could mean that we + // accidentally conflicted with an already existing pipe, so we try + // again. + // + // Don't try again too much though as this could also perhaps be a + // legit error. + if handle == c::INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + if tries < 10 && + err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) { + continue + } + return Err(err) + } + reader = Handle::new(handle); + break + } + + // Connect to the named pipe we just created in write-only mode (also + // overlapped for async I/O below). + let mut opts = OpenOptions::new(); + opts.write(true); + opts.read(false); + opts.share_mode(0); + opts.attributes(c::FILE_FLAG_OVERLAPPED); + let writer = try!(File::open(Path::new(&name), &opts)); + let writer = AnonPipe { inner: writer.into_handle() }; + + Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() })) + } } impl AnonPipe { @@ -51,3 +118,185 @@ impl AnonPipe { self.inner.write(buf) } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + let p1 = p1.into_handle(); + let p2 = p2.into_handle(); + + let mut p1 = try!(AsyncPipe::new(p1, v1)); + let mut p2 = try!(AsyncPipe::new(p2, v2)); + let objs = [p1.event.raw(), p2.event.raw()]; + + // In a loop we wait for either pipe's scheduled read operation to complete. + // If the operation completes with 0 bytes, that means EOF was reached, in + // which case we just finish out the other pipe entirely. + // + // Note that overlapped I/O is in general super unsafe because we have to + // be careful to ensure that all pointers in play are valid for the entire + // duration of the I/O operation (where tons of operations can also fail). + // The destructor for `AsyncPipe` ends up taking care of most of this. + loop { + let res = unsafe { + c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) + }; + if res == c::WAIT_OBJECT_0 { + if !try!(p1.result()) || !try!(p1.schedule_read()) { + return p2.finish() + } + } else if res == c::WAIT_OBJECT_0 + 1 { + if !try!(p2.result()) || !try!(p2.schedule_read()) { + return p1.finish() + } + } else { + return Err(io::Error::last_os_error()) + } + } +} + +struct AsyncPipe<'a> { + pipe: Handle, + event: Handle, + overlapped: Box, // needs a stable address + dst: &'a mut Vec, + state: State, +} + +#[derive(PartialEq, Debug)] +enum State { + NotReading, + Reading, + Read(usize), +} + +impl<'a> AsyncPipe<'a> { + fn new(pipe: Handle, dst: &'a mut Vec) -> io::Result> { + // Create an event which we'll use to coordinate our overlapped + // opreations, this event will be used in WaitForMultipleObjects + // and passed as part of the OVERLAPPED handle. + // + // Note that we do a somewhat clever thing here by flagging the + // event as being manually reset and setting it initially to the + // signaled state. This means that we'll naturally fall through the + // WaitForMultipleObjects call above for pipes created initially, + // and the only time an even will go back to "unset" will be once an + // I/O operation is successfully scheduled (what we want). + let event = try!(Handle::new_event(true, true)); + let mut overlapped: Box = unsafe { + Box::new(mem::zeroed()) + }; + overlapped.hEvent = event.raw(); + Ok(AsyncPipe { + pipe: pipe, + overlapped: overlapped, + event: event, + dst: dst, + state: State::NotReading, + }) + } + + /// Executes an overlapped read operation. + /// + /// Must not currently be reading, and returns whether the pipe is currently + /// at EOF or not. If the pipe is not at EOF then `result()` must be called + /// to complete the read later on (may block), but if the pipe is at EOF + /// then `result()` should not be called as it will just block forever. + fn schedule_read(&mut self) -> io::Result { + assert_eq!(self.state, State::NotReading); + let amt = unsafe { + let slice = slice_to_end(self.dst); + try!(self.pipe.read_overlapped(slice, &mut *self.overlapped)) + }; + + // If this read finished immediately then our overlapped event will + // remain signaled (it was signaled coming in here) and we'll progress + // down to the method below. + // + // Otherwise the I/O operation is scheduled and the system set our event + // to not signaled, so we flag ourselves into the reading state and move + // on. + self.state = match amt { + Some(0) => return Ok(false), + Some(amt) => State::Read(amt), + None => State::Reading, + }; + Ok(true) + } + + /// Wait for the result of the overlapped operation previously executed. + /// + /// Takes a parameter `wait` which indicates if this pipe is currently being + /// read whether the function should block waiting for the read to complete. + /// + /// Return values: + /// + /// * `true` - finished any pending read and the pipe is not at EOF (keep + /// going) + /// * `false` - finished any pending read and pipe is at EOF (stop issuing + /// reads) + fn result(&mut self) -> io::Result { + let amt = match self.state { + State::NotReading => return Ok(true), + State::Reading => { + try!(self.pipe.overlapped_result(&mut *self.overlapped, true)) + } + State::Read(amt) => amt, + }; + self.state = State::NotReading; + unsafe { + let len = self.dst.len(); + self.dst.set_len(len + amt); + } + Ok(amt != 0) + } + + /// Finishes out reading this pipe entirely. + /// + /// Waits for any pending and schedule read, and then calls `read_to_end` + /// if necessary to read all the remaining information. + fn finish(&mut self) -> io::Result<()> { + while try!(self.result()) && try!(self.schedule_read()) { + // ... + } + Ok(()) + } +} + +impl<'a> Drop for AsyncPipe<'a> { + fn drop(&mut self) { + match self.state { + State::Reading => {} + _ => return, + } + + // If we have a pending read operation, then we have to make sure that + // it's *done* before we actually drop this type. The kernel requires + // that the `OVERLAPPED` and buffer pointers are valid for the entire + // I/O operation. + // + // To do that, we call `CancelIo` to cancel any pending operation, and + // if that succeeds we wait for the overlapped result. + // + // If anything here fails, there's not really much we can do, so we leak + // the buffer/OVERLAPPED pointers to ensure we're at least memory safe. + if self.pipe.cancel_io().is_err() || self.result().is_err() { + let buf = mem::replace(self.dst, Vec::new()); + let overlapped = Box::new(unsafe { mem::zeroed() }); + let overlapped = mem::replace(&mut self.overlapped, overlapped); + mem::forget((buf, overlapped)); + } + } +} + +unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize), + v.capacity() - v.len()) +} diff --git a/src/libstd/sys/windows/stdio.rs b/src/libstd/sys/windows/stdio.rs index 5a8705bf0cb98..5883904c21d72 100644 --- a/src/libstd/sys/windows/stdio.rs +++ b/src/libstd/sys/windows/stdio.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_stdio")] + use prelude::v1::*; use io::prelude::*;