Skip to content

Commit

Permalink
Try to more thoroughly kill child Docker processes
Browse files Browse the repository at this point in the history
During a recent outage, the Docker service reported 161 running
`docker` processes, while the playground was only aware of 27. Each
`docker` processes (and their children) use about 30 MiB of RAM. That
would total 4.7 GiB of memory, while the instance only has 4GiB.

My hypothesis is that we are starting containers, then encountering an
error and bubbling it up to the caller. Along the way, we skip the
call to shutdown the container.

This change tries to shutdown the container when the value is
dropped. This should allow it to be handled in error cases as well.

It's possible that this might also help with some of the zombie
processes we see from time-to-time!
  • Loading branch information
shepmaster committed Sep 24, 2024
1 parent ca5d2ca commit d28aa3e
Showing 1 changed file with 72 additions and 29 deletions.
101 changes: 72 additions & 29 deletions compiler/base/orchestrator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tokio::{
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{io::SyncIoBridge, sync::CancellationToken};
use tracing::{info_span, instrument, trace, trace_span, warn, Instrument};
use tracing::{error, info_span, instrument, trace, trace_span, warn, Instrument};

use crate::{
bincode_input_closed,
Expand Down Expand Up @@ -1150,7 +1150,7 @@ where
struct Container {
permit: Box<dyn ContainerPermit>,
task: JoinHandle<Result<()>>,
kill_child: Option<Command>,
kill_child: TerminateContainer,
modify_cargo_toml: ModifyCargoToml,
commander: Commander,
}
Expand Down Expand Up @@ -1908,24 +1908,14 @@ impl Container {
let Self {
permit,
task,
kill_child,
mut kill_child,
modify_cargo_toml,
commander,
} = self;
drop(commander);
drop(modify_cargo_toml);

if let Some(mut kill_child) = kill_child {
// We don't care if the command itself succeeds or not; it
// may already be dead!
let _ = kill_child
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.await
.context(KillWorkerSnafu)?;
}
kill_child.terminate_now().await?;

let r = task.await;
drop(permit);
Expand Down Expand Up @@ -2542,12 +2532,64 @@ pub enum CommanderError {
WorkerOperationFailed { source: SerializedError2 },
}

#[derive(Debug)]
pub struct TerminateContainer(Option<Command>);

impl TerminateContainer {
pub fn new(command: Command) -> Self {
Self(Some(command))
}

pub fn none() -> Self {
Self(None)
}

async fn terminate_now(&mut self) -> Result<(), TerminateContainerError> {
if let Some(mut kill_child) = self.take_command() {
// [ALREADY-DEAD] We don't care if the command itself
// succeeds or not; the container may already be dead!
let _ = kill_child.status().await?;
}

Ok(())
}

fn take_command(&mut self) -> Option<Command> {
self.0.take().map(|mut kill_child| {
kill_child
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null());
kill_child
})
}
}

impl Drop for TerminateContainer {
fn drop(&mut self) {
if let Some(mut kill_child) = self.take_command() {
if let Err(e) = kill_child.as_std_mut().status() {
// See [ALREADY-DEAD]
error!("Unable to kill the container while dropping: {e}");
}
}
}
}

#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum TerminateContainerError {
#[snafu(display("Unable to kill the child process"))]
#[snafu(context(false))]
Execute { source: std::io::Error },
}

pub trait Backend {
fn run_worker_in_background(
&self,
channel: Channel,
id: impl fmt::Display,
) -> Result<(Child, Option<Command>, ChildStdin, ChildStdout)> {
) -> Result<(Child, TerminateContainer, ChildStdin, ChildStdout)> {
let (mut start, kill) = self.prepare_worker_command(channel, id);

let mut child = start
Expand All @@ -2565,7 +2607,7 @@ pub trait Backend {
&self,
channel: Channel,
id: impl fmt::Display,
) -> (Command, Option<Command>);
) -> (Command, TerminateContainer);
}

impl<B> Backend for &B
Expand All @@ -2576,7 +2618,7 @@ where
&self,
channel: Channel,
id: impl fmt::Display,
) -> (Command, Option<Command>) {
) -> (Command, TerminateContainer) {
B::prepare_worker_command(self, channel, id)
}
}
Expand Down Expand Up @@ -2635,7 +2677,7 @@ impl Backend for DockerBackend {
&self,
channel: Channel,
id: impl fmt::Display,
) -> (Command, Option<Command>) {
) -> (Command, TerminateContainer) {
let name = format!("playground-{id}");

let mut command = basic_secure_docker_command();
Expand All @@ -2654,8 +2696,9 @@ impl Backend for DockerBackend {

let mut kill = Command::new("docker");
kill.arg("kill").args(["--signal", "KILL"]).arg(name);
let kill = TerminateContainer::new(kill);

(command, Some(kill))
(command, kill)
}
}

Expand Down Expand Up @@ -2688,8 +2731,8 @@ pub enum Error {
#[snafu(display("The IO queue task panicked"))]
IoQueuePanicked { source: tokio::task::JoinError },

#[snafu(display("Unable to kill the child process"))]
KillWorker { source: std::io::Error },
#[snafu(transparent)]
KillWorker { source: TerminateContainerError },

#[snafu(display("The container task panicked"))]
ContainerTaskPanicked { source: tokio::task::JoinError },
Expand Down Expand Up @@ -2863,14 +2906,14 @@ mod tests {
&self,
channel: Channel,
_id: impl fmt::Display,
) -> (Command, Option<Command>) {
) -> (Command, TerminateContainer) {
let channel_dir = self.project_dir.path().join(channel.to_str());

let mut command = Command::new("./target/debug/worker");
command.env("RUSTUP_TOOLCHAIN", channel.to_str());
command.arg(channel_dir);

(command, None)
(command, TerminateContainer::none())
}
}

Expand Down Expand Up @@ -2982,7 +3025,7 @@ mod tests {

coordinator.shutdown().await?;

Ok(())
Ok::<_, Error>(())
});

try_join_all(tests).with_timeout().await?;
Expand Down Expand Up @@ -3028,7 +3071,7 @@ mod tests {

coordinator.shutdown().await?;

Ok(())
Ok::<_, Error>(())
},
)
});
Expand Down Expand Up @@ -3067,7 +3110,7 @@ mod tests {

coordinator.shutdown().await?;

Ok(())
Ok::<_, Error>(())
});

try_join_all(tests).with_timeout().await?;
Expand Down Expand Up @@ -3097,7 +3140,7 @@ mod tests {

coordinator.shutdown().await?;

Ok(())
Ok::<_, Error>(())
});

try_join_all(tests).with_timeout().await?;
Expand Down Expand Up @@ -3134,7 +3177,7 @@ mod tests {

coordinator.shutdown().await?;

Ok(())
Ok::<_, Error>(())
});

try_join_all(tests).with_timeout().await?;
Expand Down Expand Up @@ -3857,7 +3900,7 @@ mod tests {

coordinator.shutdown().await?;

Ok(())
Ok::<_, Error>(())
},
)
});
Expand Down

0 comments on commit d28aa3e

Please sign in to comment.