Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky kafka cluster example #4549

Merged
merged 12 commits into from
Oct 7, 2021
2 changes: 1 addition & 1 deletion examples/kafka-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ dependencies {
testImplementation 'org.apache.kafka:kafka-clients:2.8.0'
testImplementation 'org.assertj:assertj-core:3.20.2'
testImplementation 'com.google.guava:guava:23.0'
testImplementation 'org.slf4j:slf4j-simple:1.7.32'
testImplementation 'ch.qos.logback:logback-classic:1.2.5'
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -56,7 +57,8 @@ public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, in
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "");
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
.withStartupTimeout(Duration.ofMinutes(1));
})
.collect(Collectors.toList());
}
Expand All @@ -81,8 +83,8 @@ private Stream<GenericContainer<?>> allContainers() {
@Override
@SneakyThrows
public void start() {
Stream<Startable> startables = this.brokers.stream().map(Startable.class::cast);
Startables.deepStart(startables).get(60, SECONDS);
// sequential start to avoid resource contention on CI systems with weaker hardware
brokers.forEach(GenericContainer::start);
rnorth marked this conversation as resolved.
Show resolved Hide resolved

Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
Container.ExecResult result = this.zookeeper.execInContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class KafkaContainerClusterTest {
@Test
public void testKafkaContainerCluster() throws Exception {
try (
KafkaContainerCluster cluster = new KafkaContainerCluster("5.2.1", 3, 2)
KafkaContainerCluster cluster = new KafkaContainerCluster("6.2.1", 3, 2)
) {
cluster.start();
String bootstrapServers = cluster.getBootstrapServers();
Expand Down
14 changes: 14 additions & 0 deletions examples/kafka-cluster/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected void containerIsStarted(InspectContainerResponse containerInfo) {
"advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
);
if (result.getExitCode() != 0) {
throw new IllegalStateException(result.getStderr());
throw new IllegalStateException(result.toString());
}
}

Expand Down