Skip to content

Commit

Permalink
Fix KafkaContainer with multiple networks defined (#4213)
Browse files Browse the repository at this point in the history
Current implementation exposes only two listeners PLAINTEXT and BROKER. BROKER must be advertised to explicitly specified network or bridge. If a host port is exposed using `Testcontainers.exposeHostPorts` and a network is specified, the container will be "attached" to an additional network, creating to listeners with the same name

Configure the BROKER listener using the network specified (withNetwork) or "bridge" if not specified, instead of iterating over all the networks in the container info. Additional test cases when a port of the host is exposed (proxy container started) and a external network is specified to the container placing multiple network entries in the container info, making additional Kafka Broker listener entries with the same name "BROKER"


Co-authored-by: Sergei Egorov <[email protected]>
  • Loading branch information
DennisFederico and bsideup authored Jun 30, 2021
1 parent 655110a commit adae983
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import org.testcontainers.utility.DockerImageName;

import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Comparator;

/**
* This container wraps Confluent Kafka and Zookeeper (optionally)
Expand Down Expand Up @@ -122,13 +121,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
}

command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
command += "export KAFKA_ADVERTISED_LISTENERS='" + Stream
.concat(
Stream.of(getBootstrapServers()),
containerInfo.getNetworkSettings().getNetworks().values().stream()
.map(it -> "BROKER://" + it.getIpAddress() + ":9092")
)
.collect(Collectors.joining(",")) + "'\n";

command += "export KAFKA_ADVERTISED_LISTENERS='" + String.join(",", getBootstrapServers(), brokerAdvertisedListener(containerInfo)) + "'\n";

command += ". /etc/confluent/docker/bash-config \n";
command += "/etc/confluent/docker/configure \n";
Expand All @@ -139,4 +133,30 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
STARTER_SCRIPT
);
}

protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
// Kafka supports only one INTER_BROKER listener, so we have to pick one.
// The current algorithm uses the following order of resolving the IP:
// 1. Custom network's IP set via `withNetwork`
// 2. Bridge network's IP
// 3. Best effort fallback to getNetworkSettings#ipAddress
String ipAddress = containerInfo.getNetworkSettings().getNetworks().entrySet()
.stream()
.filter(it -> it.getValue().getIpAddress() != null)
.max(Comparator.comparingInt(entry -> {
if (getNetwork() != null && getNetwork().getId().equals(entry.getValue().getNetworkID())) {
return 2;
}

if ("bridge".equals(entry.getKey())) {
return 1;
}

return 0;
}))
.map(it -> it.getValue().getIpAddress())
.orElseGet(() -> containerInfo.getNetworkSettings().getIpAddress());

return String.format("BROKER://%s:%s", ipAddress, "9092");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.Testcontainers;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerTest {
Expand Down Expand Up @@ -113,6 +114,24 @@ public void testConfluentPlatformVersion6() throws Exception {
}
}

@Test
public void testWithHostExposedPort() throws Exception {
Testcontainers.exposeHostPorts(12345);
try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}

@Test
public void testWithHostExposedPortAndExternalNetwork() throws Exception {
Testcontainers.exposeHostPorts(12345);
try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE).withNetwork(Network.newNetwork())) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}

protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}
Expand Down

0 comments on commit adae983

Please sign in to comment.