diff --git a/exporter/src/main/java/io/zeebe/exporters/kafka/config/ProducerConfig.java b/exporter/src/main/java/io/zeebe/exporters/kafka/config/ProducerConfig.java index 3d0d05a..81fe47a 100644 --- a/exporter/src/main/java/io/zeebe/exporters/kafka/config/ProducerConfig.java +++ b/exporter/src/main/java/io/zeebe/exporters/kafka/config/ProducerConfig.java @@ -36,7 +36,6 @@ public final class ProducerConfig { private final String clientId; private final Duration closeTimeout; private final Map config; - private final int maxConcurrentRequests; private final Duration requestTimeout; private final List servers; @@ -44,13 +43,11 @@ public ProducerConfig( final String clientId, final Duration closeTimeout, final Map config, - final int maxConcurrentRequests, final Duration requestTimeout, final List servers) { this.clientId = Objects.requireNonNull(clientId); this.closeTimeout = Objects.requireNonNull(closeTimeout); this.config = Objects.requireNonNull(config); - this.maxConcurrentRequests = maxConcurrentRequests; this.requestTimeout = Objects.requireNonNull(requestTimeout); this.servers = Objects.requireNonNull(servers); } @@ -67,10 +64,6 @@ public ProducerConfig( return config; } - public int getMaxConcurrentRequests() { - return maxConcurrentRequests; - } - public @NonNull Duration getRequestTimeout() { return requestTimeout; } @@ -81,8 +74,7 @@ public int getMaxConcurrentRequests() { @Override public int hashCode() { - return Objects.hash( - clientId, closeTimeout, config, maxConcurrentRequests, requestTimeout, servers); + return Objects.hash(clientId, closeTimeout, config, requestTimeout, servers); } @Override @@ -94,8 +86,7 @@ public boolean equals(final Object o) { return false; } final ProducerConfig that = (ProducerConfig) o; - return getMaxConcurrentRequests() == that.getMaxConcurrentRequests() - && Objects.equals(getClientId(), that.getClientId()) + return Objects.equals(getClientId(), that.getClientId()) && Objects.equals(getCloseTimeout(), that.getCloseTimeout()) && Objects.equals(getConfig(), that.getConfig()) && Objects.equals(getRequestTimeout(), that.getRequestTimeout()) diff --git a/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawConfigParser.java b/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawConfigParser.java index 34b8fe2..3c61046 100644 --- a/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawConfigParser.java +++ b/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawConfigParser.java @@ -36,7 +36,7 @@ * to overwrite the parsing for nested types. */ public final class RawConfigParser implements ConfigParser { - static final int DEFAULT_MAX_IN_FLIGHT_RECORDS = 3; + static final int DEFAULT_MAX_IN_FLIGHT_RECORDS = 100; static final Duration DEFAULT_IN_FLIGHT_RECORD_CHECK_INTERVAL = Duration.ofSeconds(1); private final ConfigParser recordsConfigParser; diff --git a/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParser.java b/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParser.java index d5363cd..94effcd 100644 --- a/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParser.java +++ b/exporter/src/main/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParser.java @@ -45,14 +45,11 @@ public class RawProducerConfigParser implements ConfigParser servers = get(config.servers, DEFAULT_SERVERS, ConfigParserUtil::splitCommaSeparatedString); final String clientId = get(config.clientId, DEFAULT_CLIENT_ID); @@ -63,8 +60,7 @@ public class RawProducerConfigParser implements ConfigParser producerConfig = get(config.config, new HashMap<>(), this::parseProperties); - return new ProducerConfig( - clientId, closeTimeout, producerConfig, maxConcurrentRequests, requestTimeout, servers); + return new ProducerConfig(clientId, closeTimeout, producerConfig, requestTimeout, servers); } private @NonNull Map parseProperties(final @NonNull String propertiesString) { diff --git a/exporter/src/main/java/io/zeebe/exporters/kafka/config/raw/RawProducerConfig.java b/exporter/src/main/java/io/zeebe/exporters/kafka/config/raw/RawProducerConfig.java index 9695fb0..3840dec 100644 --- a/exporter/src/main/java/io/zeebe/exporters/kafka/config/raw/RawProducerConfig.java +++ b/exporter/src/main/java/io/zeebe/exporters/kafka/config/raw/RawProducerConfig.java @@ -38,13 +38,6 @@ public class RawProducerConfig { */ public String config; - /** - * Max concurrent requests to the Kafka broker; note that in flight records are batched such that - * in one request you can easily have a thousand records, depending on the producer's batch - * configuration. - */ - public Integer maxConcurrentRequests; - /** * Controls how long the producer will wait for a request to be acknowledged by the Kafka broker * before retrying it. Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG diff --git a/exporter/src/main/java/io/zeebe/exporters/kafka/producer/DefaultKafkaProducerFactory.java b/exporter/src/main/java/io/zeebe/exporters/kafka/producer/DefaultKafkaProducerFactory.java index 499b1f6..47a2d8a 100644 --- a/exporter/src/main/java/io/zeebe/exporters/kafka/producer/DefaultKafkaProducerFactory.java +++ b/exporter/src/main/java/io/zeebe/exporters/kafka/producer/DefaultKafkaProducerFactory.java @@ -48,9 +48,9 @@ public final class DefaultKafkaProducerFactory implements KafkaProducerFactory { final Map options = new HashMap<>(); options.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - options.put( - ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, - config.getProducer().getMaxConcurrentRequests()); + // since we're using "infinite" retries/delivery with an idempotent producer, setting the max + // in flight requests to 1 ensures batches are delivered in order. + options.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); options.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); options.put( ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, diff --git a/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParserTest.java b/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParserTest.java index a91d657..6102276 100644 --- a/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParserTest.java +++ b/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawProducerConfigParserTest.java @@ -39,14 +39,12 @@ public void shouldUseDefaultValuesForMissingProperties() { // then assertThat(parsed) .extracting( - "maxConcurrentRequests", "servers", "clientId", "closeTimeout", "requestTimeout", "config") .containsExactly( - RawProducerConfigParser.DEFAULT_MAX_CONCURRENT_REQUESTS, RawProducerConfigParser.DEFAULT_SERVERS, RawProducerConfigParser.DEFAULT_CLIENT_ID, RawProducerConfigParser.DEFAULT_CLOSE_TIMEOUT, @@ -58,7 +56,6 @@ public void shouldUseDefaultValuesForMissingProperties() { public void shouldParse() { // given final RawProducerConfig config = new RawProducerConfig(); - config.maxConcurrentRequests = 1; config.servers = "localhost:3000"; config.clientId = "client"; config.closeTimeoutMs = 3000L; @@ -71,14 +68,12 @@ public void shouldParse() { // then assertThat(parsed) .extracting( - "maxConcurrentRequests", "servers", "clientId", "closeTimeout", "requestTimeout", "config") .containsExactly( - 1, Collections.singletonList("localhost:3000"), "client", Duration.ofSeconds(3),