Skip to content

Commit

Permalink
feat!: support waiting from both standard streams
Browse files Browse the repository at this point in the history
  • Loading branch information
DDtKey committed Jul 7, 2024
1 parent e9cc412 commit 7305971
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
13 changes: 13 additions & 0 deletions testcontainers/src/core/logs/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ impl LogStream {
.boxed()
}

/// Converts the log stream into a stream of raw bytes, regardless of the source.
pub(crate) fn into_bytes(self) -> RawLogStream {
self.inner
.filter_map(|record| async move {
match record {
Ok(LogFrame::StdErr(bytes)) => Some(Ok(bytes)),
Ok(LogFrame::StdOut(bytes)) => Some(Ok(bytes)),
Err(e) => Some(Err(e)),
}
})
.boxed()
}

/// Splits the log stream into two streams, one for stdout and one for stderr.
pub(crate) async fn split(self) -> (RawLogStream, RawLogStream) {
let (stdout_tx, stdout_rx) = tokio::sync::mpsc::unbounded_channel();
Expand Down
13 changes: 7 additions & 6 deletions testcontainers/src/core/wait/log_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{

#[derive(Debug, Clone)]
pub struct LogWaitStrategy {
source: LogSource,
source: Option<LogSource>,
message: Bytes,
times: usize,
}
Expand All @@ -21,18 +21,18 @@ impl LogWaitStrategy {
/// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard output logs.
/// Shortcut for `LogWaitStrategy::new(LogSource::StdOut, message)`.
pub fn stdout(message: impl AsRef<[u8]>) -> Self {
Self::new(LogSource::StdOut, message)
Self::new(Some(LogSource::StdOut), message)
}

/// Create a new [`LogWaitStrategy`] that waits for the given message to appear in the standard error logs.
/// Shortcut for `LogWaitStrategy::new(LogSource::StdErr, message)`.
pub fn stderr(message: impl AsRef<[u8]>) -> Self {
Self::new(LogSource::StdErr, message)
Self::new(Some(LogSource::StdErr), message)
}

/// Create a new `LogWaitStrategy` with the given log source and message.
/// The message is expected to appear in the logs exactly once by default.
pub fn new(source: LogSource, message: impl AsRef<[u8]>) -> Self {
pub fn new(source: Option<LogSource>, message: impl AsRef<[u8]>) -> Self {
Self {
source,
message: Bytes::from(message.as_ref().to_vec()),
Expand All @@ -54,8 +54,9 @@ impl WaitStrategy for LogWaitStrategy {
container: &ContainerAsync<I>,
) -> crate::core::error::Result<()> {
let log_stream = match self.source {
LogSource::StdOut => client.stdout_logs(container.id(), true),
LogSource::StdErr => client.stderr_logs(container.id(), true),
Some(LogSource::StdOut) => client.stdout_logs(container.id(), true),
Some(LogSource::StdErr) => client.stderr_logs(container.id(), true),
None => client.logs(container.id(), true).into_bytes(),
};

WaitingStreamWrapper::new(log_stream)
Expand Down
4 changes: 2 additions & 2 deletions testcontainers/src/core/wait/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ pub enum WaitFor {
impl WaitFor {
/// Wait for the message to appear on the container's stdout.
pub fn message_on_stdout(message: impl AsRef<[u8]>) -> WaitFor {
Self::log(LogWaitStrategy::new(LogSource::StdOut, message))
Self::log(LogWaitStrategy::stdout(message))
}

/// Wait for the message to appear on the container's stderr.
pub fn message_on_stderr(message: impl AsRef<[u8]>) -> WaitFor {
Self::log(LogWaitStrategy::new(LogSource::StdErr, message))
Self::log(LogWaitStrategy::stderr(message))
}

/// Wait for the message to appear on the container's stdout.
Expand Down

0 comments on commit 7305971

Please sign in to comment.