diff --git a/testcontainers/src/core/logs/stream.rs b/testcontainers/src/core/logs/stream.rs index 898245c6..c9fa6213 100644 --- a/testcontainers/src/core/logs/stream.rs +++ b/testcontainers/src/core/logs/stream.rs @@ -51,6 +51,19 @@ impl LogStream { .boxed() } + /// Converts the log stream into a stream of raw bytes, regardless of the source. + pub(crate) fn into_bytes(self) -> RawLogStream { + self.inner + .filter_map(|record| async move { + match record { + Ok(LogFrame::StdErr(bytes)) => Some(Ok(bytes)), + Ok(LogFrame::StdOut(bytes)) => Some(Ok(bytes)), + Err(e) => Some(Err(e)), + } + }) + .boxed() + } + /// Splits the log stream into two streams, one for stdout and one for stderr. pub(crate) async fn split(self) -> (RawLogStream, RawLogStream) { let (stdout_tx, stdout_rx) = tokio::sync::mpsc::unbounded_channel(); diff --git a/testcontainers/src/core/wait/log_strategy.rs b/testcontainers/src/core/wait/log_strategy.rs index 6347bdbf..bd828842 100644 --- a/testcontainers/src/core/wait/log_strategy.rs +++ b/testcontainers/src/core/wait/log_strategy.rs @@ -12,7 +12,7 @@ use crate::{ #[derive(Debug, Clone)] pub struct LogWaitStrategy { - source: LogSource, + source: Option, message: Bytes, times: usize, } @@ -21,18 +21,18 @@ impl LogWaitStrategy { /// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard output logs. /// Shortcut for `LogWaitStrategy::new(LogSource::StdOut, message)`. pub fn stdout(message: impl AsRef<[u8]>) -> Self { - Self::new(LogSource::StdOut, message) + Self::new(Some(LogSource::StdOut), message) } /// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard error logs. /// Shortcut for `LogWaitStrategy::new(LogSource::StdErr, message)`. pub fn stderr(message: impl AsRef<[u8]>) -> Self { - Self::new(LogSource::StdErr, message) + Self::new(Some(LogSource::StdErr), message) } /// Create a new `LogWaitStrategy` with the given log source and message. /// The message is expected to appear in the logs exactly once by default. - pub fn new(source: LogSource, message: impl AsRef<[u8]>) -> Self { + pub fn new(source: Option, message: impl AsRef<[u8]>) -> Self { Self { source, message: Bytes::from(message.as_ref().to_vec()), @@ -54,8 +54,9 @@ impl WaitStrategy for LogWaitStrategy { container: &ContainerAsync, ) -> crate::core::error::Result<()> { let log_stream = match self.source { - LogSource::StdOut => client.stdout_logs(container.id(), true), - LogSource::StdErr => client.stderr_logs(container.id(), true), + Some(LogSource::StdOut) => client.stdout_logs(container.id(), true), + Some(LogSource::StdErr) => client.stderr_logs(container.id(), true), + None => client.logs(container.id(), true).into_bytes(), }; WaitingStreamWrapper::new(log_stream) diff --git a/testcontainers/src/core/wait/mod.rs b/testcontainers/src/core/wait/mod.rs index 77eb5c00..55fcf6b9 100644 --- a/testcontainers/src/core/wait/mod.rs +++ b/testcontainers/src/core/wait/mod.rs @@ -44,12 +44,12 @@ pub enum WaitFor { impl WaitFor { /// Wait for the message to appear on the container's stdout. pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor { - Self::log(LogWaitStrategy::new(LogSource::StdOut, message)) + Self::log(LogWaitStrategy::stdout(message)) } /// Wait for the message to appear on the container's stderr. pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor { - Self::log(LogWaitStrategy::new(LogSource::StdErr, message)) + Self::log(LogWaitStrategy::stderr(message)) } /// Wait for the message to appear on the container's stdout.