Skip to content

Commit

Permalink
feat: make container and exec logs Sendable
Browse files Browse the repository at this point in the history
There is no need to stick logs to lifetime of container:
- this way logs can't be sent to another thread/task
- logs will stop with EOF if container has been dropped anyway, what's reasonable
- it's step forward log followers (like in [Java implementation](https://java.testcontainers.org/features/container_logs/))
  • Loading branch information
DDtKey committed May 26, 2024
1 parent d51b9a0 commit ec63ebb
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 91 deletions.
14 changes: 7 additions & 7 deletions testcontainers/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ impl Client {
Ok(Client { config, bollard })
}

pub(crate) fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_> {
self.logs(id, LogSource::StdOut)
pub(crate) fn stdout_logs(&self, id: &str, follow: bool) -> LogStreamAsync {
self.logs(id, LogSource::StdOut, follow)
}

pub(crate) fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_> {
self.logs(id, LogSource::StdErr)
pub(crate) fn stderr_logs(&self, id: &str, follow: bool) -> LogStreamAsync {
self.logs(id, LogSource::StdErr, follow)
}

pub(crate) async fn ports(&self, id: &str) -> Result<Ports, ClientError> {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl Client {
&self,
container_id: &str,
cmd: Vec<String>,
) -> Result<ExecResult<'_>, ClientError> {
) -> Result<ExecResult, ClientError> {
let config = CreateExecOptions {
cmd: Some(cmd),
attach_stdout: Some(true),
Expand Down Expand Up @@ -245,9 +245,9 @@ impl Client {
.map_err(ClientError::InspectExec)
}

fn logs(&self, container_id: &str, log_source: LogSource) -> LogStreamAsync<'_> {
fn logs(&self, container_id: &str, log_source: LogSource, follow: bool) -> LogStreamAsync {
let options = LogsOptions {
follow: true,
follow,
stdout: log_source.is_stdout(),
stderr: log_source.is_stderr(),
tail: "all".to_owned(),
Expand Down
12 changes: 6 additions & 6 deletions testcontainers/src/core/client/exec.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use crate::core::logs::LogStreamAsync;

pub(crate) struct ExecResult<'a> {
pub(crate) struct ExecResult {
pub(crate) id: String,
pub(crate) stdout: LogStreamAsync<'a>,
pub(crate) stderr: LogStreamAsync<'a>,
pub(crate) stdout: LogStreamAsync,
pub(crate) stderr: LogStreamAsync,
}

impl<'a> ExecResult<'a> {
impl ExecResult {
pub(crate) fn id(&self) -> &str {
&self.id
}

pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync<'a> {
pub(crate) fn stdout(&mut self) -> &mut LogStreamAsync {
&mut self.stdout
}

pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync<'a> {
pub(crate) fn stderr(&mut self) -> &mut LogStreamAsync {
&mut self.stderr
}
}
59 changes: 35 additions & 24 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ where
}

/// Executes a command in the container.
pub async fn exec(&self, cmd: ExecCommand) -> Result<exec::ExecResult<'_>> {
pub async fn exec(&self, cmd: ExecCommand) -> Result<exec::ExecResult> {
let ExecCommand {
cmd,
container_ready_conditions,
Expand Down Expand Up @@ -262,14 +262,14 @@ where
}

/// Returns an asynchronous reader for stdout.
pub fn stdout(&self) -> Pin<Box<dyn AsyncBufRead + '_>> {
let stdout = self.docker_client.stdout_logs(&self.id);
pub fn stdout(&self) -> Pin<Box<dyn AsyncBufRead + Send>> {
let stdout = self.docker_client.stdout_logs(&self.id, true);
Box::pin(tokio_util::io::StreamReader::new(stdout.into_inner()))
}

/// Returns an asynchronous reader for stderr.
pub fn stderr(&self) -> Pin<Box<dyn AsyncBufRead + '_>> {
let stderr = self.docker_client.stderr_logs(&self.id);
pub fn stderr(&self) -> Pin<Box<dyn AsyncBufRead + Send>> {
let stderr = self.docker_client.stderr_logs(&self.id, true);
Box::pin(tokio_util::io::StreamReader::new(stderr.into_inner()))
}

Expand All @@ -281,13 +281,13 @@ where
match condition {
WaitFor::StdOutMessage { message } => self
.docker_client
.stdout_logs(id)
.stdout_logs(id, true)
.wait_for_message(message)
.await
.map_err(WaitContainerError::from)?,
WaitFor::StdErrMessage { message } => self
.docker_client
.stderr_logs(id)
.stderr_logs(id, true)
.wait_for_message(message)
.await
.map_err(WaitContainerError::from)?,
Expand Down Expand Up @@ -382,23 +382,34 @@ mod tests {
let image = GenericImage::new("testcontainers/helloworld", "1.1.0");
let container = RunnableImage::from(image).start().await?;

let mut stderr_lines = container.stderr().lines();

let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next_line().await?.expect("line must exist");
assert!(
line.contains(expected_message),
"Log message ('{line}') doesn't contain expected message ('{expected_message}')"
);
}
let stderr = container.stderr();

// it's possible to send logs into background task
let log_follower_task = tokio::spawn(async move {
let mut stderr_lines = stderr.lines();
let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next_line().await?.expect("line must exist");
if !line.contains(expected_message) {
anyhow::bail!(
"Log message ('{}') doesn't contain expected message ('{}')",
line,
expected_message
);
}
}
Ok(())
});
log_follower_task
.await
.map_err(|_| anyhow::anyhow!("failed to join log follower task"))??;

// logs are accessible after container is stopped
container.stop().await?;
Expand Down
10 changes: 5 additions & 5 deletions testcontainers/src/core/containers/async_container/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use tokio::io::{AsyncBufRead, AsyncReadExt};
use crate::core::{client::Client, error::Result};

/// Represents the result of an executed command in a container.
pub struct ExecResult<'a> {
pub struct ExecResult {
pub(super) client: Arc<Client>,
pub(crate) id: String,
pub(super) stdout: BoxStream<'a, std::result::Result<Bytes, io::Error>>,
pub(super) stderr: BoxStream<'a, std::result::Result<Bytes, io::Error>>,
pub(super) stdout: BoxStream<'static, std::result::Result<Bytes, io::Error>>,
pub(super) stderr: BoxStream<'static, std::result::Result<Bytes, io::Error>>,
}

impl<'a> ExecResult<'a> {
impl ExecResult {
/// Returns the exit code of the executed command.
/// If the command has not yet exited, this will return `None`.
pub async fn exit_code(&self) -> Result<Option<i64>> {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl<'a> ExecResult<'a> {
}
}

impl fmt::Debug for ExecResult<'_> {
impl fmt::Debug for ExecResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ExecResult").field("id", &self.id).finish()
}
Expand Down
67 changes: 39 additions & 28 deletions testcontainers/src/core/containers/sync_container.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, io::BufRead, net::IpAddr};
use std::{fmt, io::BufRead, net::IpAddr, sync::Arc};

use crate::{
core::{env, error::Result, ports::Ports, ExecCommand},
Expand Down Expand Up @@ -28,7 +28,7 @@ pub struct Container<I: Image> {

/// Internal representation of a running docker container, to be able to terminate runtime correctly when `Container` is dropped.
struct ActiveContainer<I: Image> {
runtime: tokio::runtime::Runtime,
runtime: Arc<tokio::runtime::Runtime>,
async_impl: ContainerAsync<I>,
}

Expand All @@ -50,7 +50,7 @@ impl<I: Image> Container<I> {
pub(crate) fn new(runtime: tokio::runtime::Runtime, async_impl: ContainerAsync<I>) -> Self {
Self {
inner: Some(ActiveContainer {
runtime,
runtime: Arc::new(runtime),
async_impl,
}),
}
Expand Down Expand Up @@ -132,11 +132,11 @@ where
}

/// Executes a command in the container.
pub fn exec(&self, cmd: ExecCommand) -> Result<exec::SyncExecResult<'_>> {
pub fn exec(&self, cmd: ExecCommand) -> Result<exec::SyncExecResult> {
let async_exec = self.rt().block_on(self.async_impl().exec(cmd))?;
Ok(exec::SyncExecResult {
inner: async_exec,
runtime: self.rt(),
runtime: self.rt().clone(),
})
}

Expand All @@ -159,23 +159,23 @@ where
}

/// Returns a reader for stdout.
pub fn stdout(&self) -> Box<dyn BufRead + '_> {
pub fn stdout(&self) -> Box<dyn BufRead + Send> {
Box::new(sync_reader::SyncReadBridge::new(
self.async_impl().stdout(),
self.rt(),
self.rt().clone(),
))
}

/// Returns a reader for stderr.
pub fn stderr(&self) -> Box<dyn BufRead + '_> {
pub fn stderr(&self) -> Box<dyn BufRead + Send> {
Box::new(sync_reader::SyncReadBridge::new(
self.async_impl().stderr(),
self.rt(),
self.rt().clone(),
))
}

/// Returns reference to inner `Runtime`. It's safe to unwrap because it's `Some` until `Container` is dropped.
fn rt(&self) -> &tokio::runtime::Runtime {
fn rt(&self) -> &Arc<tokio::runtime::Runtime> {
&self.inner.as_ref().unwrap().runtime
}

Expand Down Expand Up @@ -234,27 +234,38 @@ mod test {
fn assert_send_and_sync<T: Send + Sync>() {}

#[test]
fn async_logs_are_accessible() -> anyhow::Result<()> {
fn sync_logs_are_accessible() -> anyhow::Result<()> {
let image = GenericImage::new("testcontainers/helloworld", "1.1.0");
let container = RunnableImage::from(image).start()?;

let mut stderr_lines = container.stderr().lines();

let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next().expect("line must exist")?;
assert!(
line.contains(expected_message),
"Log message ('{line}') doesn't contain expected message ('{expected_message}')"
);
}
let stderr = container.stderr();

// it's possible to send logs to another thread
let log_follower_thread = std::thread::spawn(move || {
let mut stderr_lines = stderr.lines();
let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next().expect("line must exist")?;
if !line.contains(expected_message) {
anyhow::bail!(
"Log message ('{}') doesn't contain expected message ('{}')",
line,
expected_message
);
}
}
Ok(())
});
log_follower_thread
.join()
.map_err(|_| anyhow::anyhow!("failed to join log follower thread"))??;

// logs are accessible after container is stopped
container.stop()?;
Expand Down
16 changes: 8 additions & 8 deletions testcontainers/src/core/containers/sync_container/exec.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{fmt, io::BufRead};
use std::{fmt, io::BufRead, sync::Arc};

use crate::{core::sync_container::sync_reader, TestcontainersError};

/// Represents the result of an executed command in a container.
pub struct SyncExecResult<'a> {
pub(super) inner: crate::core::async_container::exec::ExecResult<'a>,
pub(super) runtime: &'a tokio::runtime::Runtime,
pub struct SyncExecResult {
pub(super) inner: crate::core::async_container::exec::ExecResult,
pub(super) runtime: Arc<tokio::runtime::Runtime>,
}

impl<'a> SyncExecResult<'a> {
impl SyncExecResult {
/// Returns the exit code of the executed command.
/// If the command has not yet exited, this will return `None`.
pub fn exit_code(&self) -> Result<Option<i64>, TestcontainersError> {
Expand All @@ -19,15 +19,15 @@ impl<'a> SyncExecResult<'a> {
pub fn stdout<'b>(&'b mut self) -> Box<dyn BufRead + 'b> {
Box::new(sync_reader::SyncReadBridge::new(
self.inner.stdout(),
self.runtime,
self.runtime.clone(),
))
}

/// Returns an asynchronous reader for stderr.
pub fn stderr<'b>(&'b mut self) -> Box<dyn BufRead + 'b> {
Box::new(sync_reader::SyncReadBridge::new(
self.inner.stderr(),
self.runtime,
self.runtime.clone(),
))
}

Expand All @@ -42,7 +42,7 @@ impl<'a> SyncExecResult<'a> {
}
}

impl fmt::Debug for SyncExecResult<'_> {
impl fmt::Debug for SyncExecResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ExecResult")
.field("id", &self.inner.id)
Expand Down
Loading

0 comments on commit ec63ebb

Please sign in to comment.