diff --git a/Cargo.toml b/Cargo.toml index 5abee9d9..eee107b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ keywords = ["docker", "testcontainers"] license = "MIT OR Apache-2.0" readme = "README.md" repository = "https://github.com/testcontainers/testcontainers-rs" -rust-version = "1.66" +rust-version = "1.70" [workspace.dependencies] testimages = { path = "testimages" } diff --git a/README.md b/README.md index 5925cf2e..02e5005a 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,38 @@ The crate provides an API for working with containers in a test environment. 1. Depend on `testcontainers` 2. Implement `testcontainers::core::Image` for necessary docker-images -3. Run it with any available client `testcontainers::clients::*` +3. Run it with any available runner `testcontainers::runners::*` (use `blocking` feature for synchronous API) + +#### Example: + +- Blocking API (under `blocking` feature) + +```rust +use testcontainers::{core::WaitFor, runners::SyncRunner, GenericImage}; + +#[test] +fn test_redis() { + let container = GenericImage::new("redis", "7.2.4") + .with_exposed_port(6379) + .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")) + .start(); +} +``` + +- Async API + +```rust +use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage}; + +#[tokio::test] +async fn test_redis() { + let container = GenericImage::new("redis", "7.2.4") + .with_exposed_port(6379) + .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")) + .start() + .await; +} +``` ### Ready-to-use images diff --git a/testcontainers/Cargo.toml b/testcontainers/Cargo.toml index 734c3420..f286848a 100644 --- a/testcontainers/Cargo.toml +++ b/testcontainers/Cargo.toml @@ -10,25 +10,30 @@ repository.workspace = true rust-version.workspace = true description = "A library for integration-testing against docker containers from within Rust." +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [dependencies] -async-trait = { version = "0.1", optional = true } -bollard = { version = "0.16.1", optional = true } +async-trait = { version = "0.1" } +bollard = { version = "0.16.1", features = ["ssl"] } bollard-stubs = "=1.44.0-rc.2" conquer-once = { version = "0.4", optional = true } +dirs = "5.0.1" futures = "0.3" -hex = "0.4" -hmac = "0.12" log = "0.4" -rand = "0.8" serde = { version = "1", features = ["derive"] } +serde-java-properties = "0.1.1" serde_json = "1" -sha2 = "0.10" +serde_with = "3.7.0" signal-hook = { version = "0.3", optional = true } -tokio = { version = "1", features = ["macros"], optional = true } +tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] } +tokio-util = "0.7.10" +url = { version = "2", features = ["serde"] } [features] default = [] -experimental = ["async-trait", "bollard", "tokio"] +blocking = [] watchdog = ["signal-hook", "conquer-once"] [dev-dependencies] diff --git a/testcontainers/src/clients.rs b/testcontainers/src/clients.rs deleted file mode 100644 index e407ef34..00000000 --- a/testcontainers/src/clients.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod cli; - -#[cfg(feature = "experimental")] -mod http; - -pub use self::cli::Cli; - -#[cfg(feature = "experimental")] -pub use self::http::Http; diff --git a/testcontainers/src/clients/cli.rs b/testcontainers/src/clients/cli.rs deleted file mode 100644 index 097057b5..00000000 --- a/testcontainers/src/clients/cli.rs +++ /dev/null @@ -1,726 +0,0 @@ -use std::{ - collections::HashMap, - ffi::{OsStr, OsString}, - process::{Child, Command, Stdio}, - sync::{Arc, RwLock}, - thread::sleep, - time::{Duration, Instant}, -}; - -use bollard_stubs::models::{ContainerInspectResponse, HealthStatusEnum}; - -use crate::{ - core::{ - env::{self, GetEnvValue}, - logs::LogStream, - ports::Ports, - ContainerState, Docker, WaitFor, - }, - Container, Image, ImageArgs, RunnableImage, -}; - -const ONE_SECOND: Duration = Duration::from_secs(1); -const ZERO: Duration = Duration::from_secs(0); - -/// Implementation of the Docker client API using the docker cli. -/// -/// This (fairly naive) implementation of the Docker client API simply creates `Command`s to the `docker` CLI. It thereby assumes that the `docker` CLI is installed and that it is in the PATH of the current execution environment. -#[derive(Debug)] -pub struct Cli { - inner: Arc, -} - -impl Cli { - pub fn run(&self, image: impl Into>) -> Container<'_, I> { - let image = image.into(); - - if let Some(network) = image.network() { - if self.inner.create_network_if_not_exists(network) { - let mut guard = self - .inner - .created_networks - .write() - .expect("failed to lock RwLock"); - - guard.push(network.to_owned()); - } - } - - let mut command = Client::build_run_command(&image, self.inner.command()); - - log::debug!("Executing command: {:?}", command); - - let output = command.output().expect("Failed to execute docker command"); - if !output.status.success() { - let stdout = std::str::from_utf8(&output.stdout).unwrap_or("{not utf8}"); - let stderr = std::str::from_utf8(&output.stderr).unwrap_or("{not utf8}"); - log::error!("Failed to start container.\nContainer stdout: {stdout}\nContainer stderr: {stderr}"); - panic!("Failed to start container, check log for details") - } - - let container_id = String::from_utf8(output.stdout) - .expect("output is not valid utf8") - .trim() - .to_string(); - - #[cfg(feature = "watchdog")] - if self.inner.command == env::Command::Remove { - crate::watchdog::register(container_id.clone()); - } - - self.inner.register_container_started(container_id.clone()); - - self.block_until_ready(&container_id, image.ready_conditions()); - - let client = Cli { - inner: self.inner.clone(), - }; - - let container = Container::new(container_id, client, image, self.inner.command); - - for cmd in container - .image() - .exec_after_start(ContainerState::new(container.ports())) - { - container.exec(cmd); - } - - container - } -} - -#[derive(Debug)] -struct Client { - /// The docker CLI has an issue that if you request logs for a container - /// too quickly after it was started up, the resulting stream will never - /// emit any data, even if the container is already emitting logs. - /// - /// We keep track of when we started a container in order to make sure - /// that we wait at least one second after that. Subsequent invocations - /// directly fetch the logs of a container. - container_startup_timestamps: RwLock>, - created_networks: RwLock>, - binary: OsString, - command: env::Command, -} - -impl Client { - fn command(&self) -> Command { - Command::new(self.binary.clone()) - } - - fn register_container_started(&self, id: String) { - let mut lock_guard = match self.container_startup_timestamps.write() { - Ok(lock_guard) => lock_guard, - - // We only need the mutex to not require a &mut self in this function. - // Data cannot be in-consistent even if a thread panics while holding the lock - Err(e) => e.into_inner(), - }; - let start_timestamp = Instant::now(); - - log::trace!( - "Registering starting of container {} at {:?}", - id, - start_timestamp - ); - - lock_guard.insert(id, start_timestamp); - } - - fn time_since_container_was_started(&self, id: &str) -> Option { - let lock_guard = match self.container_startup_timestamps.read() { - Ok(lock_guard) => lock_guard, - - // We only need the mutex to not require a &mut self in this function. - // Data cannot be in-consistent even if a thread panics while holding the lock - Err(e) => e.into_inner(), - }; - - let result = lock_guard.get(id).map(|i| Instant::now() - *i); - - log::trace!("Time since container {} was started: {:?}", id, result); - - result - } - - fn wait_at_least_one_second_after_container_was_started(&self, id: &str) { - if let Some(duration) = self.time_since_container_was_started(id) { - if duration < ONE_SECOND { - sleep(ONE_SECOND.checked_sub(duration).unwrap_or(ZERO)) - } - } - } - - fn build_run_command(image: &RunnableImage, mut command: Command) -> Command { - command.arg("run"); - - if image.privileged() { - command.arg("--privileged"); - } - - if let Some(bytes) = image.shm_size() { - command.arg(format!("--shm-size={bytes}")); - } - - if let Some(network) = image.network() { - command.arg(format!("--network={network}")); - } - - if let Some(name) = image.container_name() { - command.arg(format!("--name={name}")); - } - - for (key, value) in image.env_vars() { - command.arg("-e").arg(format!("{key}={value}")); - } - - for (key, value) in image.hosts() { - command.arg("--add-host").arg(format!("{key}:{value}")); - } - - for (orig, dest) in image.volumes() { - command.arg("-v").arg(format!("{orig}:{dest}")); - } - - if let Some(entrypoint) = image.entrypoint() { - command.arg("--entrypoint").arg(entrypoint); - } - - let is_container_networked = image - .network() - .as_ref() - .map(|network| network.starts_with("container:")) - .unwrap_or(false); - if let Some(ports) = image.ports() { - for port in ports { - command - .arg("-p") - .arg(format!("{}:{}", port.local, port.internal)); - } - } else if !is_container_networked { - for port in image.expose_ports() { - command.arg(format!("--expose={port}")); - } - command.arg("-P"); // publish all exposed ports - } - - command - .arg("-d") // Always run detached - .arg(image.descriptor()) - .args(image.args().clone().into_iterator()) - .stdout(Stdio::piped()); - - command - } - - fn create_network_if_not_exists(&self, name: &str) -> bool { - if self.network_exists(name) { - return false; - } - - let mut docker = self.command(); - docker.args(["network", "create", name]); - - let output = docker.output().expect("failed to create docker network"); - assert!(output.status.success(), "failed to create docker network"); - - true - } - - fn network_exists(&self, name: &str) -> bool { - let mut docker = self.command(); - docker.args(["network", "ls", "--format", "{{.Name}}"]); - - let output = docker.output().expect("failed to list docker networks"); - let output = String::from_utf8(output.stdout).expect("output is not valid utf-8"); - - output.lines().any(|network| network == name) - } - - fn delete_networks(&self, networks: I) - where - I: IntoIterator, - S: AsRef, - { - let mut docker = self.command(); - docker.args(["network", "rm"]); - docker.args(networks); - - let output = docker.output().expect("failed to delete docker networks"); - - assert!( - output.status.success(), - "failed to delete docker networks: {}", - String::from_utf8(output.stderr).unwrap() - ) - } -} - -impl Default for Cli { - fn default() -> Self { - Self::new::() - } -} - -impl Cli { - pub fn new() -> Self - where - E: GetEnvValue, - { - Self { - inner: Arc::new(Client { - container_startup_timestamps: Default::default(), - created_networks: Default::default(), - binary: "docker".into(), - command: env::command::().unwrap_or_default(), - }), - } - } -} - -impl Docker for Cli { - fn stdout_logs(&self, id: &str) -> LogStream { - self.inner - .wait_at_least_one_second_after_container_was_started(id); - - let child = self - .inner - .command() - .arg("logs") - .arg("-f") - .arg(id) - .stdout(Stdio::piped()) - .stderr(Stdio::null()) - .spawn() - .expect("Failed to execute docker command"); - - LogStream::new(child.stdout.expect("stdout to be captured")) - } - - fn stderr_logs(&self, id: &str) -> LogStream { - self.inner - .wait_at_least_one_second_after_container_was_started(id); - - let child = self - .inner - .command() - .arg("logs") - .arg("-f") - .arg(id) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .spawn() - .expect("Failed to execute docker command"); - - LogStream::new(child.stderr.expect("stderr to be captured")) - } - - fn ports(&self, id: &str) -> Ports { - self.inspect(id) - .network_settings - .unwrap_or_default() - .ports - .map(Ports::from) - .unwrap_or_default() - } - - fn inspect(&self, id: &str) -> ContainerInspectResponse { - let output = self - .inner - .command() - .arg("inspect") - .arg(id) - .stdout(Stdio::piped()) - .output() - .expect("Failed to execute docker command"); - assert!( - output.status.success(), - "Failed to inspect docker container" - ); - - let stdout = output.stdout; - - let mut infos: Vec = serde_json::from_slice(&stdout).unwrap(); - - let info = infos.remove(0); - - log::trace!("Fetched container info: {:#?}", info); - info - } - - fn rm(&self, id: &str) { - let output = self - .inner - .command() - .arg("rm") - .arg("-f") - .arg("-v") // Also remove volumes - .arg(id) - .output() - .expect("Failed to execute docker command"); - let error_msg = "Failed to remove docker container"; - assert!(output.status.success(), "{}", error_msg); - // The container's id is printed on stdout if it was removed successfully. - assert!( - String::from_utf8(output.stdout) - .expect("Could not decode daemon's response.") - .contains(id), - "{}", - error_msg - ); - } - - fn stop(&self, id: &str) { - self.inner - .command() - .arg("stop") - .arg(id) - .stdout(Stdio::piped()) - .spawn() - .expect("Failed to execute docker command") - .wait() - .expect("Failed to stop docker container"); - } - - fn start(&self, id: &str) { - self.inner - .command() - .arg("start") - .arg(id) - .stdout(Stdio::piped()) - .spawn() - .expect("Failed to execute docker command") - .wait() - .expect("Failed to start docker container"); - } - - fn exec(&self, id: &str, cmd: String) -> std::process::Output { - let exec_output = self - .inner - .command() - .arg("exec") - .arg(id) - .arg("sh") - .arg("-c") - .arg(&cmd) - .stdout(Stdio::piped()) - .spawn() - .and_then(Child::wait_with_output) - .expect("Failed to execute docker command"); - - log::debug!("command {} was executed!", cmd); - - exec_output - } - - fn block_until_ready(&self, id: &str, ready_conditions: Vec) { - log::debug!("Waiting for container {} to be ready", id); - - for condition in ready_conditions { - match condition { - WaitFor::StdOutMessage { message } => { - self.stdout_logs(id).wait_for_message(&message).unwrap() - } - WaitFor::StdErrMessage { message } => { - self.stderr_logs(id).wait_for_message(&message).unwrap() - } - WaitFor::Duration { length } => { - std::thread::sleep(length); - } - WaitFor::Healthcheck => loop { - use HealthStatusEnum::*; - - let health_status = self - .inspect(id) - .state - .unwrap_or_else(|| panic!("Container state not available")) - .health - .unwrap_or_else(|| panic!("Health state not available")) - .status; - - match health_status { - Some(HEALTHY) => break, - None | Some(EMPTY) | Some(NONE) => { - panic!("Healthcheck not configured for container") - } - Some(UNHEALTHY) => panic!("Healthcheck reports unhealthy"), - Some(STARTING) => sleep(Duration::from_millis(100)), - } - }, - WaitFor::Nothing => {} - } - } - - log::debug!("Container {} is now ready!", id); - } -} - -impl Drop for Client { - fn drop(&mut self) { - let networks = self.created_networks.read().expect("failed to lock RwLock"); - let created_networks = networks.len() > 0; - - match self.command { - env::Command::Remove if created_networks => { - self.delete_networks(networks.iter()); - } - env::Command::Remove => { - // nothing to do - } - env::Command::Keep => { - let networks = networks.join(","); - - log::warn!( - "networks '{}' will not be automatically removed due to `TESTCONTAINERS` command", - networks - ); - } - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::BTreeMap; - - use super::*; - use crate::images::generic::GenericImage; - - #[derive(Default)] - struct HelloWorld { - volumes: BTreeMap, - env_vars: BTreeMap, - } - - impl Image for HelloWorld { - type Args = (); - - fn name(&self) -> String { - "hello-world".to_owned() - } - - fn tag(&self) -> String { - "latest".to_owned() - } - - fn ready_conditions(&self) -> Vec { - vec![WaitFor::message_on_stdout("Hello from Docker!")] - } - - fn env_vars(&self) -> Box + '_> { - Box::new(self.env_vars.iter()) - } - - fn volumes(&self) -> Box + '_> { - Box::new(self.volumes.iter()) - } - } - - #[test] - fn cli_run_command_should_include_env_vars() { - let mut volumes = BTreeMap::new(); - volumes.insert("one-from".to_owned(), "one-dest".to_owned()); - volumes.insert("two-from".to_owned(), "two-dest".to_owned()); - - let mut env_vars = BTreeMap::new(); - env_vars.insert("one-key".to_owned(), "one-value".to_owned()); - env_vars.insert("two-key".to_owned(), "two-value".to_owned()); - - let image = HelloWorld { volumes, env_vars }; - - let command = - Client::build_run_command(&RunnableImage::from(image), Command::new("docker")); - - println!("Executing command: {command:?}"); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "-e" "one-key=one-value" "-e" "two-key=two-value" "-v" "one-from:one-dest" "-v" "two-from:two-dest" "-P" "-d" "hello-world:latest""# - ); - } - - #[test] - fn cli_run_command_should_expose_all_ports_if_no_explicit_mapping_requested() { - let image = GenericImage::new("hello", "0.0"); - - let command = - Client::build_run_command(&RunnableImage::from(image), Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "-P" "-d" "hello:0.0""# - ); - } - - #[test] - fn cli_run_command_should_expose_requested_ports() { - let image = GenericImage::new("hello", "0.0"); - - let image = RunnableImage::from(image) - .with_mapped_port((123, 456)) - .with_mapped_port((555, 888)); - let command = Client::build_run_command(&image, Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "-p" "123:456" "-p" "555:888" "-d" "hello:0.0""# - ); - } - - #[test] - #[should_panic(expected = "Failed to remove docker container")] - fn cli_rm_command_should_panic_on_invalid_container() { - let docker = Cli::default(); - docker.rm("!INVALID_NAME_DUE_TO_SYMBOLS!"); - unreachable!() - } - - #[test] - fn cli_run_command_should_include_network() { - let image = GenericImage::new("hello", "0.0"); - - let image = RunnableImage::from(image).with_network("awesome-net"); - let command = Client::build_run_command(&image, Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "--network=awesome-net" "-P" "-d" "hello:0.0""# - ); - } - - #[test] - fn cli_run_command_should_include_name() { - let image = GenericImage::new("hello", "0.0"); - let image = RunnableImage::from(image).with_container_name("hello_container"); - let command = Client::build_run_command(&image, Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "--name=hello_container" "-P" "-d" "hello:0.0""# - ); - } - - #[test] - fn cli_run_command_with_container_network_should_not_expose_ports() { - let image = GenericImage::new("hello", "0.0"); - let image = RunnableImage::from(image) - .with_container_name("hello_container") - .with_network("container:the_other_one"); - let command = Client::build_run_command(&image, Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "--network=container:the_other_one" "--name=hello_container" "-d" "hello:0.0""# - ); - } - - #[test] - fn cli_run_command_should_include_privileged() { - let image = GenericImage::new("hello", "0.0"); - let image = RunnableImage::from(image).with_privileged(true); - let command = Client::build_run_command(&image, Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "--privileged" "-P" "-d" "hello:0.0""# - ); - } - - #[test] - fn cli_run_command_should_include_shm_size() { - let image = GenericImage::new("hello", "0.0"); - let image = RunnableImage::from(image).with_shm_size(1_000_000); - let command = Client::build_run_command(&image, Command::new("docker")); - - assert_eq!( - format!("{command:?}"), - r#""docker" "run" "--shm-size=1000000" "-P" "-d" "hello:0.0""# - ); - } - - #[test] - fn should_create_network_if_image_needs_it_and_drop_it_in_the_end() { - { - let docker = Cli::default(); - - assert!(!docker.inner.network_exists("awesome-net")); - - // creating the first container creates the network - let _container1 = - docker.run(RunnableImage::from(HelloWorld::default()).with_network("awesome-net")); - // creating a 2nd container doesn't fail because check if the network exists already - let _container2 = - docker.run(RunnableImage::from(HelloWorld::default()).with_network("awesome-net")); - - assert!(docker.inner.network_exists("awesome-net")); - } - - { - let docker = Cli::default(); - // original client has been dropped, should clean up networks - assert!(!docker.inner.network_exists("awesome-net")) - } - } - - struct FakeEnvAlwaysKeep; - - impl GetEnvValue for FakeEnvAlwaysKeep { - fn get_env_value(_: &str) -> Option { - Some("keep".to_owned()) - } - } - - #[test] - fn should_not_delete_network_if_command_is_keep() { - let network_name = "foobar-net"; - - { - let docker = Cli::new::(); - - assert!(!docker.inner.network_exists(network_name)); - - // creating the first container creates the network - let container1 = - docker.run(RunnableImage::from(HelloWorld::default()).with_network(network_name)); - - assert!(docker.inner.network_exists(network_name)); - - // remove container, so network can get cleaned up after the test - docker.rm(container1.id()); - } - - let docker = Cli::default(); - - assert!( - docker.inner.network_exists(network_name), - "network should still exist after client is dropped" - ); - - docker.inner.delete_networks(vec![network_name]); - } - - #[test] - fn should_wait_for_at_least_one_second_before_fetching_logs() { - let _ = pretty_env_logger::try_init(); - let docker = Cli::default(); - - let before_run = Instant::now(); - let container = docker.run(HelloWorld::default()); - let after_run = Instant::now(); - - let before_logs = Instant::now(); - docker.stdout_logs(container.id()); - let after_logs = Instant::now(); - - const ONE_SEC: Duration = Duration::from_secs(1); - assert!( - (after_run - before_run) > ONE_SEC, - "run completed in less than a second" - ); - assert!( - (after_logs - before_logs) < ONE_SEC, - "log fetching took more than a second" - ); - } -} diff --git a/testcontainers/src/clients/http.rs b/testcontainers/src/clients/http.rs deleted file mode 100644 index 65e6513b..00000000 --- a/testcontainers/src/clients/http.rs +++ /dev/null @@ -1,486 +0,0 @@ -use crate::{ - core::{env, logs::LogStreamAsync, ports::Ports, DockerAsync, Port}, - ContainerAsync, Image, ImageArgs, RunnableImage, -}; -use async_trait::async_trait; -use bollard::{ - container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions}, - image::CreateImageOptions, - models::{ContainerCreateResponse, ContainerInspectResponse, HostConfig, PortBinding}, - network::CreateNetworkOptions, - Docker, -}; -use futures::{executor::block_on, stream::StreamExt, TryStreamExt}; -use std::{ - collections::HashMap, - fmt, io, - sync::{Arc, RwLock}, -}; - -/// A testcontainers client that uses HTTP to communicate with the docker daemon. -/// -/// This client provides an async-based interface. -pub struct Http { - inner: Arc, -} - -/// The internal client. -/// -/// This exists so we don't have to make the outer client clonable and still can have only a single instance around which is important for `Drop` behaviour. -struct Client { - command: env::Command, - bollard: Docker, - created_networks: RwLock>, -} - -impl fmt::Debug for Http { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Http").finish() - } -} - -impl Default for Http { - fn default() -> Self { - Self::new() - } -} - -// public API -impl Http { - pub async fn run(&self, image: impl Into>) -> ContainerAsync<'_, I> { - let image = image.into(); - let mut create_options: Option> = None; - let mut config: Config = Config { - image: Some(image.descriptor()), - host_config: Some(HostConfig::default()), - ..Default::default() - }; - - // shared memory - if let Some(bytes) = image.shm_size() { - config.host_config = config.host_config.map(|mut host_config| { - host_config.shm_size = Some(bytes as i64); - host_config - }); - } - - // create network and add it to container creation - if let Some(network) = image.network() { - config.host_config = config.host_config.map(|mut host_config| { - host_config.network_mode = Some(network.to_string()); - host_config - }); - if self.create_network_if_not_exists(network).await { - let mut guard = self - .inner - .created_networks - .write() - .expect("'failed to lock RwLock'"); - guard.push(network.clone()); - } - } - - // name of the container - if let Some(name) = image.container_name() { - create_options = Some(CreateContainerOptions { - name: name.to_owned(), - platform: None, - }) - } - - // handle environment variables - let envs: Vec = image - .env_vars() - .into_iter() - .map(|(k, v)| format!("{k}={v}")) - .collect(); - config.env = Some(envs); - - // volumes - let vols: HashMap> = image - .volumes() - .into_iter() - .map(|(orig, dest)| (format!("{orig}:{dest}"), HashMap::new())) - .collect(); - config.volumes = Some(vols); - - // entrypoint - if let Some(entrypoint) = image.entrypoint() { - config.entrypoint = Some(vec![entrypoint]); - } - - // exposed ports - config.exposed_ports = Some( - image - .expose_ports() - .into_iter() - .map(|p| (format!("{p}/tcp"), HashMap::new())) - .collect(), - ); - - // ports - if image.ports().is_some() || image.expose_ports().len() > 0 { - let empty: Vec = Vec::new(); - let bindings = image - .ports() - .as_ref() - .unwrap_or(&empty) - .iter() - .map(|p| { - ( - format!("{}/tcp", p.internal), - Some(vec![PortBinding { - host_ip: Some(String::from("127.0.0.1")), - host_port: Some(p.local.to_string()), - }]), - ) - }) - .chain( - image - .expose_ports() - .into_iter() - .map(|p| (format!("{}/tcp", p), Some(vec![PortBinding::default()]))), - ); - - config.host_config = config.host_config.map(|mut host_config| { - host_config.port_bindings = Some(bindings.collect()); - host_config - }); - } else { - config.host_config = config.host_config.map(|mut host_config| { - host_config.publish_all_ports = Some(true); - host_config - }); - } - - let args = image - .args() - .clone() - .into_iterator() - .collect::>(); - if !args.is_empty() { - config.cmd = Some(args); - } - - // create the container with options - let create_result = self - .create_container(create_options.clone(), config.clone()) - .await; - let container_id = { - match create_result { - Ok(container) => container.id, - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, .. - }) => { - { - let pull_options = Some(CreateImageOptions { - from_image: image.descriptor(), - ..Default::default() - }); - let mut pulling = self.inner.bollard.create_image(pull_options, None, None); - while let Some(result) = pulling.next().await { - if result.is_err() { - result.unwrap(); - } - } - } - self.create_container(create_options, config) - .await - .unwrap() - .id - } - Err(err) => panic!("{}", err), - } - }; - - #[cfg(feature = "watchdog")] - if self.inner.command == env::Command::Remove { - crate::watchdog::register(container_id.clone()); - } - - self.inner - .bollard - .start_container::(&container_id, None) - .await - .unwrap(); - - let client = Http { - inner: self.inner.clone(), - }; - - ContainerAsync::new(container_id, client, image, self.inner.command).await - } -} - -impl Http { - fn new() -> Self { - Http { - inner: Arc::new(Client { - command: env::command::().unwrap_or_default(), - bollard: Docker::connect_with_http_defaults().unwrap(), - created_networks: RwLock::new(Vec::new()), - }), - } - } - - async fn create_network_if_not_exists(&self, network: &str) -> bool { - if !network_exists(&self.inner.bollard, network).await { - self.inner - .bollard - .create_network(CreateNetworkOptions { - name: network.to_owned(), - ..Default::default() - }) - .await - .unwrap(); - - return true; - } - - false - } - - async fn create_container( - &self, - options: Option>, - config: Config, - ) -> Result { - self.inner.bollard.create_container(options, config).await - } - - fn logs(&self, container_id: String, options: LogsOptions) -> LogStreamAsync<'_> { - let stream = self - .inner - .bollard - .logs(&container_id, Some(options)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) - .map(|chunk| { - let bytes = chunk?.into_bytes(); - let str = std::str::from_utf8(bytes.as_ref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - Ok(String::from(str)) - }) - .boxed(); - - LogStreamAsync::new(stream) - } -} - -async fn network_exists(client: &Docker, network: &str) -> bool { - let networks = client.list_networks::(None).await.unwrap(); - networks - .iter() - .any(|i| matches!(&i.name, Some(name) if name == network)) -} - -impl Drop for Client { - fn drop(&mut self) { - match self.command { - env::Command::Remove => { - let guard = self.created_networks.read().expect("failed to lock RwLock"); - for network in guard.iter() { - block_on(async { self.bollard.remove_network(network).await.unwrap() }); - } - } - env::Command::Keep => {} - } - } -} - -#[async_trait] -impl DockerAsync for Http { - fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_> { - self.logs( - id.to_owned(), - LogsOptions { - follow: true, - stdout: true, - tail: "all".to_owned(), - ..Default::default() - }, - ) - } - - fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_> { - self.logs( - id.to_owned(), - LogsOptions { - follow: true, - stderr: true, - tail: "all".to_owned(), - ..Default::default() - }, - ) - } - - async fn ports(&self, id: &str) -> Ports { - self.inspect(id) - .await - .network_settings - .unwrap_or_default() - .ports - .map(Ports::from) - .unwrap_or_default() - } - - async fn inspect(&self, id: &str) -> ContainerInspectResponse { - self.inner - .bollard - .inspect_container(id, None) - .await - .unwrap() - } - - async fn rm(&self, id: &str) { - self.inner - .bollard - .remove_container( - id, - Some(RemoveContainerOptions { - force: true, - v: true, - ..Default::default() - }), - ) - .await - .unwrap(); - } - - async fn stop(&self, id: &str) { - self.inner.bollard.stop_container(id, None).await.unwrap(); - } - - async fn start(&self, id: &str) { - self.inner - .bollard - .start_container::(id, None) - .await - .unwrap(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::images::generic::GenericImage; - - async fn inspect(client: &bollard::Docker, id: &str) -> ContainerInspectResponse { - client.inspect_container(id, None).await.unwrap() - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_run_command_should_expose_all_ports_if_no_explicit_mapping_requested() { - let docker = Http::new(); - let image = GenericImage::new("hello-world", "latest"); - let container = docker.run(image).await; - - // inspect volume and env - let container_details = inspect(&docker.inner.bollard, container.id()).await; - let publish_ports = container_details - .host_config - .unwrap() - .publish_all_ports - .unwrap(); - assert_eq!(publish_ports, true, "publish_all_ports must be `true`"); - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_run_command_should_map_exposed_port() { - let docker = Http::new(); - let image = GenericImage::new("simple_web_server", "latest").with_exposed_port(5000); - let container = docker.run(image).await; - container.get_host_port_ipv4(5000).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_run_command_should_expose_only_requested_ports() { - let docker = Http::new(); - let image = GenericImage::new("hello-world", "latest"); - let image = RunnableImage::from(image) - .with_mapped_port((123, 456)) - .with_mapped_port((555, 888)); - let container = docker.run(image).await; - - let container_details = inspect(&docker.inner.bollard, container.id()).await; - - let port_bindings = container_details - .host_config - .unwrap() - .port_bindings - .unwrap(); - assert!(port_bindings.contains_key("456/tcp")); - assert!(port_bindings.contains_key("888/tcp")); - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_run_command_should_include_network() { - let docker = Http::new(); - let image = GenericImage::new("hello-world", "latest"); - let image = RunnableImage::from(image).with_network("awesome-net-1"); - let container = docker.run(image).await; - - let container_details = inspect(&docker.inner.bollard, container.id()).await; - let networks = container_details - .network_settings - .unwrap() - .networks - .unwrap(); - - assert!( - networks.contains_key("awesome-net-1"), - "Networks is {networks:?}" - ); - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_run_command_should_include_name() { - let docker = Http::new(); - let image = GenericImage::new("hello-world", "latest"); - let image = RunnableImage::from(image).with_container_name("hello_container"); - let container = docker.run(image).await; - - let container_details = inspect(&docker.inner.bollard, container.id()).await; - let container_name = container_details.name.unwrap(); - assert!(container_name.ends_with("hello_container")); - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_should_create_network_if_image_needs_it_and_drop_it_in_the_end() { - let client = bollard::Docker::connect_with_http_defaults().unwrap(); - let hello_world = GenericImage::new("hello-world", "latest"); - - { - let docker = Http::new(); - assert!(!network_exists(&client, "awesome-net-2").await); - - // creating the first container creates the network - let _container1 = docker - .run(RunnableImage::from(hello_world.clone()).with_network("awesome-net-2")) - .await; - - // creating a 2nd container doesn't fail because check if the network exists already - let _container2 = docker - .run(RunnableImage::from(hello_world).with_network("awesome-net-2")) - .await; - - assert!(network_exists(&client, "awesome-net-2").await); - } - - // client has been dropped, should clean up networks - assert!(!network_exists(&client, "awesome-net-2").await) - } - - #[tokio::test(flavor = "multi_thread")] - async fn http_run_command_should_set_shared_memory_size() { - let docker = Http::new(); - let image = GenericImage::new("hello-world", "latest"); - let image = RunnableImage::from(image).with_shm_size(1_000_000); - let container = docker.run(image).await; - - let container_details = inspect(&docker.inner.bollard, container.id()).await; - let shm_size = container_details.host_config.unwrap().shm_size.unwrap(); - - assert_eq!(shm_size, 1_000_000); - } -} diff --git a/testcontainers/src/core.rs b/testcontainers/src/core.rs index 49a5b656..91be08e8 100644 --- a/testcontainers/src/core.rs +++ b/testcontainers/src/core.rs @@ -1,20 +1,15 @@ -pub(crate) use container::Docker; -#[cfg(feature = "experimental")] -pub(crate) use container_async::DockerAsync; - -pub use self::{ - container::Container, - image::{ContainerState, ExecCommand, Host, Image, ImageArgs, Port, RunnableImage, WaitFor}, +pub use self::image::{ + ContainerState, ExecCommand, Host, Image, ImageArgs, Port, RunnableImage, WaitFor, }; -#[cfg(feature = "experimental")] -pub use self::container_async::ContainerAsync; +pub use self::containers::*; -mod container; -#[cfg(feature = "experimental")] -mod container_async; -pub mod env; mod image; +pub(crate) mod client; +pub(crate) mod containers; +pub(crate) mod env; pub(crate) mod logs; +pub(crate) mod macros; +pub(crate) mod network; pub(crate) mod ports; diff --git a/testcontainers/src/core/client.rs b/testcontainers/src/core/client.rs new file mode 100644 index 00000000..6d46655d --- /dev/null +++ b/testcontainers/src/core/client.rs @@ -0,0 +1,285 @@ +use crate::core::{env, logs::LogStreamAsync, ports::Ports, WaitFor}; +use bollard::{ + container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions}, + exec::{CreateExecOptions, StartExecOptions, StartExecResults}, + image::CreateImageOptions, + network::CreateNetworkOptions, + Docker, +}; +use bollard_stubs::models::{ContainerCreateResponse, ContainerInspectResponse, HealthStatusEnum}; +use futures::{StreamExt, TryStreamExt}; +use std::{io, time::Duration}; + +mod bollard_client; +mod factory; + +/// The desired log stream. +pub(crate) enum DesiredLogStream { + Stdout, + Stderr, +} + +/// The internal client. +pub(crate) struct Client { + pub(crate) config: env::Config, + pub(crate) bollard: Docker, +} + +impl Client { + async fn new() -> Client { + let config = env::Config::load::().await; + let bollard = bollard_client::init(&config); + + Client { config, bollard } + } + + pub(crate) fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_> { + self.logs(id, DesiredLogStream::Stdout) + } + + pub(crate) fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_> { + self.logs(id, DesiredLogStream::Stderr) + } + + pub(crate) async fn ports(&self, id: &str) -> Ports { + self.inspect(id) + .await + .network_settings + .unwrap_or_default() + .ports + .map(Ports::from) + .unwrap_or_default() + } + + pub(crate) async fn inspect(&self, id: &str) -> ContainerInspectResponse { + self.bollard.inspect_container(id, None).await.unwrap() + } + + pub(crate) async fn rm(&self, id: &str) { + self.bollard + .remove_container( + id, + Some(RemoveContainerOptions { + force: true, + v: true, + ..Default::default() + }), + ) + .await + .unwrap(); + } + + pub(crate) async fn stop(&self, id: &str) { + self.bollard.stop_container(id, None).await.unwrap(); + } + + pub(crate) async fn start(&self, id: &str) { + self.bollard + .start_container::(id, None) + .await + .unwrap(); + } + + pub(crate) async fn exec( + &self, + container_id: &str, + cmd: Vec, + desired_log: DesiredLogStream, + ) -> LogStreamAsync<'_> { + let (stdout, stderr) = match desired_log { + DesiredLogStream::Stdout => (true, false), + DesiredLogStream::Stderr => (false, true), + }; + + let config = CreateExecOptions { + cmd: Some(cmd), + attach_stdout: Some(stdout), + attach_stderr: Some(stderr), + ..Default::default() + }; + + let exec = self + .bollard + .create_exec(container_id, config) + .await + .expect("failed to create exec"); + + let res = self + .bollard + .start_exec( + &exec.id, + Some(StartExecOptions { + detach: false, + tty: false, + output_capacity: None, + }), + ) + .await + .expect("failed to start exec"); + + match res { + StartExecResults::Attached { output, .. } => { + let stream = output + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .map(|chunk| { + let bytes = chunk?.into_bytes(); + let str = std::str::from_utf8(bytes.as_ref()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(str.to_string()) + }) + .boxed(); + + LogStreamAsync::new(stream) + } + StartExecResults::Detached => unreachable!("detach is false"), + } + } + + pub(crate) async fn block_until_ready(&self, id: &str, ready_conditions: &[WaitFor]) { + log::debug!("Waiting for container {id} to be ready"); + + for condition in ready_conditions { + match condition { + WaitFor::StdOutMessage { message } => self + .stdout_logs(id) + .wait_for_message(message) + .await + .unwrap(), + WaitFor::StdErrMessage { message } => self + .stderr_logs(id) + .wait_for_message(message) + .await + .unwrap(), + WaitFor::Duration { length } => { + tokio::time::sleep(*length).await; + } + WaitFor::Healthcheck => loop { + use HealthStatusEnum::*; + + let health_status = self + .inspect(id) + .await + .state + .unwrap_or_else(|| panic!("Container state not available")) + .health + .unwrap_or_else(|| panic!("Health state not available")) + .status; + + match health_status { + Some(HEALTHY) => break, + None | Some(EMPTY) | Some(NONE) => { + panic!("Healthcheck not configured for container") + } + Some(UNHEALTHY) => panic!("Healthcheck reports unhealthy"), + Some(STARTING) => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + }, + WaitFor::Nothing => {} + } + } + + log::debug!("Container {id} is now ready!"); + } + + fn logs(&self, container_id: &str, desired_log: DesiredLogStream) -> LogStreamAsync<'_> { + let (stdout, stderr) = match desired_log { + DesiredLogStream::Stdout => (true, false), + DesiredLogStream::Stderr => (false, true), + }; + let options = LogsOptions { + follow: true, + stdout, + stderr, + tail: "all".to_owned(), + ..Default::default() + }; + + let stream = self + .bollard + .logs(container_id, Some(options)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .map(|chunk| { + let bytes = chunk?.into_bytes(); + let str = std::str::from_utf8(bytes.as_ref()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(String::from(str)) + }) + .boxed(); + + LogStreamAsync::new(stream) + } + + /// Creates a network with given name and returns an ID + pub(crate) async fn create_network(&self, name: &str) -> Option { + let network = self + .bollard + .create_network(CreateNetworkOptions { + name: name.to_owned(), + check_duplicate: true, + ..Default::default() + }) + .await + .unwrap(); + + network.id + } + + pub(crate) async fn create_container( + &self, + options: Option>, + config: Config, + ) -> Result { + self.bollard.create_container(options, config).await + } + + pub(crate) async fn pull_image(&self, descriptor: &str) { + let pull_options = Some(CreateImageOptions { + from_image: descriptor, + ..Default::default() + }); + let mut pulling = self.bollard.create_image(pull_options, None, None); + while let Some(result) = pulling.next().await { + result.unwrap_or_else(|err| { + panic!("Error pulling the image: '{descriptor}', error: {err}") + }); + } + } + + pub(crate) async fn network_exists(&self, network: &str) -> bool { + let networks = self.bollard.list_networks::(None).await.unwrap(); + networks + .iter() + .any(|i| matches!(&i.name, Some(name) if name == network)) + } + + pub(crate) async fn remove_network(&self, network: &str) { + self.bollard + .remove_network(network) + .await + .expect("Failed to remove network"); + } + + pub(crate) async fn docker_host_ip_address(&self) -> String { + let docker_host = self.config.docker_host(); + match docker_host.scheme() { + "tcp" | "http" | "https" => docker_host.host().unwrap().to_string(), + "unix" | "npipe" => self + .bollard + .inspect_network::("bridge", None) + .await + .ok() + .and_then(|net| net.ipam) + .and_then(|ipam| ipam.config) + .unwrap_or_default() + .into_iter() + .filter_map(|ipam_cfg| ipam_cfg.gateway) + .next() + .unwrap_or_else(|| "localhost".to_string()), + _ => unreachable!("docker host is already validated in the config"), + } + } +} diff --git a/testcontainers/src/core/client/bollard_client.rs b/testcontainers/src/core/client/bollard_client.rs new file mode 100644 index 00000000..53de5a30 --- /dev/null +++ b/testcontainers/src/core/client/bollard_client.rs @@ -0,0 +1,53 @@ +use crate::core::env; +use bollard::{Docker, API_DEFAULT_VERSION}; +use std::time::Duration; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2 * 60); + +pub(super) fn init(config: &env::Config) -> Docker { + let host = config.docker_host(); + + match host.scheme() { + "https" => connect_with_ssl(config), + "http" | "tcp" => { + if config.tls_verify() { + connect_with_ssl(config) + } else { + Docker::connect_with_http( + host.as_str(), + DEFAULT_TIMEOUT.as_secs(), + API_DEFAULT_VERSION, + ) + } + } + #[cfg(unix)] + "unix" => Docker::connect_with_unix( + host.as_str(), + DEFAULT_TIMEOUT.as_secs(), + API_DEFAULT_VERSION, + ), + #[cfg(windows)] + "npipe" => Docker::connect_with_named_pipe( + host.as_str(), + DEFAULT_TIMEOUT.as_secs(), + API_DEFAULT_VERSION, + ), + scheme => { + panic!("Unsupported scheme: {scheme}"); + } + } + .expect("Failed to connect to Docker") +} + +fn connect_with_ssl(config: &env::Config) -> Result { + let cert_path = config.cert_path().expect("cert path not found"); + + Docker::connect_with_ssl( + config.docker_host().as_str(), + &cert_path.join("key.pem"), + &cert_path.join("cert.pem"), + &cert_path.join("ca.pem"), + DEFAULT_TIMEOUT.as_secs(), + API_DEFAULT_VERSION, + ) +} diff --git a/testcontainers/src/core/client/factory.rs b/testcontainers/src/core/client/factory.rs new file mode 100644 index 00000000..b7238760 --- /dev/null +++ b/testcontainers/src/core/client/factory.rs @@ -0,0 +1,29 @@ +use crate::core::client::Client; +use std::sync::{Arc, OnceLock, Weak}; +use tokio::sync::Mutex; + +// We use `Weak` in order not to prevent `Drop` of being called. +// Instead, we re-create the client if it was dropped and asked one more time. +// This way we provide on `Drop` guarantees and avoid unnecessary instantiation at the same time. +static DOCKER_CLIENT: OnceLock>> = OnceLock::new(); + +impl Client { + /// Returns a client instance, reusing already created or initializing a new one. + // We don't expose this function to the public API for now. We can do it later if needed. + pub(crate) async fn lazy_client() -> Arc { + let mut guard = DOCKER_CLIENT + .get_or_init(|| Mutex::new(Weak::new())) + .lock() + .await; + let maybe_client = guard.upgrade(); + + if let Some(client) = maybe_client { + client + } else { + let client = Arc::new(Client::new().await); + *guard = Arc::downgrade(&client); + + client + } + } +} diff --git a/testcontainers/src/core/container.rs b/testcontainers/src/core/container.rs deleted file mode 100644 index 068c9926..00000000 --- a/testcontainers/src/core/container.rs +++ /dev/null @@ -1,309 +0,0 @@ -use std::{fmt, marker::PhantomData, net::IpAddr, str::FromStr}; - -use bollard_stubs::models::ContainerInspectResponse; - -use crate::{ - core::{env::Command, logs::LogStream, ports::Ports, ExecCommand, WaitFor}, - Image, RunnableImage, -}; - -/// Represents a running docker container. -/// -/// Containers have a [`custom destructor`][drop_impl] that removes them as soon as they go out of scope: -/// -/// ```rust,no_run -/// use testcontainers::*; -/// #[test] -/// fn a_test() { -/// let docker = clients::Cli::default(); -/// -/// { -/// let container = docker.run(MyImage::default()); -/// -/// // Docker container is stopped/removed at the end of this scope. -/// } -/// } -/// ``` -/// -/// [drop_impl]: struct.Container.html#impl-Drop -pub struct Container<'d, I: Image> { - id: String, - docker_client: Box, - image: RunnableImage, - command: Command, - ports: Ports, - /// Tracks the lifetime of the client to make sure the container is dropped before the client. - client_lifetime: PhantomData<&'d ()>, -} - -impl<'d, I> fmt::Debug for Container<'d, I> -where - I: fmt::Debug + Image, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Container") - .field("id", &self.id) - .field("image", &self.image) - .field("command", &self.command) - .finish() - } -} - -impl<'d, I> Container<'d, I> -where - I: Image, -{ - /// Constructs a new container given an id, a docker client and the image. - /// - /// This function will block the current thread (if [`wait_until_ready`] is implemented correctly) until the container is actually ready to be used. - /// - /// [`wait_until_ready`]: trait.Image.html#tymethod.wait_until_ready - pub(crate) fn new( - id: String, - docker_client: impl Docker + 'static, - image: RunnableImage, - command: Command, - ) -> Self { - let ports = docker_client.ports(&id); - Self { - id, - docker_client: Box::new(docker_client), - image, - command, - ports, - client_lifetime: PhantomData, - } - } - - /// Returns a reference to the [`Image`] of this container. - /// - /// [`Image`]: trait.Image.html - pub fn image(&self) -> &I { - self.image.inner() - } - - /// Returns a reference to the [`arguments`] of the [`Image`] of this container. - /// - /// Access to this is useful to retrieve relevant information which had been passed as [`arguments`] - /// - /// [`Image`]: trait.Image.html - /// [`arguments`]: trait.Image.html#associatedtype.Args - pub fn image_args(&self) -> &I::Args { - self.image.args() - } - - pub fn ports(&self) -> Ports { - self.ports.clone() - } -} - -impl<'d, I> Container<'d, I> -where - I: Image, -{ - /// Returns the id of this container. - pub fn id(&self) -> &str { - &self.id - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv4 interfaces. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will panic. - /// - /// # Panics - /// - /// This method panics if the given port is not mapped. - /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container - /// is unlikely to be useful. - #[deprecated( - since = "0.13.1", - note = "Use `get_host_port_ipv4()` or `get_host_port_ipv6()` instead." - )] - pub fn get_host_port(&self, internal_port: u16) -> u16 { - self.get_host_port_ipv4(internal_port) - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv4 interfaces. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will panic. - /// - /// # Panics - /// - /// This method panics if the given port is not mapped. - /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container - /// is unlikely to be useful. - pub fn get_host_port_ipv4(&self, internal_port: u16) -> u16 { - self.ports - .map_to_host_port_ipv4(internal_port) - .unwrap_or_else(|| { - panic!( - "container {} does not expose port {}", - self.id, internal_port - ) - }) - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv6 interfaces. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will panic. - /// - /// # Panics - /// - /// This method panics if the given port is not mapped. - /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container - /// is unlikely to be useful. - pub fn get_host_port_ipv6(&self, internal_port: u16) -> u16 { - self.ports - .map_to_host_port_ipv6(internal_port) - .unwrap_or_else(|| { - panic!( - "container {} does not expose port {}", - self.id, internal_port - ) - }) - } - - /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress - pub fn get_bridge_ip_address(&self) -> IpAddr { - let settings = self - .docker_client - .inspect(&self.id) - .network_settings - .unwrap_or_else(|| panic!("container {} has no network settings", self.id)); - - let mut networks = settings - .networks - .unwrap_or_else(|| panic!("container {} has no any networks", self.id)); - - let bridge_name = self - .image - .network() - .clone() - .or(settings.bridge) - .unwrap_or_else(|| panic!("container {} has missing bridge name", self.id)); - - let ip = networks - .remove(&bridge_name) - .and_then(|network| network.ip_address) - .unwrap_or_else(|| panic!("container {} has missing bridge IP", self.id)); - - IpAddr::from_str(&ip) - .unwrap_or_else(|_| panic!("container {} has invalid bridge IP", self.id)) - } - - pub fn exec(&self, cmd: ExecCommand) -> ExecOutput { - let ExecCommand { - cmd, - ready_conditions, - } = cmd; - - log::debug!("Executing command {:?}", cmd); - - let output = self.docker_client.exec(self.id(), cmd); - - self.docker_client - .block_until_ready(self.id(), ready_conditions); - - ExecOutput { - stdout: output.stdout, - stderr: output.stderr, - } - } - - pub fn stop(&self) { - log::debug!("Stopping docker container {}", self.id); - - self.docker_client.stop(&self.id) - } - - pub fn start(&self) { - self.docker_client.start(&self.id); - } - - pub fn rm(&self) { - log::debug!("Deleting docker container {}", self.id); - - self.docker_client.rm(&self.id) - } -} - -/// Represents an output of `exec` command. -/// `stdout` & `stderr` is represented as bytes because it might contain non-utf chars and responsibility should be on the caller end. -#[derive(Debug)] -pub struct ExecOutput { - pub stdout: Vec, - pub stderr: Vec, -} - -/// The destructor implementation for a Container. -/// -/// As soon as the container goes out of scope, the destructor will either only stop or delete the docker container, depending on the [`Command`] value. -/// -/// Setting it to `keep` will stop container. -/// Setting it to `remove` will remove it. -impl<'d, I> Drop for Container<'d, I> -where - I: Image, -{ - fn drop(&mut self) { - match self.command { - Command::Keep => {} - Command::Remove => self.rm(), - } - #[cfg(feature = "watchdog")] - crate::watchdog::unregister(self.id()); - } -} - -/// Defines operations that we need to perform on docker containers and other entities. -/// -/// This trait is pub(crate) because it should not be used directly by users but only represents an internal abstraction that allows containers to be generic over the client they have been started with. -/// All functionality of this trait is available on [`Container`]s directly. -pub(crate) trait Docker: Sync + Send { - fn stdout_logs(&self, id: &str) -> LogStream; - fn stderr_logs(&self, id: &str) -> LogStream; - fn ports(&self, id: &str) -> Ports; - fn inspect(&self, id: &str) -> ContainerInspectResponse; - fn rm(&self, id: &str); - fn stop(&self, id: &str); - fn start(&self, id: &str); - fn exec(&self, id: &str, cmd: String) -> std::process::Output; - fn block_until_ready(&self, id: &str, ready_conditions: Vec); -} - -#[cfg(test)] -mod test { - use super::*; - - #[derive(Debug, Default)] - pub struct HelloWorld; - - impl Image for HelloWorld { - type Args = (); - - fn name(&self) -> String { - "hello-world".to_owned() - } - - fn tag(&self) -> String { - "latest".to_owned() - } - - fn ready_conditions(&self) -> Vec { - vec![WaitFor::message_on_stdout("Hello from Docker!")] - } - } - - #[test] - fn container_should_be_send_and_sync() { - assert_send_and_sync::>(); - } - - fn assert_send_and_sync() {} -} diff --git a/testcontainers/src/core/container_async.rs b/testcontainers/src/core/container_async.rs deleted file mode 100644 index 4d0fd001..00000000 --- a/testcontainers/src/core/container_async.rs +++ /dev/null @@ -1,286 +0,0 @@ -use crate::{ - core::{env, env::Command, logs::LogStreamAsync, ports::Ports, WaitFor}, - Image, RunnableImage, -}; -use async_trait::async_trait; -use bollard::models::{ContainerInspectResponse, HealthStatusEnum}; -use futures::executor::block_on; -use std::{fmt, marker::PhantomData, net::IpAddr, str::FromStr, time::Duration}; -use tokio::time::sleep; - -/// Represents a running docker container that has been started using an async client.. -/// -/// Containers have a [`custom destructor`][drop_impl] that removes them as soon as they -/// go out of scope. However, async drop is not available in rust yet. This implementation -/// is using block_on. Therefore required #[tokio::test(flavor = "multi_thread")] in your test -/// to use drop effectively. Otherwise your test might stall: -/// -/// ```rust -/// use testcontainers::*; -/// #[tokio::test(flavor = "multi_thread")] -/// async fn a_test() { -/// let docker = clients::Http::default(); -/// -/// { -/// let container = docker.run(MyImage::default()).await; -/// -/// // Docker container is stopped/removed at the end of this scope. -/// } -/// } -/// ``` -/// -/// [drop_impl]: struct.ContainerAsync.html#impl-Drop -pub struct ContainerAsync<'d, I: Image> { - id: String, - docker_client: Box, - image: RunnableImage, - command: Command, - - /// Tracks the lifetime of the client to make sure the container is dropped before the client. - client_lifetime: PhantomData<&'d ()>, -} - -impl<'d, I> ContainerAsync<'d, I> -where - I: Image, -{ - /// Returns the id of this container. - pub fn id(&self) -> &str { - &self.id - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv4 interfaces. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will panic. - /// - /// # Panics - /// - /// This method panics if the given port is not mapped. - /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container - /// is unlikely to be useful. - #[deprecated( - since = "0.13.1", - note = "Use `get_host_port_ipv4()` or `get_host_port_ipv6()` instead." - )] - pub async fn get_host_port(&self, internal_port: u16) -> u16 { - self.get_host_port_ipv4(internal_port).await - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv4 interfaces. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will panic. - /// - /// # Panics - /// - /// This method panics if the given port is not mapped. - /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container - /// is unlikely to be useful. - pub async fn get_host_port_ipv4(&self, internal_port: u16) -> u16 { - self.docker_client - .ports(&self.id) - .await - .map_to_host_port_ipv4(internal_port) - .unwrap_or_else(|| { - panic!( - "container {} does not expose port {}", - self.id, internal_port - ) - }) - } - - /// Returns the mapped host port for an internal port of this docker container, on the host's - /// IPv6 interfaces. - /// - /// This method does **not** magically expose the given port, it simply performs a mapping on - /// the already exposed ports. If a docker container does not expose a port, this method will panic. - /// - /// # Panics - /// - /// This method panics if the given port is not mapped. - /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container - /// is unlikely to be useful. - pub async fn get_host_port_ipv6(&self, internal_port: u16) -> u16 { - self.docker_client - .ports(&self.id) - .await - .map_to_host_port_ipv6(internal_port) - .unwrap_or_else(|| { - panic!( - "container {} does not expose port {}", - self.id, internal_port - ) - }) - } - - /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress - pub async fn get_bridge_ip_address(&self) -> IpAddr { - let result = self.docker_client.inspect(&self.id).await; - - let settings = result - .network_settings - .unwrap_or_else(|| panic!("container {} has no network settings", self.id)); - - let mut networks = settings - .networks - .unwrap_or_else(|| panic!("container {} has no any networks", self.id)); - - let bridge_name = self - .image - .network() - .clone() - .or(settings.bridge) - .unwrap_or_else(|| panic!("container {} has missing bridge name", self.id)); - - let ip = networks - .remove(&bridge_name) - .and_then(|network| network.ip_address) - .unwrap_or_else(|| panic!("container {} has missing bridge IP", self.id)); - - IpAddr::from_str(&ip) - .unwrap_or_else(|_| panic!("container {} has invalid bridge IP", self.id)) - } - - pub async fn start(&self) { - self.docker_client.start(&self.id).await - } - - pub async fn stop(&self) { - log::debug!("Stopping docker container {}", self.id); - - self.docker_client.stop(&self.id).await - } - - pub async fn rm(self) { - log::debug!("Deleting docker container {}", self.id); - - self.docker_client.rm(&self.id).await - } - - async fn drop_async(&self) { - match self.command { - env::Command::Remove => self.docker_client.rm(&self.id).await, - env::Command::Keep => {} - } - #[cfg(feature = "watchdog")] - crate::watchdog::unregister(self.id()); - } -} - -impl<'d, I> fmt::Debug for ContainerAsync<'d, I> -where - I: fmt::Debug + Image, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ContainerAsync") - .field("id", &self.id) - .field("image", &self.image) - .finish() - } -} - -/// Represents Docker operations as an async trait. -/// -/// This trait is `pub(crate)` to make sure we can make changes to this API without breaking clients. -/// Users should interact through the [`ContainerAsync`] API. -#[async_trait] -pub(crate) trait DockerAsync -where - Self: Sync, -{ - fn stdout_logs(&self, id: &str) -> LogStreamAsync<'_>; - fn stderr_logs(&self, id: &str) -> LogStreamAsync<'_>; - async fn ports(&self, id: &str) -> Ports; - async fn inspect(&self, id: &str) -> ContainerInspectResponse; - async fn rm(&self, id: &str); - async fn stop(&self, id: &str); - async fn start(&self, id: &str); -} - -impl<'d, I> ContainerAsync<'d, I> -where - I: Image, -{ - /// Constructs a new container given an id, a docker client and the image. - /// ContainerAsync::new().await - pub(crate) async fn new( - id: String, - docker_client: impl DockerAsync + 'static, - image: RunnableImage, - command: env::Command, - ) -> ContainerAsync<'d, I> { - let container = ContainerAsync { - id, - docker_client: Box::new(docker_client), - image, - command, - client_lifetime: PhantomData, - }; - - container.block_until_ready().await; - - container - } - - async fn block_until_ready(&self) { - log::debug!("Waiting for container {} to be ready", self.id); - - for condition in self.image.ready_conditions() { - match condition { - WaitFor::StdOutMessage { message } => self - .docker_client - .stdout_logs(&self.id) - .wait_for_message(&message) - .await - .unwrap(), - WaitFor::StdErrMessage { message } => self - .docker_client - .stderr_logs(&self.id) - .wait_for_message(&message) - .await - .unwrap(), - WaitFor::Duration { length } => { - tokio::time::sleep(length).await; - } - WaitFor::Healthcheck => loop { - use HealthStatusEnum::*; - - let health_status = self - .docker_client - .inspect(&self.id) - .await - .state - .unwrap_or_else(|| panic!("Container state not available")) - .health - .unwrap_or_else(|| panic!("Health state not available")) - .status; - - match health_status { - Some(HEALTHY) => break, - None | Some(EMPTY) | Some(NONE) => { - panic!("Healthcheck not configured for container") - } - Some(UNHEALTHY) => panic!("Healthcheck reports unhealthy"), - Some(STARTING) => sleep(Duration::from_millis(100)).await, - } - panic!("Healthcheck for the container is not configured"); - }, - WaitFor::Nothing => {} - } - } - - log::debug!("Container {} is now ready!", self.id); - } -} - -impl<'d, I> Drop for ContainerAsync<'d, I> -where - I: Image, -{ - fn drop(&mut self) { - block_on(self.drop_async()) - } -} diff --git a/testcontainers/src/core/containers/async_container.rs b/testcontainers/src/core/containers/async_container.rs new file mode 100644 index 00000000..63a45986 --- /dev/null +++ b/testcontainers/src/core/containers/async_container.rs @@ -0,0 +1,276 @@ +use crate::{ + core::{ + client::{Client, DesiredLogStream}, + env, macros, + network::Network, + ports::Ports, + ContainerState, ExecCommand, WaitFor, + }, + Image, RunnableImage, +}; +use std::{fmt, net::IpAddr, str::FromStr, sync::Arc}; +use tokio::runtime::RuntimeFlavor; + +/// Represents a running docker container that has been started using an async client. +/// +/// Containers have a [`custom destructor`][drop_impl] that removes them as soon as they +/// go out of scope. However, async drop is not available in rust yet. This implementation +/// is using block_on. +/// +/// ```rust +/// use testcontainers::*; +/// #[tokio::test] +/// async fn a_test() { +/// let container = MyImage::default().start().await; +/// // Docker container is stopped/removed at the end of this scope. +/// } +/// ``` +/// +/// [drop_impl]: struct.ContainerAsync.html#impl-Drop +pub struct ContainerAsync { + id: String, + image: RunnableImage, + pub(super) docker_client: Arc, + #[cfg_attr(not(feature = "blocking"), allow(dead_code))] + pub(super) network: Option>, + dropped: bool, +} + +impl ContainerAsync +where + I: Image, +{ + /// Constructs a new container given an id, a docker client and the image. + /// ContainerAsync::new().await + pub(crate) async fn new( + id: String, + docker_client: Arc, + image: RunnableImage, + network: Option>, + ) -> ContainerAsync { + let container = ContainerAsync { + id, + image, + docker_client, + network, + dropped: false, + }; + container.block_until_ready().await; + container + } + + /// Returns the id of this container. + pub fn id(&self) -> &str { + &self.id + } + + /// Returns a reference to the [`Image`] of this container. + /// + /// [`Image`]: trait.Image.html + pub fn image(&self) -> &I { + self.image.image() + } + + /// Returns a reference to the [`arguments`] of the [`Image`] of this container. + /// + /// Access to this is useful to retrieve relevant information which had been passed as [`arguments`] + /// + /// [`Image`]: trait.Image.html + /// [`arguments`]: trait.Image.html#associatedtype.Args + pub fn image_args(&self) -> &I::Args { + self.image.args() + } + + pub async fn ports(&self) -> Ports { + self.docker_client.ports(&self.id).await + } + + /// Returns the mapped host port for an internal port of this docker container, on the host's + /// IPv4 interfaces. + /// + /// This method does **not** magically expose the given port, it simply performs a mapping on + /// the already exposed ports. If a docker container does not expose a port, this method will panic. + /// + /// # Panics + /// + /// This method panics if the given port is not mapped. + /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container + /// is unlikely to be useful. + pub async fn get_host_port_ipv4(&self, internal_port: u16) -> u16 { + self.docker_client + .ports(&self.id) + .await + .map_to_host_port_ipv4(internal_port) + .unwrap_or_else(|| { + panic!( + "container {} does not expose (IPV4) port {}", + self.id, internal_port + ) + }) + } + + /// Returns the mapped host port for an internal port of this docker container, on the host's + /// IPv6 interfaces. + /// + /// This method does **not** magically expose the given port, it simply performs a mapping on + /// the already exposed ports. If a docker container does not expose a port, this method will panic. + /// + /// # Panics + /// + /// This method panics if the given port is not mapped. + /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container + /// is unlikely to be useful. + pub async fn get_host_port_ipv6(&self, internal_port: u16) -> u16 { + self.docker_client + .ports(&self.id) + .await + .map_to_host_port_ipv6(internal_port) + .unwrap_or_else(|| { + panic!( + "container {} does not expose (IPV6) port {}", + self.id, internal_port + ) + }) + } + + /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress + pub async fn get_bridge_ip_address(&self) -> IpAddr { + let result = self.docker_client.inspect(&self.id).await; + + let settings = result + .network_settings + .unwrap_or_else(|| panic!("container {} has no network settings", self.id)); + + let mut networks = settings + .networks + .unwrap_or_else(|| panic!("container {} has no any networks", self.id)); + + let bridge_name = self + .image + .network() + .clone() + .or(settings.bridge) + .unwrap_or_else(|| panic!("container {} has missing bridge name", self.id)); + + let ip = networks + .remove(&bridge_name) + .and_then(|network| network.ip_address) + .unwrap_or_else(|| panic!("container {} has missing bridge IP", self.id)); + + IpAddr::from_str(&ip) + .unwrap_or_else(|_| panic!("container {} has invalid bridge IP", self.id)) + } + + /// Returns the host ip address of docker container + pub async fn get_host_ip_address(&self) -> IpAddr { + self.docker_client + .docker_host_ip_address() + .await + .parse() + .expect("invalid host IP") + } + + pub async fn exec(&self, cmd: ExecCommand) { + let ExecCommand { + cmd, + container_ready_conditions, + cmd_ready_condition, + } = cmd; + + log::debug!("Executing command {:?}", cmd); + + let desired_log = if let WaitFor::StdErrMessage { .. } = &cmd_ready_condition { + DesiredLogStream::Stderr + } else { + DesiredLogStream::Stdout + }; + + let output = self.docker_client.exec(&self.id, cmd, desired_log).await; + self.docker_client + .block_until_ready(self.id(), &container_ready_conditions) + .await; + + match cmd_ready_condition { + WaitFor::StdOutMessage { message } | WaitFor::StdErrMessage { message } => { + output.wait_for_message(&message).await.unwrap(); + } + WaitFor::Duration { length } => { + tokio::time::sleep(length).await; + } + _ => {} + } + } + + pub async fn start(&self) { + self.docker_client.start(&self.id).await; + for cmd in self + .image + .exec_after_start(ContainerState::new(self.ports().await)) + { + self.exec(cmd).await; + } + } + + pub async fn stop(&self) { + log::debug!("Stopping docker container {}", self.id); + + self.docker_client.stop(&self.id).await + } + + pub async fn rm(mut self) { + log::debug!("Deleting docker container {}", self.id); + + self.docker_client.rm(&self.id).await; + + #[cfg(feature = "watchdog")] + crate::watchdog::unregister(&self.id); + + self.dropped = true; + } + + async fn block_until_ready(&self) { + self.docker_client + .block_until_ready(self.id(), &self.image().ready_conditions()) + .await; + } +} + +impl fmt::Debug for ContainerAsync +where + I: fmt::Debug + Image, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ContainerAsync") + .field("id", &self.id) + .field("image", &self.image) + .field("command", &self.docker_client.config.command()) + .finish() + } +} + +impl Drop for ContainerAsync +where + I: Image, +{ + fn drop(&mut self) { + if !self.dropped { + let id = self.id.clone(); + let client = self.docker_client.clone(); + let command = self.docker_client.config.command(); + + let drop_task = async move { + log::trace!("Drop was called for container {id}, cleaning up"); + match command { + env::Command::Remove => client.rm(&id).await, + env::Command::Keep => {} + } + #[cfg(feature = "watchdog")] + crate::watchdog::unregister(&id); + + log::debug!("Container {id} was successfully dropped"); + }; + + macros::block_on!(drop_task, "failed to remove container on drop"); + } + } +} diff --git a/testcontainers/src/core/containers/mod.rs b/testcontainers/src/core/containers/mod.rs new file mode 100644 index 00000000..96b6d7fb --- /dev/null +++ b/testcontainers/src/core/containers/mod.rs @@ -0,0 +1,9 @@ +pub(crate) mod async_container; +#[cfg(feature = "blocking")] +pub(crate) mod sync_container; + +pub use async_container::ContainerAsync; + +#[cfg(feature = "blocking")] +#[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] +pub use sync_container::Container; diff --git a/testcontainers/src/core/containers/sync_container.rs b/testcontainers/src/core/containers/sync_container.rs new file mode 100644 index 00000000..8975e0b8 --- /dev/null +++ b/testcontainers/src/core/containers/sync_container.rs @@ -0,0 +1,203 @@ +use std::{fmt, net::IpAddr}; + +use crate::{ + core::{env, ports::Ports, ExecCommand}, + ContainerAsync, Image, +}; + +/// Represents a running docker container. +/// +/// Containers have a [`custom destructor`][drop_impl] that removes them as soon as they go out of scope: +/// +/// ```rust,no_run +/// use testcontainers::*; +/// #[test] +/// fn a_test() { +/// let container = MyImage::default().start(); +/// // Docker container is stopped/removed at the end of this scope. +/// } +/// ``` +/// +/// [drop_impl]: struct.Container.html#impl-Drop +pub struct Container { + inner: Option>, +} + +/// Internal representation of a running docker container, to be able to terminate runtime correctly when `Container` is dropped. +struct ActiveContainer { + runtime: tokio::runtime::Runtime, + async_impl: ContainerAsync, +} + +impl fmt::Debug for Container +where + I: fmt::Debug + Image, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Container") + .field("id", &self.id()) + .field("image", &self.image()) + .field("ports", &self.ports()) + .field("command", &self.async_impl().docker_client.config.command()) + .finish() + } +} + +impl Container { + pub(crate) fn new(runtime: tokio::runtime::Runtime, async_impl: ContainerAsync) -> Self { + Self { + inner: Some(ActiveContainer { + runtime: runtime, + async_impl, + }), + } + } +} + +impl Container +where + I: Image, +{ + /// Returns the id of this container. + pub fn id(&self) -> &str { + self.async_impl().id() + } + + /// Returns a reference to the [`Image`] of this container. + /// + /// [`Image`]: trait.Image.html + pub fn image(&self) -> &I { + self.async_impl().image() + } + + /// Returns a reference to the [`arguments`] of the [`Image`] of this container. + /// + /// Access to this is useful to retrieve relevant information which had been passed as [`arguments`] + /// + /// [`Image`]: trait.Image.html + /// [`arguments`]: trait.Image.html#associatedtype.Args + pub fn image_args(&self) -> &I::Args { + self.async_impl().image_args() + } + + pub fn ports(&self) -> Ports { + self.rt().block_on(self.async_impl().ports()) + } + + /// Returns the mapped host port for an internal port of this docker container, on the host's + /// IPv4 interfaces. + /// + /// This method does **not** magically expose the given port, it simply performs a mapping on + /// the already exposed ports. If a docker container does not expose a port, this method will panic. + /// + /// # Panics + /// + /// This method panics if the given port is not mapped. + /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container + /// is unlikely to be useful. + pub fn get_host_port_ipv4(&self, internal_port: u16) -> u16 { + self.rt() + .block_on(self.async_impl().get_host_port_ipv4(internal_port)) + } + + /// Returns the mapped host port for an internal port of this docker container, on the host's + /// IPv6 interfaces. + /// + /// This method does **not** magically expose the given port, it simply performs a mapping on + /// the already exposed ports. If a docker container does not expose a port, this method will panic. + /// + /// # Panics + /// + /// This method panics if the given port is not mapped. + /// Testcontainers is designed to be used in tests only. If a certain port is not mapped, the container + /// is unlikely to be useful. + pub fn get_host_port_ipv6(&self, internal_port: u16) -> u16 { + self.rt() + .block_on(self.async_impl().get_host_port_ipv6(internal_port)) + } + + /// Returns the bridge ip address of docker container as specified in NetworkSettings.Networks.IPAddress + pub fn get_bridge_ip_address(&self) -> IpAddr { + self.rt() + .block_on(self.async_impl().get_bridge_ip_address()) + } + + /// Returns the host ip address of docker container + pub fn get_host_ip_address(&self) -> IpAddr { + self.rt().block_on(self.async_impl().get_host_ip_address()) + } + + pub fn exec(&self, cmd: ExecCommand) { + self.rt().block_on(self.async_impl().exec(cmd)); + } + + pub fn stop(&self) { + self.rt().block_on(self.async_impl().stop()); + } + + pub fn start(&self) { + self.rt().block_on(self.async_impl().start()); + } + + pub fn rm(mut self) { + if let Some(active) = self.inner.take() { + active.runtime.block_on(active.async_impl.rm()); + } + } + + /// Returns reference to inner `Runtime`. It's safe to unwrap because it's `Some` until `Container` is dropped. + fn rt(&self) -> &tokio::runtime::Runtime { + &self.inner.as_ref().unwrap().runtime + } + + /// Returns reference to inner `ContainerAsync`. It's safe to unwrap because it's `Some` until `Container` is dropped. + fn async_impl(&self) -> &ContainerAsync { + &self.inner.as_ref().unwrap().async_impl + } +} + +impl Drop for Container { + fn drop(&mut self) { + if let Some(mut active) = self.inner.take() { + active.runtime.block_on(async { + drop(active.async_impl.network.take()); + match active.async_impl.docker_client.config.command() { + env::Command::Remove => active.async_impl.rm().await, + env::Command::Keep => {} + } + }); + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::core::WaitFor; + + #[derive(Debug, Default)] + pub struct HelloWorld; + + impl Image for HelloWorld { + type Args = (); + + fn name(&self) -> String { + "hello-world".to_owned() + } + + fn tag(&self) -> String { + "latest".to_owned() + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::message_on_stdout("Hello from Docker!")] + } + } + + #[test] + fn container_should_be_send_and_sync() { + assert_send_and_sync::>(); + } + + fn assert_send_and_sync() {} +} diff --git a/testcontainers/src/core/env.rs b/testcontainers/src/core/env.rs index 5626a430..ed4657ac 100644 --- a/testcontainers/src/core/env.rs +++ b/testcontainers/src/core/env.rs @@ -1,27 +1,13 @@ -use std::str::FromStr; +mod config; -/// Lookup and parse the command specified through the `TESTCONTAINERS` env variable. -pub fn command() -> Option -where - E: GetEnvValue, -{ - // warn users that the old behaviour is gone - if E::get_env_value("KEEP_CONTAINERS").is_some() { - log::warn!("`KEEP_CONTAINERS` has been changed to `TESTCONTAINERS`"); - } - - let command = E::get_env_value("TESTCONTAINERS")?; - let command = command.parse().ok()?; - - Some(command) -} +pub(crate) use config::{Command, Config}; /// Abstracts over reading a value from the environment. pub trait GetEnvValue { fn get_env_value(key: &str) -> Option; } -/// Represents the operation system environment for use within a production environment. +/// Represents the operating system environment for use within a production environment. #[derive(Debug)] pub struct Os; @@ -31,26 +17,6 @@ impl GetEnvValue for Os { } } -/// The commands available to the `TESTCONTAINERS` env variable. -#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] -pub enum Command { - Keep, - #[default] - Remove, -} - -impl FromStr for Command { - type Err = (); - - fn from_str(s: &str) -> Result { - match s { - "keep" => Ok(Command::Keep), - "remove" => Ok(Command::Remove), - other => panic!("unknown command '{other}' provided via TESTCONTAINERS env variable",), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -61,23 +27,25 @@ mod tests { impl GetEnvValue for FakeEnvAlwaysKeep { fn get_env_value(key: &str) -> Option { match key { - "TESTCONTAINERS" => Some("keep".to_owned()), + "TESTCONTAINERS_COMMAND" => Some("keep".to_owned()), _ => None, } } } #[test] - #[should_panic(expected = "unknown command 'foobar' provided via TESTCONTAINERS env variable")] + #[should_panic( + expected = "unknown command 'foobar' provided via TESTCONTAINERS_COMMAND env variable" + )] fn panics_on_unknown_command() { let _ = "foobar".parse::(); } #[test] fn command_looks_up_testcontainers_env_variables() { - let cmd = command::().unwrap(); + let cmd = FakeEnvAlwaysKeep::get_env_value("TESTCONTAINERS_COMMAND").unwrap(); - assert_eq!(cmd, Command::Keep) + assert_eq!(cmd.parse::(), Ok(Command::Keep)) } #[test] diff --git a/testcontainers/src/core/env/config.rs b/testcontainers/src/core/env/config.rs new file mode 100644 index 00000000..0dd1c0cc --- /dev/null +++ b/testcontainers/src/core/env/config.rs @@ -0,0 +1,142 @@ +use crate::core::env::GetEnvValue; +use serde::Deserialize; +use serde_with::serde_as; +use std::{ + path::{Path, PathBuf}, + str::FromStr, +}; +use url::Url; + +const TESTCONTAINERS_PROPERTIES: &str = ".testcontainers.properties"; + +/// The default `DOCKER_HOST` address that we will try to connect to. +#[cfg(unix)] +pub const DEFAULT_DOCKER_HOST: &str = "unix:///var/run/docker.sock"; + +/// The default `DOCKER_HOST` address that a windows client will try to connect to. +#[cfg(windows)] +pub const DEFAULT_DOCKER_HOST: &str = "npipe:////./pipe/docker_engine"; + +#[derive(Debug, Default)] +pub(crate) struct Config { + tc_host: Option, + host: Option, + tls_verify: Option, + cert_path: Option, + command: Option, +} + +#[serde_as] +#[derive(Debug, Default, Deserialize)] +struct TestcontainersProperties { + #[serde(rename = "tc.host")] + tc_host: Option, + #[serde(rename = "docker.host")] + host: Option, + #[serde_as(as = "Option")] + #[serde(rename = "docker.tls.verify")] + tls_verify: Option, + #[serde(rename = "docker.cert.path")] + cert_path: Option, +} + +impl TestcontainersProperties { + async fn load() -> Option { + let home_dir = dirs::home_dir()?; + let properties_path = home_dir.join(TESTCONTAINERS_PROPERTIES); + + let content = tokio::fs::read(properties_path).await.ok()?; + let properties = + serde_java_properties::from_slice(&content).expect("Failed to parse properties"); + + Some(properties) + } +} + +impl Config { + pub(crate) async fn load() -> Self + where + E: GetEnvValue, + { + let env_config = Self::load_from_env_config::(); + let properties = TestcontainersProperties::load().await.unwrap_or_default(); + + // Environment variables take precedence over properties + Self { + tc_host: env_config.tc_host.or(properties.tc_host), + host: env_config.host.or(properties.host), + tls_verify: env_config.tls_verify.or(properties.tls_verify), + cert_path: env_config.cert_path.or(properties.cert_path), + command: env_config.command, + } + } + + fn load_from_env_config() -> Self + where + E: GetEnvValue, + { + let host = E::get_env_value("DOCKER_HOST") + .as_deref() + .map(FromStr::from_str) + .transpose() + .expect("Invalid DOCKER_HOST"); + let tls_verify = E::get_env_value("DOCKER_TLS_VERIFY").map(|v| v == "1"); + let cert_path = E::get_env_value("DOCKER_CERT_PATH").map(PathBuf::from); + let command = E::get_env_value("TESTCONTAINERS_COMMAND").and_then(|v| v.parse().ok()); + + Config { + host, + tc_host: None, + command, + tls_verify, + cert_path, + } + } + + /// The Docker host to use. The host is resolved in the following order: + /// 1. Docker host from the "tc.host" property in the ~/.testcontainers.properties file. + /// 2. DOCKER_HOST environment variable. + /// 3. Docker host from the "docker.host" property in the ~/.testcontainers.properties file. + /// 4. Else, the default Docker socket will be returned. + pub(crate) fn docker_host(&self) -> Url { + self.tc_host + .as_ref() + .or(self.host.as_ref()) + .cloned() + .unwrap_or_else(|| Url::from_str(DEFAULT_DOCKER_HOST).unwrap()) + } + + pub(crate) fn tls_verify(&self) -> bool { + self.tls_verify.unwrap_or_default() + } + + pub(crate) fn cert_path(&self) -> Option<&Path> { + self.cert_path.as_deref() + } + + pub(crate) fn command(&self) -> Command { + self.command.unwrap_or_default() + } +} + +/// The commands available to the `TESTCONTAINERS_COMMAND` env variable. +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub(crate) enum Command { + Keep, + #[default] + Remove, +} + +impl FromStr for Command { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "keep" => Ok(Command::Keep), + "remove" => Ok(Command::Remove), + other => { + panic!("unknown command '{other}' provided via TESTCONTAINERS_COMMAND env variable",) + } + } + } +} diff --git a/testcontainers/src/core/image.rs b/testcontainers/src/core/image.rs index 35ab1ba4..fc1935ff 100644 --- a/testcontainers/src/core/image.rs +++ b/testcontainers/src/core/image.rs @@ -18,8 +18,8 @@ use super::ports::Ports; /// [docker_run]: trait.Docker.html#tymethod.run pub trait Image where - Self: Sized, - Self::Args: ImageArgs + Clone + Debug, + Self: Sized + Sync + Send, + Self::Args: ImageArgs + Clone + Debug + Sync + Send, { /// A type representing the arguments for an Image. /// @@ -101,12 +101,40 @@ where } } -#[derive(Default, Debug)] +#[derive(Debug)] pub struct ExecCommand { + pub(super) cmd: Vec, + pub(super) cmd_ready_condition: WaitFor, + pub(super) container_ready_conditions: Vec, +} + +impl ExecCommand { /// Command to be executed - pub cmd: String, + pub fn new(cmd: Vec) -> Self { + Self { + cmd, + cmd_ready_condition: WaitFor::Nothing, + container_ready_conditions: vec![], + } + } + /// Conditions to be checked on related container - pub ready_conditions: Vec, + pub fn with_container_ready_conditions(mut self, ready_conditions: Vec) -> Self { + self.container_ready_conditions = ready_conditions; + self + } + + /// Conditions to be checked on executed command output + pub fn with_cmd_ready_condition(mut self, ready_conditions: WaitFor) -> Self { + self.cmd_ready_condition = ready_conditions; + self + } +} + +impl Default for ExecCommand { + fn default() -> Self { + Self::new(vec![]) + } } #[derive(Debug)] @@ -119,14 +147,6 @@ impl ContainerState { Self { ports } } - #[deprecated( - since = "0.13.1", - note = "Use `host_port_ipv4()` or `host_port_ipv6()` instead." - )] - pub fn host_port(&self, internal_port: u16) -> u16 { - self.host_port_ipv4(internal_port) - } - pub fn host_port_ipv4(&self, internal_port: u16) -> u16 { self.ports .map_to_host_port_ipv4(internal_port) @@ -150,7 +170,7 @@ impl ImageArgs for () { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Host { Addr(IpAddr), HostGateway, @@ -166,7 +186,7 @@ impl Display for Host { } #[must_use] -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RunnableImage { image: I, image_args: I::Args, @@ -183,7 +203,7 @@ pub struct RunnableImage { } impl RunnableImage { - pub fn inner(&self) -> &I { + pub fn image(&self) -> &I { &self.image } diff --git a/testcontainers/src/core/logs.rs b/testcontainers/src/core/logs.rs index 49c588cb..7e11a77a 100644 --- a/testcontainers/src/core/logs.rs +++ b/testcontainers/src/core/logs.rs @@ -1,25 +1,18 @@ -#[cfg(feature = "experimental")] use futures::{stream::BoxStream, StreamExt}; -use std::{ - fmt, io, - io::{BufRead, BufReader, Read}, -}; +use std::{fmt, io}; -#[cfg(feature = "experimental")] pub(crate) struct LogStreamAsync<'d> { - inner: BoxStream<'d, Result>, + inner: BoxStream<'d, Result>, } -#[cfg(feature = "experimental")] impl<'d> fmt::Debug for LogStreamAsync<'d> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("LogStreamAsync").finish() } } -#[cfg(feature = "experimental")] impl<'d> LogStreamAsync<'d> { - pub fn new(stream: BoxStream<'d, Result>) -> Self { + pub fn new(stream: BoxStream<'d, Result>) -> Self { Self { inner: stream } } @@ -36,37 +29,6 @@ impl<'d> LogStreamAsync<'d> { } } -pub(crate) struct LogStream { - inner: Box, -} - -impl fmt::Debug for LogStream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LogStream").finish() - } -} - -impl LogStream { - pub fn new(stream: impl Read + 'static) -> Self { - Self { - inner: Box::new(stream), - } - } - - pub fn wait_for_message(self, message: &str) -> Result<(), WaitError> { - let logs = BufReader::new(self.inner); - let mut lines = vec![]; - - for line in logs.lines() { - if handle_line(line?, message, &mut lines) { - return Ok(()); - } - } - - Err(end_of_stream(lines)) - } -} - fn handle_line(line: String, message: &str, lines: &mut Vec) -> bool { if line.contains(message) { log::info!("Found message after comparing {} lines", lines.len()); @@ -107,18 +69,16 @@ impl From for WaitError { mod tests { use super::*; - #[test] - fn given_logs_when_line_contains_message_should_find_it() { - let log_stream = LogStream::new( - r" + #[tokio::test(flavor = "multi_thread")] + async fn given_logs_when_line_contains_message_should_find_it() { + let log_stream = LogStreamAsync::new(Box::pin(futures::stream::iter([Ok(r" Message one Message two Message three " - .as_bytes(), - ); + .to_string())]))); - let result = log_stream.wait_for_message("Message three"); + let result = log_stream.wait_for_message("Message three").await; assert!(result.is_ok()) } diff --git a/testcontainers/src/core/macros.rs b/testcontainers/src/core/macros.rs new file mode 100644 index 00000000..5f0adad9 --- /dev/null +++ b/testcontainers/src/core/macros.rs @@ -0,0 +1,28 @@ +/// It allows using both runtime flavors without indefinite hanging. +/// Most useful for async calls in `Drop` implementations. +macro_rules! block_on { + ($future:expr, $err_msg:literal) => { + let handle = tokio::runtime::Handle::current(); + match handle.runtime_flavor() { + RuntimeFlavor::CurrentThread => { + // Not the best approach, but it greatly simplifies the code and use of the library. + // since the main scenario is testing, this is acceptable. + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on($future); + }) + .join() + .expect($err_msg); + } + RuntimeFlavor::MultiThread => { + tokio::task::block_in_place(move || handle.block_on($future)) + } + _ => unreachable!("unsupported runtime flavor"), + } + }; +} + +pub(crate) use block_on; diff --git a/testcontainers/src/core/network.rs b/testcontainers/src/core/network.rs new file mode 100644 index 00000000..02dbae6f --- /dev/null +++ b/testcontainers/src/core/network.rs @@ -0,0 +1,89 @@ +use crate::core::{client::Client, env, macros}; +use std::{ + collections::HashMap, + fmt, + sync::{Arc, OnceLock, Weak}, +}; +use tokio::{runtime::RuntimeFlavor, sync::Mutex}; + +pub(crate) static CREATED_NETWORKS: OnceLock>>> = + OnceLock::new(); + +fn created_networks() -> &'static Mutex>> { + CREATED_NETWORKS.get_or_init(|| Mutex::new(HashMap::new())) +} + +pub(crate) struct Network { + name: String, + id: Option, + client: Arc, +} + +impl Network { + pub(crate) async fn new(name: impl Into, client: Arc) -> Option> { + let name = name.into(); + let mut guard = created_networks().lock().await; + let network = if let Some(network) = guard.get(&name).and_then(Weak::upgrade) { + network + } else { + if client.network_exists(&name).await { + // Networks already exists and created outside the testcontainers + return None; + } + + let id = client.create_network(&name).await; + + let created = Arc::new(Self { + name: name.clone(), + id, + client, + }); + + guard.insert(name, Arc::downgrade(&created)); + + created + }; + + Some(network) + } +} + +impl Drop for Network { + fn drop(&mut self) { + if self.client.config.command() == env::Command::Remove { + let client = self.client.clone(); + let name = self.name.clone(); + + let drop_task = async move { + log::trace!("Drop was called for network {name}, cleaning up"); + let mut guard = created_networks().lock().await; + + // check the strong count under the lock to avoid any possible race-conditions. + let is_network_in_use = guard + .get(&name) + .filter(|weak| weak.strong_count() > 0) + .is_some(); + + if is_network_in_use { + log::trace!("Network {name} was not dropped because it is still in use"); + } else { + guard.remove(&name); + client.remove_network(&name).await; + + log::trace!("Network {name} was successfully dropped"); + } + }; + + macros::block_on!(drop_task, "failed to remove network on drop"); + } + } +} + +impl fmt::Debug for Network { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Network") + .field("id", &self.id) + .field("name", &self.name) + .finish() + } +} diff --git a/testcontainers/src/images.rs b/testcontainers/src/images/mod.rs similarity index 100% rename from testcontainers/src/images.rs rename to testcontainers/src/images/mod.rs diff --git a/testcontainers/src/lib.rs b/testcontainers/src/lib.rs index 35dd2c2a..956456c1 100644 --- a/testcontainers/src/lib.rs +++ b/testcontainers/src/lib.rs @@ -19,36 +19,39 @@ //! //! Unsurprisingly, working with testcontainers is very similar to working with Docker itself. //! -//! First you choose a [`Client`]. Given a client instance, you can [`run`][docker_run] [`Images`]. This gives you back a [`Container`]. Containers implement `Drop`. As soon as they go out of scope, the underlying docker container is removed. +//! First, you need to define the [`Image`] that you want to run, and then simply call the `start` method on it from either the [`AsyncRunner`] or [`SyncRunner`] trait. +//! This will return you [`ContainerAsync`] or [`Container`] respectively. +//! Containers implement `Drop`. As soon as they go out of scope, the underlying docker container is removed. //! -//! # Usage in production code -//! -//! Although nothing inherently prevents testcontainers from being used in production code, the library itself was not designed with that in mind. For example, many methods will panic if something goes wrong but because the usage is intended to be within tests, this is deemed acceptable. +//! See examples in the corresponding runner ([`AsyncRunner`] and [`SyncRunner`]) //! //! # Ecosystem //! //! `testcontainers` is the core crate that provides an API for working with containers in a test environment. -//! However, it does not provide ready-to-use modules, you can implement your [`Image`]s using the library directly or use community supported [`testcontainers-modules`]. +//! The only image that is provided by the core crate is the [`GenericImage`], which is a simple wrapper around any docker image. +//! +//! However, it does not provide ready-to-use modules, you can implement your [`Image`]s using the library directly or use community supported [`testcontainers-modules`]. +//! +//! # Usage in production code +//! +//! Although nothing inherently prevents testcontainers from being used in production code, the library itself was not designed with that in mind. +//! For example, many methods will panic if something goes wrong but because the usage is intended to be within tests, this is deemed acceptable. //! //! [tc_website]: https://testcontainers.org //! [`Docker`]: https://docker.com -//! [docker_run]: trait.Docker.html#tymethod.run -//! [`Client`]: trait.Docker.html#implementors -//! [`Images`]: trait.Image.html#implementors -//! [`Container`]: struct.Container.html +//! [`AsyncRunner`]: runners::AsyncRunner +//! [`SyncRunner`]: runners::SyncRunner //! [`testcontainers-modules`]: https://crates.io/crates/testcontainers-modules -pub use crate::core::{Container, Image, ImageArgs, RunnableImage}; -#[cfg(feature = "experimental")] -pub use crate::core::ContainerAsync; +pub mod core; +pub use crate::core::{containers::*, Image, ImageArgs, RunnableImage}; #[cfg(feature = "watchdog")] +#[cfg_attr(docsrs, doc(cfg(feature = "watchdog")))] pub(crate) mod watchdog; -/// All available Docker clients. -pub mod clients; -pub mod core; /// All available Docker images. mod images; - pub use images::generic::GenericImage; + +pub mod runners; diff --git a/testcontainers/src/runners/async_runner.rs b/testcontainers/src/runners/async_runner.rs new file mode 100644 index 00000000..7c7d0f6f --- /dev/null +++ b/testcontainers/src/runners/async_runner.rs @@ -0,0 +1,363 @@ +use crate::{ + core::{client::Client, network::Network, ContainerState}, + ContainerAsync, Image, ImageArgs, RunnableImage, +}; +use async_trait::async_trait; +use bollard::{ + container::{Config, CreateContainerOptions}, + models::{HostConfig, PortBinding}, +}; +use std::collections::HashMap; + +#[async_trait] +/// Helper trait to start containers asynchronously. +/// +/// ## Example +/// +/// ```rust +/// use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage}; +/// +/// #[tokio::test] +/// async fn test_redis() { +/// let container = GenericImage::new("redis", "7.2.4") +/// .with_exposed_port(6379) +/// .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")) +/// .start() +/// .await; +/// } +/// ``` +pub trait AsyncRunner { + /// Starts the container and returns an instance of `ContainerAsync`. + async fn start(self) -> ContainerAsync; +} + +#[async_trait] +impl AsyncRunner for T +where + T: Into> + Send, + I: Image, +{ + async fn start(self) -> ContainerAsync { + let client = Client::lazy_client().await; + let runnable_image = self.into(); + let mut create_options: Option> = None; + + let extra_hosts: Vec<_> = runnable_image + .hosts() + .map(|(key, value)| format!("{key}:{value}")) + .collect(); + + let mut config: Config = Config { + image: Some(runnable_image.descriptor()), + host_config: Some(HostConfig { + privileged: Some(runnable_image.privileged()), + extra_hosts: Some(extra_hosts), + ..Default::default() + }), + ..Default::default() + }; + + // shared memory + if let Some(bytes) = runnable_image.shm_size() { + config.host_config = config.host_config.map(|mut host_config| { + host_config.shm_size = Some(bytes as i64); + host_config + }); + } + + // create network and add it to container creation + let network = if let Some(network) = runnable_image.network() { + config.host_config = config.host_config.map(|mut host_config| { + host_config.network_mode = Some(network.to_string()); + host_config + }); + Network::new(network, client.clone()).await + } else { + None + }; + + // name of the container + if let Some(name) = runnable_image.container_name() { + create_options = Some(CreateContainerOptions { + name: name.to_owned(), + platform: None, + }) + } + + // handle environment variables + let envs: Vec = runnable_image + .env_vars() + .map(|(k, v)| format!("{k}={v}")) + .collect(); + config.env = Some(envs); + + // volumes + let vols: HashMap> = runnable_image + .volumes() + .map(|(orig, dest)| (format!("{orig}:{dest}"), HashMap::new())) + .collect(); + config.volumes = Some(vols); + + // entrypoint + if let Some(entrypoint) = runnable_image.entrypoint() { + config.entrypoint = Some(vec![entrypoint]); + } + + let is_container_networked = runnable_image + .network() + .as_ref() + .map(|network| network.starts_with("container:")) + .unwrap_or(false); + + // exposed ports + if !is_container_networked { + config.exposed_ports = Some( + runnable_image + .expose_ports() + .into_iter() + .map(|p| (format!("{p}/tcp"), HashMap::new())) + .collect(), + ); + } + + // ports + if runnable_image.ports().is_some() || !runnable_image.expose_ports().is_empty() { + let empty: Vec<_> = Vec::new(); + let bindings = runnable_image + .ports() + .as_ref() + .unwrap_or(&empty) + .iter() + .map(|p| { + ( + format!("{}/tcp", p.internal), + Some(vec![PortBinding { + host_ip: None, + host_port: Some(p.local.to_string()), + }]), + ) + }) + .chain( + runnable_image + .expose_ports() + .into_iter() + .map(|p| (format!("{}/tcp", p), Some(vec![PortBinding::default()]))), + ); + + config.host_config = config.host_config.map(|mut host_config| { + host_config.port_bindings = Some(bindings.collect()); + host_config + }); + } else if !is_container_networked { + config.host_config = config.host_config.map(|mut host_config| { + host_config.publish_all_ports = Some(true); + host_config + }); + } + + // extra hosts + + let args = runnable_image + .args() + .clone() + .into_iterator() + .collect::>(); + if !args.is_empty() { + config.cmd = Some(args); + } + + // create the container with options + let create_result = client + .create_container(create_options.clone(), config.clone()) + .await; + let container_id = { + match create_result { + Ok(container) => container.id, + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, .. + }) => { + client.pull_image(&runnable_image.descriptor()).await; + client + .bollard + .create_container(create_options, config) + .await + .unwrap() + .id + } + Err(err) => panic!("{}", err), + } + }; + + #[cfg(feature = "watchdog")] + if client.config.command() == crate::core::env::Command::Remove { + crate::watchdog::register(container_id.clone()); + } + + client + .bollard + .start_container::(&container_id, None) + .await + .unwrap(); + + let container = + ContainerAsync::new(container_id, client.clone(), runnable_image, network).await; + + for cmd in container + .image() + .exec_after_start(ContainerState::new(container.ports().await)) + { + container.exec(cmd).await; + } + + container + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{core::WaitFor, images::generic::GenericImage}; + + #[tokio::test] + async fn async_run_command_should_expose_all_ports_if_no_explicit_mapping_requested() { + let client = Client::lazy_client().await; + let container = RunnableImage::from(GenericImage::new("hello-world", "latest")) + .start() + .await; + + // inspect volume and env + let container_details = client.inspect(container.id()).await; + let publish_ports = container_details + .host_config + .unwrap() + .publish_all_ports + .unwrap(); + assert!(publish_ports, "publish_all_ports must be `true`"); + } + + #[tokio::test] + async fn async_run_command_should_map_exposed_port() { + let image = GenericImage::new("simple_web_server", "latest") + .with_exposed_port(5000) + .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::seconds(1)); + let container = image.start().await; + container.get_host_port_ipv4(5000).await; + } + + #[tokio::test] + async fn async_run_command_should_expose_only_requested_ports() { + let client = Client::lazy_client().await; + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_mapped_port((123, 456)) + .with_mapped_port((555, 888)) + .start() + .await; + + let container_details = client.inspect(container.id()).await; + + let port_bindings = container_details + .host_config + .unwrap() + .port_bindings + .unwrap(); + assert!(port_bindings.contains_key("456/tcp")); + assert!(port_bindings.contains_key("888/tcp")); + } + + #[tokio::test] + async fn async_run_command_should_include_network() { + let client = Client::lazy_client().await; + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_network("awesome-net-1") + .start() + .await; + + let container_details = client.inspect(container.id()).await; + let networks = container_details + .network_settings + .unwrap() + .networks + .unwrap(); + + assert!( + networks.contains_key("awesome-net-1"), + "Networks is {networks:?}" + ); + } + + #[tokio::test] + async fn async_run_command_should_include_name() { + let client = Client::lazy_client().await; + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_container_name("async_hello_container") + .start() + .await; + + let container_details = client.inspect(container.id()).await; + let container_name = container_details.name.unwrap(); + assert!(container_name.ends_with("async_hello_container")); + } + + #[tokio::test] + async fn async_should_create_network_if_image_needs_it_and_drop_it_in_the_end() { + let hello_world = GenericImage::new("hello-world", "latest"); + + { + let client = Client::lazy_client().await; + assert!(!client.network_exists("awesome-net-2").await); + + // creating the first container creates the network + let _container1 = RunnableImage::from(hello_world.clone()) + .with_network("awesome-net-2") + .start() + .await; + + // creating a 2nd container doesn't fail because check if the network exists already + let _container2 = RunnableImage::from(hello_world) + .with_network("awesome-net-2") + .start() + .await; + + assert!(client.network_exists("awesome-net-2").await); + } + + // containers have been dropped, should clean up networks + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let client = Client::lazy_client().await; + assert!(!client.network_exists("awesome-net-2").await) + } + + #[tokio::test] + async fn async_run_command_should_set_shared_memory_size() { + let client = Client::lazy_client().await; + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_shm_size(1_000_000) + .start() + .await; + + let container_details = client.inspect(container.id()).await; + let shm_size = container_details.host_config.unwrap().shm_size.unwrap(); + + assert_eq!(shm_size, 1_000_000); + } + + #[tokio::test] + async fn async_run_command_should_include_privileged() { + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_privileged(true) + .start() + .await; + + let client = Client::lazy_client().await; + let container_details = client.inspect(container.id()).await; + + let privileged = container_details.host_config.unwrap().privileged.unwrap(); + assert!(privileged, "privileged must be `true`"); + } +} diff --git a/testcontainers/src/runners/mod.rs b/testcontainers/src/runners/mod.rs new file mode 100644 index 00000000..b4716c15 --- /dev/null +++ b/testcontainers/src/runners/mod.rs @@ -0,0 +1,9 @@ +pub(crate) mod async_runner; +#[cfg(feature = "blocking")] +pub(crate) mod sync_runner; + +pub use self::async_runner::AsyncRunner; + +#[cfg(feature = "blocking")] +#[cfg_attr(docsrs, doc(cfg(feature = "blocking")))] +pub use self::sync_runner::SyncRunner; diff --git a/testcontainers/src/runners/sync_runner.rs b/testcontainers/src/runners/sync_runner.rs new file mode 100644 index 00000000..2c1b02db --- /dev/null +++ b/testcontainers/src/runners/sync_runner.rs @@ -0,0 +1,248 @@ +use crate::{Container, Image, RunnableImage}; + +/// Helper trait to start containers synchronously. +/// +/// ## Example +/// +/// ```rust +/// use testcontainers::{core::WaitFor, runners::SyncRunner, GenericImage}; +/// +/// #[test] +/// fn test_redis() { +/// let container = GenericImage::new("redis", "7.2.4") +/// .with_exposed_port(6379) +/// .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections")) +/// .start(); +/// } +/// ``` +pub trait SyncRunner { + /// Starts the container and returns an instance of `Container`. + fn start(self) -> Container; +} + +impl SyncRunner for T +where + T: Into> + Send, + I: Image, +{ + fn start(self) -> Container { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build sync runner"); + let async_container = runtime.block_on(super::AsyncRunner::start(self)); + + Container::new(runtime, async_container) + } +} + +#[cfg(test)] +mod tests { + use crate::core::{client::Client, WaitFor}; + use bollard_stubs::models::ContainerInspectResponse; + use std::{ + collections::BTreeMap, + sync::{Arc, OnceLock}, + }; + use tokio::runtime::Runtime; + + use super::*; + use crate::images::generic::GenericImage; + + static RUNTIME: OnceLock = OnceLock::new(); + + fn runtime() -> &'static Runtime { + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + }) + } + + fn docker_client() -> Arc { + runtime().block_on(Client::lazy_client()) + } + + fn inspect(id: &str) -> ContainerInspectResponse { + runtime().block_on(docker_client().inspect(id)) + } + + fn network_exists(client: &Arc, name: &str) -> bool { + runtime().block_on(client.network_exists(name)) + } + + #[derive(Default)] + struct HelloWorld { + volumes: BTreeMap, + env_vars: BTreeMap, + } + + impl Image for HelloWorld { + type Args = (); + + fn name(&self) -> String { + "hello-world".to_owned() + } + + fn tag(&self) -> String { + "latest".to_owned() + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::message_on_stdout("Hello from Docker!")] + } + + fn env_vars(&self) -> Box + '_> { + Box::new(self.env_vars.iter()) + } + + fn volumes(&self) -> Box + '_> { + Box::new(self.volumes.iter()) + } + } + + #[test] + fn sync_run_command_should_expose_all_ports_if_no_explicit_mapping_requested() { + let container = RunnableImage::from(GenericImage::new("hello-world", "latest")).start(); + + // inspect volume and env + let container_details = inspect(container.id()); + let publish_ports = container_details + .host_config + .unwrap() + .publish_all_ports + .unwrap(); + assert!(publish_ports, "publish_all_ports must be `true`"); + } + + #[test] + fn sync_run_command_should_map_exposed_port() { + let image = GenericImage::new("simple_web_server", "latest") + .with_exposed_port(5000) + .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::seconds(1)); + let container = image.start(); + container.get_host_port_ipv4(5000); + } + + #[test] + fn sync_run_command_should_expose_only_requested_ports() { + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_mapped_port((123, 456)) + .with_mapped_port((555, 888)) + .start(); + + let container_details = inspect(container.id()); + + let port_bindings = container_details + .host_config + .unwrap() + .port_bindings + .unwrap(); + assert!(port_bindings.contains_key("456/tcp")); + assert!(port_bindings.contains_key("888/tcp")); + } + + #[test] + #[should_panic( + expected = "called `Result::unwrap()` on an `Err` value: DockerResponseServerError { status_code: 404, message: \"No such container: !INVALID_NAME_DUE_TO_SYMBOLS!\" }" + )] + fn sync_rm_command_should_panic_on_invalid_container() { + runtime().block_on(docker_client().rm("!INVALID_NAME_DUE_TO_SYMBOLS!")); + unreachable!() + } + + #[test] + fn sync_run_command_should_include_network() { + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_network("sync-awesome-net-1") + .start(); + + let container_details = inspect(container.id()); + let networks = container_details + .network_settings + .unwrap() + .networks + .unwrap(); + + assert!( + networks.contains_key("sync-awesome-net-1"), + "Networks is {networks:?}" + ); + } + + #[test] + fn sync_run_command_should_include_name() { + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image) + .with_container_name("sync_hello_container") + .start(); + + let container_details = inspect(container.id()); + let container_name = container_details.name.unwrap(); + assert!(container_name.ends_with("sync_hello_container")); + } + + #[test] + fn sync_run_command_with_container_network_should_not_expose_ports() { + let _first_container = + RunnableImage::from(GenericImage::new("simple_web_server", "latest")) + .with_container_name("the_first_one") + .start(); + + let image = GenericImage::new("hello-world", "latest"); + RunnableImage::from(image) + .with_network("container:the_first_one") + .start(); + } + + #[test] + fn sync_run_command_should_include_privileged() { + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image).with_privileged(true).start(); + let container_details = inspect(container.id()); + + let privileged = container_details.host_config.unwrap().privileged.unwrap(); + assert!(privileged, "privileged must be `true`"); + } + + #[test] + fn sync_run_command_should_set_shared_memory_size() { + let image = GenericImage::new("hello-world", "latest"); + let container = RunnableImage::from(image).with_shm_size(1_000_000).start(); + + let container_details = inspect(container.id()); + let shm_size = container_details.host_config.unwrap().shm_size.unwrap(); + + assert_eq!(shm_size, 1_000_000); + } + + #[test] + fn sync_should_create_network_if_image_needs_it_and_drop_it_in_the_end() { + { + let client = docker_client(); + + assert!(!network_exists(&client, "sync-awesome-net")); + + // creating the first container creates the network + let _container1: Container = RunnableImage::from(HelloWorld::default()) + .with_network("sync-awesome-net") + .start(); + // creating a 2nd container doesn't fail because check if the network exists already + let _container2 = RunnableImage::from(HelloWorld::default()) + .with_network("sync-awesome-net") + .start(); + + assert!(network_exists(&client, "sync-awesome-net")); + } + + { + let client = docker_client(); + // original client has been dropped, should clean up networks + assert!(!network_exists(&client, "sync-awesome-net")); + } + } +} diff --git a/testcontainers/src/watchdog.rs b/testcontainers/src/watchdog.rs index 0302c2cf..0e3483eb 100644 --- a/testcontainers/src/watchdog.rs +++ b/testcontainers/src/watchdog.rs @@ -1,4 +1,4 @@ -use crate::{clients::Cli, core::Docker}; +use crate::core::client::Client; use conquer_once::Lazy; use signal_hook::{ consts::{SIGINT, SIGQUIT, SIGTERM}, @@ -8,22 +8,29 @@ use std::{collections::BTreeSet, sync::Mutex, thread}; static WATCHDOG: Lazy> = Lazy::new(|| { thread::spawn(move || { - let signal_docker = Cli::default(); - let mut signals = - Signals::new([SIGTERM, SIGINT, SIGQUIT]).expect("failed to register signal handler"); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to start watchdog runtime in background"); - for signal in &mut signals { - for container_id in WATCHDOG - .lock() - .map(|s| s.containers.clone()) - .unwrap_or_default() - { - signal_docker.stop(&container_id); - signal_docker.rm(&container_id); - } + runtime.block_on(async { + let signal_docker = Client::lazy_client().await; + let mut signals = Signals::new([SIGTERM, SIGINT, SIGQUIT]) + .expect("failed to register signal handler"); + + for signal in &mut signals { + for container_id in WATCHDOG + .lock() + .map(|s| s.containers.clone()) + .unwrap_or_default() + { + signal_docker.stop(&container_id).await; + signal_docker.rm(&container_id).await; + } - let _ = signal_hook::low_level::emulate_default_handler(signal); - } + let _ = signal_hook::low_level::emulate_default_handler(signal); + } + }); }); Mutex::new(Watchdog::default()) diff --git a/testcontainers/tests/http_client.rs b/testcontainers/tests/async_runner.rs similarity index 71% rename from testcontainers/tests/http_client.rs rename to testcontainers/tests/async_runner.rs index f1a891b9..29d16569 100644 --- a/testcontainers/tests/http_client.rs +++ b/testcontainers/tests/async_runner.rs @@ -1,7 +1,6 @@ -#![cfg(feature = "experimental")] - +use bollard::Docker; use std::time::Duration; -use testcontainers::{core::WaitFor, GenericImage, *}; +use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage, *}; #[derive(Debug, Default)] pub struct HelloWorld; @@ -23,16 +22,14 @@ impl Image for HelloWorld { } #[tokio::test(flavor = "multi_thread")] -async fn bollard_can_run_hello_world() { +async fn bollard_can_run_hello_world_with_multi_thread() { let _ = pretty_env_logger::try_init(); - let docker = clients::Http::default(); - - let _container = docker.run(HelloWorld).await; + let _container = HelloWorld.start().await; } async fn cleanup_hello_world_image() { - let docker = bollard::Docker::connect_with_http_defaults().unwrap(); + let docker = Docker::connect_with_unix_defaults().unwrap(); futures::future::join_all( docker .list_images::(None) @@ -49,26 +46,23 @@ async fn cleanup_hello_world_image() { .await; } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn bollard_pull_missing_image_hello_world() { let _ = pretty_env_logger::try_init(); cleanup_hello_world_image().await; - let docker = clients::Http::default(); - let _container = docker.run(HelloWorld).await; + let _container = RunnableImage::from(HelloWorld).start().await; } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn start_containers_in_parallel() { let _ = pretty_env_logger::try_init(); - let docker = clients::Http::default(); - let image = GenericImage::new("hello-world", "latest").with_wait_for(WaitFor::seconds(2)); - let run_1 = docker.run(image.clone()); - let run_2 = docker.run(image.clone()); - let run_3 = docker.run(image.clone()); - let run_4 = docker.run(image); + let run_1 = image.clone().start(); + let run_2 = image.clone().start(); + let run_3 = image.clone().start(); + let run_4 = image.start(); let run_all = futures::future::join_all(vec![run_1, run_2, run_3, run_4]); diff --git a/testcontainers/tests/dual_stack_host_ports.rs b/testcontainers/tests/dual_stack_host_ports.rs index 47472f37..30ed4afd 100644 --- a/testcontainers/tests/dual_stack_host_ports.rs +++ b/testcontainers/tests/dual_stack_host_ports.rs @@ -1,37 +1,35 @@ +#![cfg(feature = "blocking")] + use std::net::{Ipv6Addr, TcpListener}; -use testcontainers::{clients, core::WaitFor, GenericImage}; +use testcontainers::{core::WaitFor, runners::SyncRunner, GenericImage}; /// Test the functionality of exposing container ports over both IPv4 and IPv6. -#[tokio::test] -async fn test_ipv4_ipv6_host_ports() { +#[test] +fn test_ipv4_ipv6_host_ports() { let _ = pretty_env_logger::try_init(); - let docker = clients::Cli::default(); - let wait_for = WaitFor::message_on_stdout("server is ready"); - let image = GenericImage::new("simple_web_server", "latest").with_wait_for(wait_for.clone()); + let image = GenericImage::new("simple_web_server", "latest") + .with_wait_for(WaitFor::message_on_stdout("server is ready")) + .with_wait_for(WaitFor::seconds(1)); // Run one container, and check what ephemeral ports it uses. Perform test HTTP requests to // both bound ports. - let first_container = docker.run(image.clone()); + let first_container = image.clone().start(); let first_ipv4_port = first_container.get_host_port_ipv4(80); let first_ipv6_port = first_container.get_host_port_ipv6(80); assert_eq!( "foo", - reqwest::get(&format!("http://127.0.0.1:{first_ipv4_port}")) - .await + reqwest::blocking::get(format!("http://127.0.0.1:{first_ipv4_port}")) .unwrap() .text() - .await .unwrap(), ); assert_eq!( "foo", - reqwest::get(&format!("http://[::1]:{first_ipv6_port}")) - .await + reqwest::blocking::get(format!("http://[::1]:{first_ipv6_port}")) .unwrap() .text() - .await .unwrap(), ); @@ -48,25 +46,21 @@ async fn test_ipv4_ipv6_host_ports() { // Run a second container, and repeat test HTTP requests with it. This confirms that handling // of both IPv4 and IPv6 host port bindings is correct, because at this point, // `second_ipv4_port` and `second_ipv6_port` are very unlikely to be the same. - let second_container = docker.run(image); + let second_container = image.start(); let second_ipv4_port = second_container.get_host_port_ipv4(80); let second_ipv6_port = second_container.get_host_port_ipv6(80); assert_eq!( "foo", - reqwest::get(&format!("http://127.0.0.1:{second_ipv4_port}")) - .await + reqwest::blocking::get(format!("http://127.0.0.1:{second_ipv4_port}")) .unwrap() .text() - .await .unwrap(), ); assert_eq!( "foo", - reqwest::get(&format!("http://[::1]:{second_ipv6_port}")) - .await + reqwest::blocking::get(format!("http://[::1]:{second_ipv6_port}")) .unwrap() .text() - .await .unwrap(), ); } diff --git a/testcontainers/tests/cli_client.rs b/testcontainers/tests/sync_runner.rs similarity index 77% rename from testcontainers/tests/cli_client.rs rename to testcontainers/tests/sync_runner.rs index eb5d8138..22279bc4 100644 --- a/testcontainers/tests/cli_client.rs +++ b/testcontainers/tests/sync_runner.rs @@ -1,5 +1,8 @@ +#![cfg(feature = "blocking")] + use testcontainers::{ core::{Host, WaitFor}, + runners::SyncRunner, *, }; @@ -27,25 +30,21 @@ impl Image for HelloWorld { } } -#[tokio::test(flavor = "multi_thread")] -async fn cli_can_run_hello_world() { +#[test] +fn sync_can_run_hello_world() { let _ = pretty_env_logger::try_init(); - - let docker = clients::Cli::default(); - - let _container = docker.run(HelloWorld); + let _container = HelloWorld.start(); } #[test] fn generic_image_with_custom_entrypoint() { - let docker = clients::Cli::default(); let generic = get_server_container(None); - let node = docker.run(generic); + let node = generic.start(); let port = node.get_host_port_ipv4(80); assert_eq!( "foo", - reqwest::blocking::get(format!("http://127.0.0.1:{port}")) + reqwest::blocking::get(format!("http://{}:{port}", node.get_host_ip_address())) .unwrap() .text() .unwrap() @@ -53,11 +52,11 @@ fn generic_image_with_custom_entrypoint() { let generic = get_server_container(None).with_entrypoint("./bar"); - let node = docker.run(generic); + let node = generic.start(); let port = node.get_host_port_ipv4(80); assert_eq!( "bar", - reqwest::blocking::get(format!("http://127.0.0.1:{port}")) + reqwest::blocking::get(format!("http://{}:{port}", node.get_host_ip_address())) .unwrap() .text() .unwrap() @@ -67,7 +66,6 @@ fn generic_image_with_custom_entrypoint() { #[test] fn generic_image_exposed_ports() { let _ = pretty_env_logger::try_init(); - let docker = clients::Cli::default(); let target_port = 8080; @@ -77,7 +75,7 @@ fn generic_image_exposed_ports() { // Explicitly expose the port, which otherwise would not be available. .with_exposed_port(target_port); - let node = docker.run(generic_server); + let node = generic_server.start(); let port = node.get_host_port_ipv4(target_port); assert!(reqwest::blocking::get(format!("http://127.0.0.1:{port}")) .unwrap() @@ -87,10 +85,8 @@ fn generic_image_exposed_ports() { #[test] fn generic_image_running_with_extra_hosts_added() { - let docker = clients::Cli::default(); - let server_1 = get_server_container(None); - let node = docker.run(server_1); + let node = server_1.start(); let port = node.get_host_port_ipv4(80); let msg = WaitFor::message_on_stdout("foo"); @@ -103,22 +99,32 @@ fn generic_image_running_with_extra_hosts_added() { let server_2 = RunnableImage::from((server_2, vec![format!("http://custom-host:{port}")])) .with_host("custom-host", Host::HostGateway); - docker.run(server_2); + server_2.start(); } #[test] #[should_panic] fn generic_image_port_not_exposed() { let _ = pretty_env_logger::try_init(); - let docker = clients::Cli::default(); let target_port = 8080; // This image binds to 0.0.0.0:8080, does not EXPOSE ports in its dockerfile. let generic_server = GenericImage::new("no_expose_port", "latest") .with_wait_for(WaitFor::message_on_stdout("listening on 0.0.0.0:8080")); - let node = docker.run(generic_server); + let node = generic_server.start(); // Without exposing the port with `with_exposed_port()`, we cannot get a mapping to it. node.get_host_port_ipv4(target_port); } + +#[test] +fn start_multiple_containers() { + let _ = pretty_env_logger::try_init(); + + let image = GenericImage::new("hello-world", "latest").with_wait_for(WaitFor::seconds(2)); + + let _container_1 = image.clone().start(); + let _container_2 = image.clone().start(); + let _container_3 = image.start(); +}