Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Actively tail worker stdio from supervisor agent #588

Merged
17 commits merged into from
Feb 26, 2021
Merged
142 changes: 126 additions & 16 deletions src/agent/onefuzz-supervisor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@

use std::{
path::{Path, PathBuf},
process::{Child, Command},
process::{Child, ChildStderr, ChildStdout, Command, Stdio},
thread::{self, JoinHandle},
};

use anyhow::{Context as AnyhowContext, Result};
use anyhow::{format_err, Context as AnyhowContext, Result};
use downcast_rs::Downcast;
use onefuzz::process::{ExitStatus, Output};
use tokio::fs;

use crate::work::*;

// Max length of captured output streams from worker child processes.
const MAX_TAIL_LEN: usize = 4096;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerEvent {
Expand Down Expand Up @@ -228,25 +232,97 @@ impl IWorkerRunner for WorkerRunner {
cmd.arg("managed");
cmd.arg("config.json");
cmd.arg(setup_dir);
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());

let child = cmd.spawn().context("onefuzz-agent failed to start")?;
let child = Box::new(child);
Ok(Box::new(RedirectedChild::new(child)?))
}
}

/// Child process with redirected output streams, tailed by two worker threads.
struct RedirectedChild {
/// The child process.
child: Child,

/// Worker threads which continuously read from the redirected streams.
streams: Option<StreamReaderThreads>,
}

Ok(child)
impl RedirectedChild {
ranweiler marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(mut child: Child) -> Result<Self> {
let stderr = child.stderr.take().ok_or_else(|| format_err!("onefuzz-agent stderr not piped"))?;
let stdout = child.stdout.take().ok_or_else(|| format_err!("onefuzz-agent stdout not piped"))?;
let streams = Some(StreamReaderThreads::new(stderr, stdout));
Ok(Self { child, streams })
}
}

impl IWorkerChild for Child {
/// Worker threads that tail the redirected output streams of a running child process.
struct StreamReaderThreads {
stderr: JoinHandle<CircularBuffer>,
stdout: JoinHandle<CircularBuffer>,
}

struct CapturedStreams {
stderr: String,
stdout: String,
}

impl StreamReaderThreads {
pub fn new(mut stderr: ChildStderr, mut stdout: ChildStdout) -> Self {
use std::io::Read;

let stderr = thread::spawn(move || {
let mut buf = CircularBuffer::new(MAX_TAIL_LEN);
let mut tmp = [0u8; MAX_TAIL_LEN];

while let Ok(count) = stderr.read(&mut tmp) {
if let Err(err) = std::io::copy(&mut &tmp[..count], &mut buf) {
log::error!("error copying to circular buffer: {}", err);
break;
}
}

buf
});

let stdout = thread::spawn(move || {
let mut buf = CircularBuffer::new(MAX_TAIL_LEN);
let mut tmp = [0u8; MAX_TAIL_LEN];

while let Ok(count) = stdout.read(&mut tmp) {
if let Err(err) = std::io::copy(&mut &tmp[..count], &mut buf) {
log::error!("error copying to circular buffer: {}", err);
break;
}
}

buf
});

Self { stderr, stdout }
}

pub fn join(self) -> Result<CapturedStreams> {
let stderr = self.stderr.join().map_err(|_| format_err!("stderr tail thread panicked"))?.to_string_lossy();
let stdout = self.stdout.join().map_err(|_| format_err!("stdout tail thread panicked"))?.to_string_lossy();

Ok(CapturedStreams { stderr, stdout })
}
}

impl IWorkerChild for RedirectedChild {
fn try_wait(&mut self) -> Result<Option<Output>> {
let output = if let Some(exit_status) = self.try_wait()? {
let output = if let Some(exit_status) = self.child.try_wait()? {
let exit_status = exit_status.into();
let stderr = read_to_string(&mut self.stderr)?;
let stdout = read_to_string(&mut self.stdout)?;
let streams = self.streams.take();
let streams = streams.ok_or_else(|| format_err!("onefuzz-agent streams not captured"))?.join()?;

Some(Output {
exit_status,
stderr,
stdout,
stderr: streams.stderr,
stdout: streams.stdout,
})
} else {
None
Expand All @@ -258,7 +334,7 @@ impl IWorkerChild for Child {
fn kill(&mut self) -> Result<()> {
use std::io::ErrorKind;

let killed = self.kill();
let killed = self.child.kill();

if let Err(err) = &killed {
if let ErrorKind::InvalidInput = err.kind() {
Expand All @@ -271,15 +347,49 @@ impl IWorkerChild for Child {
}
}

fn read_to_string(stream: &mut Option<impl std::io::Read>) -> Result<String> {
let mut data = Vec::new();
if let Some(stream) = stream {
stream.read_to_end(&mut data)?;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CircularBuffer {
data: Vec<u8>,
capacity: usize,
}

impl CircularBuffer {
pub fn new(capacity: usize) -> Self {
let data = Vec::with_capacity(capacity);
Self { data, capacity }
}

pub fn data(&self) -> &[u8] {
&self.data
}

pub fn to_string_lossy(&self) -> String {
String::from_utf8_lossy(&self.data).to_string()
}
}

impl std::io::Write for CircularBuffer {
fn write(&mut self, new_data: &[u8]) -> std::io::Result<usize> {
// Write the new data to the internal buffer, allocating internally as needed.
self.data.extend(new_data);

// Shift and truncate the buffer if it is too big.
if self.data.len() > self.capacity {
let lo = self.data.len() - self.capacity;
let range = lo..self.data.len();
self.data.copy_within(range, 0);
self.data.truncate(self.capacity);
}

Ok(new_data.len())
}

Ok(String::from_utf8_lossy(&data).into_owned())
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}


#[cfg(test)]
pub mod double;

Expand Down