Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support adding new listeners to Apache Kafka #9142

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ Create a `ConfluentKafkaContainer` to use it in your tests:
[Creating a ConlfuentKafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

### Using org.testcontainers.kafka.KafkaContainer

Create a `KafkaContainer` to use it in your tests:

<!--codeinclude-->
[Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

## Options

### <a name="zookeeper"></a> Using external Zookeeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import org.testcontainers.utility.DockerImageName;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/**
* Testcontainers implementation for Apache Kafka.
Expand All @@ -24,11 +27,11 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {

private static final int KAFKA_PORT = 9092;

private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";

private static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh";

private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
private final Set<String> listeners = new HashSet<>();

private final Set<Supplier<String>> advertisedListeners = new HashSet<>();

public KafkaContainer(String imageName) {
this(DockerImageName.parse(imageName));
Expand All @@ -39,31 +42,16 @@ public KafkaContainer(DockerImageName dockerImageName) {
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, APACHE_KAFKA_NATIVE_IMAGE_NAME);

withExposedPorts(KAFKA_PORT);
withEnv("CLUSTER_ID", DEFAULT_CLUSTER_ID);

withEnv(
"KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094"
);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");

withEnv("KAFKA_NODE_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
withEnv(KafkaHelper.envVars());

withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
}

@Override
protected void configure() {
KafkaHelper.resolveListeners(this, this.listeners);

String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
Expand All @@ -80,7 +68,10 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
List<String> advertisedListeners = new ArrayList<>();
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
advertisedListeners.add(brokerAdvertisedListener);

advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners));
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);

String command = "#!/bin/bash\n";
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);
Expand All @@ -89,6 +80,69 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

/**
* Add a listener in the format {@code host:port}.
* Host will be included as a network alias.
* <p>
* Use it to register additional connections to the Kafka broker within the same container network.
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* <li>0.0.0.0:9094</li>
* </ul>
* <p>
* The listener will be added to the list of default advertised listeners.
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getConfig().getHostName():9092}</li>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* </ul>
* @param listener a listener with format {@code host:port}
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(String listener) {
this.listeners.add(listener);
this.advertisedListeners.add(() -> listener);
return this;
}

/**
* Add a listener in the format {@code host:port} and a {@link Supplier} for the advertised listener.
* Host from listener will be included as a network alias.
* <p>
* Use it to register additional connections to the Kafka broker from outside the container network
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* <li>0.0.0.0:9094</li>
* </ul>
* <p>
* The {@link Supplier} will be added to the list of default advertised listeners.
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getConfig().getHostName():9092}</li>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* </ul>
* @param listener a supplier that will provide a listener
* @param advertisedListener a supplier that will provide a listener
* @return this {@link KafkaContainer} instance
*/
public KafkaContainer withListener(String listener, Supplier<String> advertisedListener) {
this.listeners.add(listener);
this.advertisedListeners.add(advertisedListener);
return this;
}

public String getBootstrapServers() {
return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.testcontainers.kafka;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.AbstractKafka;

@RunWith(Parameterized.class)
public class CompatibleApacheKafkaImageTest extends AbstractKafka {

@Parameterized.Parameters(name = "{0}")
public static String[] params() {
return new String[] { "apache/kafka:3.8.0", "apache/kafka-native:3.8.0" };
}

@Parameterized.Parameter
public String imageName;

@Test
public void testUsage() throws Exception {
try (KafkaContainer kafka = new KafkaContainer(this.imageName)) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,61 @@
package org.testcontainers.kafka;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.testcontainers.AbstractKafka;
import org.testcontainers.KCatContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.SocatContainer;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(Parameterized.class)
public class KafkaContainerTest extends AbstractKafka {

@Parameterized.Parameters(name = "{0}")
public static String[] params() {
return new String[] { "apache/kafka:3.8.0", "apache/kafka-native:3.8.0" };
@Test
public void testUsage() throws Exception {
try ( // constructorWithVersion {
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
// }
) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}

@Parameterized.Parameter
public String imageName;
@Test
public void testUsageWithListener() throws Exception {
try (
Network network = Network.newNetwork();
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092")
.withNetwork(network);
KCatContainer kcat = new KCatContainer().withNetwork(network)
) {
kafka.start();
kcat.start();

kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();

assertThat(stdout).contains("Message produced by kcat");
}
}

@Test
public void testUsage() throws Exception {
try (KafkaContainer kafka = new KafkaContainer(imageName)) {
public void testUsageWithListenerFromProxy() throws Exception {
try (
Network network = Network.newNetwork();
SocatContainer socat = new SocatContainer().withNetwork(network).withTarget(2000, "kafka", 19092);
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
.withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
.withNetwork(network)
) {
socat.start();
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());

String bootstrapServers = String.format("%s:%s", socat.getHost(), socat.getMappedPort(2000));
testKafkaFunctionality(bootstrapServers);
}
}
}
Loading