Skip to content

Commit

Permalink
Fix register listeners in RedpandaContainer (#9247)
Browse files Browse the repository at this point in the history
Previously, RedpandaContainer allowed to register a new listener
using a Supplier and the new listener's host was also added as a network
alias. However, if the listener's host was an IP the listener configuration
was wrong and didn't allow to connect to the broker.
  • Loading branch information
eddumelendez authored Sep 19, 2024
1 parent 031fd06 commit f763945
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 24 deletions.
20 changes: 20 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ Client using the new registered listener:
[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessage
<!--/codeinclude-->

The following examples shows how to register a proxy as a new listener in `RedpandaContainer`:

Use `SocatContainer` to create the proxy

<!--codeinclude-->
[Create Proxy](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createProxy
<!--/codeinclude-->

Register the listener and advertised listener

<!--codeinclude-->
[Register Listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:registerListenerAndAdvertisedListener
<!--/codeinclude-->

Client using the new registered listener:

<!--codeinclude-->
[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessageFromProxy
<!--/codeinclude-->

## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ dependencies {
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
testImplementation 'org.assertj:assertj-core:3.26.3'
testImplementation 'io.rest-assured:rest-assured:5.5.0'
testImplementation 'org.awaitility:awaitility:4.2.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private final List<String> superusers = new ArrayList<>();

@Deprecated
private final Set<Supplier<Listener>> listenersValueSupplier = new HashSet<>();

private final Map<String, Supplier<String>> listeners = new HashMap<>();

public RedpandaContainer(String image) {
this(DockerImageName.parse(image));
}
Expand Down Expand Up @@ -109,6 +112,7 @@ protected void configure() {
.map(Supplier::get)
.map(Listener::getAddress)
.forEach(this::withNetworkAliases);
this.listeners.keySet().stream().map(listener -> listener.split(":")[0]).forEach(this::withNetworkAliases);
}

@SneakyThrows
Expand Down Expand Up @@ -212,13 +216,74 @@ public RedpandaContainer withSuperuser(String username) {
* </ul>
* @param listenerSupplier a supplier that will provide a listener
* @return this {@link RedpandaContainer} instance
* @deprecated use {@link #withListener(String, Supplier)} instead
*/
@Deprecated
public RedpandaContainer withListener(Supplier<String> listenerSupplier) {
String[] parts = listenerSupplier.get().split(":");
this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1])));
return this;
}

/**
* Add a listener in the format {@code host:port}.
* Host will be included as a network alias.
* <p>
* Use it to register additional connections to the Kafka broker within the same container network.
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* The listener will be added to the list of default advertised listeners.
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getConfig().getHostName():9092}</li>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* </ul>
* @param listener a listener with format {@code host:port}
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withListener(String listener) {
this.listeners.put(listener, () -> listener);
return this;
}

/**
* Add a listener in the format {@code host:port} and a {@link Supplier} for the advertised listener.
* Host from listener will be included as a network alias.
* <p>
* Use it to register additional connections to the Kafka broker from outside the container network
* <p>
* The listener will be added to the list of default listeners.
* <p>
* Default listeners:
* <ul>
* <li>0.0.0.0:9092</li>
* <li>0.0.0.0:9093</li>
* </ul>
* <p>
* The {@link Supplier} will be added to the list of default advertised listeners.
* <p>
* Default advertised listeners:
* <ul>
* <li>{@code container.getConfig().getHostName():9092}</li>
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
* </ul>
* @param listener a supplier that will provide a listener
* @param advertisedListener a supplier that will provide a listener
* @return this {@link RedpandaContainer} instance
*/
public RedpandaContainer withListener(String listener, Supplier<String> advertisedListener) {
this.listeners.put(listener, advertisedListener);
return this;
}

private Transferable getBootstrapFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("enableAuthorization", this.enableAuthorization);
Expand All @@ -233,6 +298,12 @@ private Transferable getBootstrapFile(Configuration cfg) {
}

private Transferable getRedpandaFile(Configuration cfg) {
Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));

List<Map<String, Object>> listeners =
this.listenersValueSupplier.stream()
.map(Supplier::get)
Expand All @@ -244,19 +315,44 @@ private Transferable getRedpandaFile(Configuration cfg) {
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafkaApi = new HashMap<>();
kafkaApi.put("authenticationMethod", this.authenticationMethod);
kafkaApi.put("enableAuthorization", this.enableAuthorization);
kafkaApi.put("advertisedHost", getHost());
kafkaApi.put("advertisedPort", getMappedPort(9092));
kafkaApi.put("listeners", listeners);

List<Map<String, Object>> kafkaListeners =
this.listeners.keySet()
.stream()
.map(listener -> {
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("name", listener.split(":")[0]);
listenerMap.put("address", listener.split(":")[0]);
listenerMap.put("port", listener.split(":")[1]);
listenerMap.put("authentication_method", this.authenticationMethod);
return listenerMap;
})
.collect(Collectors.toList());

List<Map<String, Object>> kafkaAdvertisedListeners =
this.listeners.entrySet()
.stream()
.map(entry -> {
String advertisedListener = entry.getValue().get();
Map<String, Object> listenerMap = new HashMap<>();
listenerMap.put("name", entry.getKey().split(":")[0]);
listenerMap.put("address", advertisedListener.split(":")[0]);
listenerMap.put("port", advertisedListener.split(":")[1]);
return listenerMap;
})
.collect(Collectors.toList());

Map<String, Object> kafka = new HashMap<>();
kafka.put("listeners", kafkaListeners);
kafka.put("advertisedListeners", kafkaAdvertisedListeners);

Map<String, Object> schemaRegistry = new HashMap<>();
schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod);

Map<String, Object> root = new HashMap<>();
root.put("kafkaApi", kafkaApi);
root.put("kafka", kafka);
root.put("schemaRegistry", schemaRegistry);

String file = resolveTemplate(cfg, "redpanda.yaml.ftl", root);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ redpanda:
port: ${listener.port}
authentication_method: ${listener.authentication_method}
</#list>
<#list kafka.listeners as listener>
- address: ${listener.address}
name: ${listener.name}
port: ${listener.port}
authentication_method: ${listener.authentication_method}
</#list>

advertised_kafka_api:
- address: ${ kafkaApi.advertisedHost }
Expand All @@ -40,6 +46,11 @@ redpanda:
name: ${listener.address}
port: ${listener.port}
</#list>
<#list kafka.advertisedListeners as listener>
- address: ${listener.address}
name: ${listener.name}
port: ${listener.port}
</#list>

schema_registry:
schema_registry_api:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.awaitility.Awaitility;

import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -67,24 +67,17 @@ protected void testKafkaFunctionality(String bootstrapServers, int partitions, i

producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();

Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
Awaitility
.await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

if (records.isEmpty()) {
return false;
}

assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));

return true;
}
);
});

