Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

std: Improve pipe functionality #14781

Merged
merged 1 commit into from
Jun 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/liblibc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(unix)] pub use consts::os::posix88::{ENOTCONN, ECONNABORTED, EADDRNOTAVAIL, EINTR};
#[cfg(unix)] pub use consts::os::posix88::{EADDRINUSE, ENOENT, EISDIR, EAGAIN, EWOULDBLOCK};
#[cfg(unix)] pub use consts::os::posix88::{ECANCELED, SIGINT, EINPROGRESS};
#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT};
#[cfg(unix)] pub use consts::os::posix88::{ENOSYS, ENOTTY, ETIMEDOUT, EMFILE};
#[cfg(unix)] pub use consts::os::posix88::{SIGTERM, SIGKILL, SIGPIPE, PROT_NONE};
#[cfg(unix)] pub use consts::os::posix01::{SIG_IGN};
#[cfg(unix)] pub use consts::os::bsd44::{AF_UNIX};
Expand All @@ -196,7 +196,7 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::c95::{WSAECONNREFUSED, WSAECONNRESET, WSAEACCES};
#[cfg(windows)] pub use consts::os::c95::{WSAEWOULDBLOCK, WSAENOTCONN, WSAECONNABORTED};
#[cfg(windows)] pub use consts::os::c95::{WSAEADDRNOTAVAIL, WSAEADDRINUSE, WSAEINTR};
#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL};
#[cfg(windows)] pub use consts::os::c95::{WSAEINPROGRESS, WSAEINVAL, WSAEMFILE};
#[cfg(windows)] pub use consts::os::extra::{ERROR_INSUFFICIENT_BUFFER};
#[cfg(windows)] pub use consts::os::extra::{O_BINARY, O_NOINHERIT, PAGE_NOACCESS};
#[cfg(windows)] pub use consts::os::extra::{PAGE_READONLY, PAGE_READWRITE, PAGE_EXECUTE};
Expand Down
6 changes: 3 additions & 3 deletions src/libnative/io/file_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,9 @@ mod tests {
fn test_file_desc() {
// Run this test with some pipes so we don't have to mess around with
// opening or closing files.
let os::Pipe { input, out } = os::pipe();
let mut reader = FileDesc::new(input, true);
let mut writer = FileDesc::new(out, true);
let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
let mut reader = FileDesc::new(reader, true);
let mut writer = FileDesc::new(writer, true);

writer.inner_write(bytes!("test")).ok().unwrap();
let mut buf = [0u8, ..4];
Expand Down
4 changes: 2 additions & 2 deletions src/libnative/io/helper_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ mod imp {
pub type signal = libc::c_int;

pub fn new() -> (signal, signal) {
let pipe = os::pipe();
(pipe.input, pipe.out)
let os::Pipe { reader, writer } = unsafe { os::pipe().unwrap() };
(reader, writer)
}

pub fn signal(fd: libc::c_int) {
Expand Down
203 changes: 123 additions & 80 deletions src/libnative/io/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@

use libc::{pid_t, c_void, c_int};
use libc;
use std::c_str::CString;
use std::io;
use std::mem;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::rt::rtio::{ProcessConfig, IoResult, IoError};
use std::c_str::CString;
use std::rt::rtio;

use super::file;
use super::util;
Expand Down Expand Up @@ -73,47 +74,43 @@ impl Process {

fn get_io(io: rtio::StdioContainer,
ret: &mut Vec<Option<file::FileDesc>>)
-> (Option<os::Pipe>, c_int)
-> IoResult<Option<file::FileDesc>>
{
match io {
rtio::Ignored => { ret.push(None); (None, -1) }
rtio::InheritFd(fd) => { ret.push(None); (None, fd) }
rtio::Ignored => { ret.push(None); Ok(None) }
rtio::InheritFd(fd) => {
ret.push(None);
Ok(Some(file::FileDesc::new(fd, true)))
}
rtio::CreatePipe(readable, _writable) => {
let pipe = os::pipe();
let (reader, writer) = try!(pipe());
let (theirs, ours) = if readable {
(pipe.input, pipe.out)
(reader, writer)
} else {
(pipe.out, pipe.input)
(writer, reader)
};
ret.push(Some(file::FileDesc::new(ours, true)));
(Some(pipe), theirs)
ret.push(Some(ours));
Ok(Some(theirs))
}
}
}

let mut ret_io = Vec::new();
let (in_pipe, in_fd) = get_io(cfg.stdin, &mut ret_io);
let (out_pipe, out_fd) = get_io(cfg.stdout, &mut ret_io);
let (err_pipe, err_fd) = get_io(cfg.stderr, &mut ret_io);

let res = spawn_process_os(cfg, in_fd, out_fd, err_fd);

unsafe {
for pipe in in_pipe.iter() { let _ = libc::close(pipe.input); }
for pipe in out_pipe.iter() { let _ = libc::close(pipe.out); }
for pipe in err_pipe.iter() { let _ = libc::close(pipe.out); }
}
let res = spawn_process_os(cfg,
try!(get_io(cfg.stdin, &mut ret_io)),
try!(get_io(cfg.stdout, &mut ret_io)),
try!(get_io(cfg.stderr, &mut ret_io)));

match res {
Ok(res) => {
Ok((Process {
pid: res.pid,
handle: res.handle,
exit_code: None,
exit_signal: None,
deadline: 0,
},
ret_io))
let p = Process {
pid: res.pid,
handle: res.handle,
exit_code: None,
exit_signal: None,
deadline: 0,
};
Ok((p, ret_io))
}
Err(e) => Err(e)
}
Expand Down Expand Up @@ -194,6 +191,37 @@ impl Drop for Process {
}
}

fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
#[cfg(unix)] use ERROR = libc::EMFILE;
#[cfg(windows)] use ERROR = libc::WSAEMFILE;
struct Closer { fd: libc::c_int }

let os::Pipe { reader, writer } = match unsafe { os::pipe() } {
Ok(p) => p,
Err(io::IoError { detail, .. }) => return Err(IoError {
code: ERROR as uint,
extra: 0,
detail: detail,
})
};
let mut reader = Closer { fd: reader };
let mut writer = Closer { fd: writer };

let native_reader = file::FileDesc::new(reader.fd, true);
reader.fd = -1;
let native_writer = file::FileDesc::new(writer.fd, true);
writer.fd = -1;
return Ok((native_reader, native_writer));

impl Drop for Closer {
fn drop(&mut self) {
if self.fd != -1 {
let _ = unsafe { libc::close(self.fd) };
}
}
}
}

#[cfg(windows)]
unsafe fn killpid(pid: pid_t, signal: int) -> IoResult<()> {
let handle = libc::OpenProcess(libc::PROCESS_TERMINATE |
Expand Down Expand Up @@ -246,7 +274,9 @@ struct SpawnProcessResult {

#[cfg(windows)]
fn spawn_process_os(cfg: ProcessConfig,
in_fd: c_int, out_fd: c_int, err_fd: c_int)
in_fd: Option<file::FileDesc>,
out_fd: Option<file::FileDesc>,
err_fd: Option<file::FileDesc>)
-> IoResult<SpawnProcessResult> {
use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO};
use libc::consts::os::extra::{
Expand Down Expand Up @@ -283,47 +313,51 @@ fn spawn_process_os(cfg: ProcessConfig,
// Similarly to unix, we don't actually leave holes for the stdio file
// descriptors, but rather open up /dev/null equivalents. These
// equivalents are drawn from libuv's windows process spawning.
let set_fd = |fd: c_int, slot: &mut HANDLE, is_stdin: bool| {
if fd == -1 {
let access = if is_stdin {
libc::FILE_GENERIC_READ
} else {
libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES
};
let size = mem::size_of::<libc::SECURITY_ATTRIBUTES>();
let mut sa = libc::SECURITY_ATTRIBUTES {
nLength: size as libc::DWORD,
lpSecurityDescriptor: ptr::mut_null(),
bInheritHandle: 1,
};
let filename = "NUL".to_utf16().append_one(0);
*slot = libc::CreateFileW(filename.as_ptr(),
access,
libc::FILE_SHARE_READ |
libc::FILE_SHARE_WRITE,
&mut sa,
libc::OPEN_EXISTING,
0,
ptr::mut_null());
if *slot == INVALID_HANDLE_VALUE as libc::HANDLE {
return Err(super::last_error())
}
} else {
let orig = get_osfhandle(fd) as HANDLE;
if orig == INVALID_HANDLE_VALUE as HANDLE {
return Err(super::last_error())
let set_fd = |fd: &Option<file::FileDesc>, slot: &mut HANDLE,
is_stdin: bool| {
match *fd {
None => {
let access = if is_stdin {
libc::FILE_GENERIC_READ
} else {
libc::FILE_GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES
};
let size = mem::size_of::<libc::SECURITY_ATTRIBUTES>();
let mut sa = libc::SECURITY_ATTRIBUTES {
nLength: size as libc::DWORD,
lpSecurityDescriptor: ptr::mut_null(),
bInheritHandle: 1,
};
let filename = "NUL".to_utf16().append_one(0);
*slot = libc::CreateFileW(filename.as_ptr(),
access,
libc::FILE_SHARE_READ |
libc::FILE_SHARE_WRITE,
&mut sa,
libc::OPEN_EXISTING,
0,
ptr::mut_null());
if *slot == INVALID_HANDLE_VALUE as libc::HANDLE {
return Err(super::last_error())
}
}
if DuplicateHandle(cur_proc, orig, cur_proc, slot,
0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE {
return Err(super::last_error())
Some(ref fd) => {
let orig = get_osfhandle(fd.fd()) as HANDLE;
if orig == INVALID_HANDLE_VALUE as HANDLE {
return Err(super::last_error())
}
if DuplicateHandle(cur_proc, orig, cur_proc, slot,
0, TRUE, DUPLICATE_SAME_ACCESS) == FALSE {
return Err(super::last_error())
}
}
}
Ok(())
};

try!(set_fd(in_fd, &mut si.hStdInput, true));
try!(set_fd(out_fd, &mut si.hStdOutput, false));
try!(set_fd(err_fd, &mut si.hStdError, false));
try!(set_fd(&in_fd, &mut si.hStdInput, true));
try!(set_fd(&out_fd, &mut si.hStdOutput, false));
try!(set_fd(&err_fd, &mut si.hStdError, false));

let cmd_str = make_command_line(cfg.program, cfg.args);
let mut pi = zeroed_process_information();
Expand Down Expand Up @@ -464,7 +498,10 @@ fn make_command_line(prog: &CString, args: &[CString]) -> String {
}

#[cfg(unix)]
fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_int)
fn spawn_process_os(cfg: ProcessConfig,
in_fd: Option<file::FileDesc>,
out_fd: Option<file::FileDesc>,
err_fd: Option<file::FileDesc>)
-> IoResult<SpawnProcessResult>
{
use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
Expand Down Expand Up @@ -498,9 +535,7 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i

with_envp(cfg.env, proc(envp) {
with_argv(cfg.program, cfg.args, proc(argv) unsafe {
let pipe = os::pipe();
let mut input = file::FileDesc::new(pipe.input, true);
let mut output = file::FileDesc::new(pipe.out, true);
let (mut input, mut output) = try!(pipe());

// We may use this in the child, so perform allocations before the
// fork
Expand All @@ -510,7 +545,7 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i

let pid = fork();
if pid < 0 {
fail!("failure in fork: {}", os::last_os_error());
return Err(super::last_error())
} else if pid > 0 {
drop(output);
let mut bytes = [0, ..4];
Expand Down Expand Up @@ -586,16 +621,24 @@ fn spawn_process_os(cfg: ProcessConfig, in_fd: c_int, out_fd: c_int, err_fd: c_i
// up /dev/null into that file descriptor. Otherwise, the first file
// descriptor opened up in the child would be numbered as one of the
// stdio file descriptors, which is likely to wreak havoc.
let setup = |src: c_int, dst: c_int| {
let src = if src == -1 {
let flags = if dst == libc::STDIN_FILENO {
libc::O_RDONLY
} else {
libc::O_RDWR
};
devnull.with_ref(|p| libc::open(p, flags, 0))
} else {
src
let setup = |src: Option<file::FileDesc>, dst: c_int| {
let src = match src {
None => {
let flags = if dst == libc::STDIN_FILENO {
libc::O_RDONLY
} else {
libc::O_RDWR
};
devnull.with_ref(|p| libc::open(p, flags, 0))
}
Some(obj) => {
let fd = obj.fd();
// Leak the memory and the file descriptor. We're in the
// child now an all our resources are going to be
// cleaned up very soon
mem::forget(obj);
fd
}
};
src != -1 && retry(|| dup2(src, dst)) != -1
};
Expand Down
Loading