consumer.unsubscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.SocatContainer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

Expand Down Expand Up @@ -114,10 +115,37 @@ public void testSchemaRegistry() {
public void testUsageWithListener() throws Exception {
try (
Network network = Network.newNetwork();
// registerListener {
RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7")
.withListener(() -> "redpanda:19092")
.withNetwork(network);
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
.withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
})
.withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt")
.withNetwork(network)
.withCommand("-c", "tail -f /dev/null")
) {
redpanda.start();
kcat.start();

kcat.execInContainer("kcat", "-b", "redpanda:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "redpanda:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();

assertThat(stdout).contains("Message produced by kcat");
}
}

@Test
public void testUsageWithListenerInTheSameNetwork() throws Exception {
try (
Network network = Network.newNetwork();
// registerListener {
RedpandaContainer kafka = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7")
.withListener("kafka:19092")
.withNetwork(network);
// }
// createKCatContainer {
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
Expand All @@ -129,18 +157,42 @@ public void testUsageWithListener() throws Exception {
.withCommand("-c", "tail -f /dev/null")
// }
) {
redpanda.start();
kafka.start();
kcat.start();

// produceConsumeMessage {
kcat.execInContainer("kcat", "-b", "redpanda:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
String stdout = kcat
.execInContainer("kcat", "-b", "redpanda:19092", "-C", "-t", "msgs", "-c", "1")
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
.getStdout();
// }

assertThat(stdout).contains("Message produced by kcat");
}
}

@Test
public void testUsageWithListenerFromProxy() throws Exception {
try (
Network network = Network.newNetwork();
// createProxy {
SocatContainer socat = new SocatContainer().withNetwork(network).withTarget(2000, "kafka", 19092);
// }
// registerListenerAndAdvertisedListener {
RedpandaContainer kafka = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7")
.withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
.withNetwork(network)
// }
) {
socat.start();
kafka.start();
// produceConsumeMessageFromProxy {
String bootstrapServers = String.format("%s:%s", socat.getHost(), socat.getMappedPort(2000));
testKafkaFunctionality(bootstrapServers);
// }
}
}

@Test
public void testUsageWithListenerAndSasl() throws Exception {
final String username = "panda";
Expand All @@ -153,7 +205,7 @@ public void testUsageWithListenerAndSasl() throws Exception {
.enableAuthorization()
.enableSasl()
.withSuperuser("panda")
.withListener(() -> "my-panda:29092")
.withListener("my-panda:29092")
.withNetwork(network);
GenericContainer<?> kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1")
.withCreateContainerCmdModifier(cmd -> {
Expand Down

0 comments on commit f763945

Please sign in to comment.