Skip to content

Commit

Permalink
chore(exporter): reduce concurrent requests to 1 (#34)
Browse files Browse the repository at this point in the history
- remove concurrent requests config param
  • Loading branch information
npepinpe authored Sep 11, 2020
1 parent 99c2797 commit 761ef2e
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,18 @@ public final class ProducerConfig {
private final String clientId;
private final Duration closeTimeout;
private final Map<String, Object> config;
private final int maxConcurrentRequests;
private final Duration requestTimeout;
private final List<String> servers;

public ProducerConfig(
final String clientId,
final Duration closeTimeout,
final Map<String, Object> config,
final int maxConcurrentRequests,
final Duration requestTimeout,
final List<String> servers) {
this.clientId = Objects.requireNonNull(clientId);
this.closeTimeout = Objects.requireNonNull(closeTimeout);
this.config = Objects.requireNonNull(config);
this.maxConcurrentRequests = maxConcurrentRequests;
this.requestTimeout = Objects.requireNonNull(requestTimeout);
this.servers = Objects.requireNonNull(servers);
}
Expand All @@ -67,10 +64,6 @@ public ProducerConfig(
return config;
}

public int getMaxConcurrentRequests() {
return maxConcurrentRequests;
}

public @NonNull Duration getRequestTimeout() {
return requestTimeout;
}
Expand All @@ -81,8 +74,7 @@ public int getMaxConcurrentRequests() {

@Override
public int hashCode() {
return Objects.hash(
clientId, closeTimeout, config, maxConcurrentRequests, requestTimeout, servers);
return Objects.hash(clientId, closeTimeout, config, requestTimeout, servers);
}

@Override
Expand All @@ -94,8 +86,7 @@ public boolean equals(final Object o) {
return false;
}
final ProducerConfig that = (ProducerConfig) o;
return getMaxConcurrentRequests() == that.getMaxConcurrentRequests()
&& Objects.equals(getClientId(), that.getClientId())
return Objects.equals(getClientId(), that.getClientId())
&& Objects.equals(getCloseTimeout(), that.getCloseTimeout())
&& Objects.equals(getConfig(), that.getConfig())
&& Objects.equals(getRequestTimeout(), that.getRequestTimeout())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* to overwrite the parsing for nested types.
*/
public final class RawConfigParser implements ConfigParser<RawConfig, Config> {
static final int DEFAULT_MAX_IN_FLIGHT_RECORDS = 3;
static final int DEFAULT_MAX_IN_FLIGHT_RECORDS = 100;
static final Duration DEFAULT_IN_FLIGHT_RECORD_CHECK_INTERVAL = Duration.ofSeconds(1);

private final ConfigParser<RawRecordsConfig, RecordsConfig> recordsConfigParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,11 @@ public class RawProducerConfigParser implements ConfigParser<RawProducerConfig,
static final String DEFAULT_CLIENT_ID = "zeebe";
static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(20);
static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5);
static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 3;

@Override
public @NonNull ProducerConfig parse(final @Nullable RawProducerConfig config) {
Objects.requireNonNull(config);

final Integer maxConcurrentRequests =
get(config.maxConcurrentRequests, DEFAULT_MAX_CONCURRENT_REQUESTS);
final List<String> servers =
get(config.servers, DEFAULT_SERVERS, ConfigParserUtil::splitCommaSeparatedString);
final String clientId = get(config.clientId, DEFAULT_CLIENT_ID);
Expand All @@ -63,8 +60,7 @@ public class RawProducerConfigParser implements ConfigParser<RawProducerConfig,
final Map<String, Object> producerConfig =
get(config.config, new HashMap<>(), this::parseProperties);

return new ProducerConfig(
clientId, closeTimeout, producerConfig, maxConcurrentRequests, requestTimeout, servers);
return new ProducerConfig(clientId, closeTimeout, producerConfig, requestTimeout, servers);
}

private @NonNull Map<String, Object> parseProperties(final @NonNull String propertiesString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ public class RawProducerConfig {
*/
public String config;

/**
* Max concurrent requests to the Kafka broker; note that in flight records are batched such that
* in one request you can easily have a thousand records, depending on the producer's batch
* configuration.
*/
public Integer maxConcurrentRequests;

/**
* Controls how long the producer will wait for a request to be acknowledged by the Kafka broker
* before retrying it. Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public final class DefaultKafkaProducerFactory implements KafkaProducerFactory {
final Map<String, Object> options = new HashMap<>();

options.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
options.put(
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
config.getProducer().getMaxConcurrentRequests());
// since we're using "infinite" retries/delivery with an idempotent producer, setting the max
// in flight requests to 1 ensures batches are delivered in order.
options.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
options.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
options.put(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ public void shouldUseDefaultValuesForMissingProperties() {
// then
assertThat(parsed)
.extracting(
"maxConcurrentRequests",
"servers",
"clientId",
"closeTimeout",
"requestTimeout",
"config")
.containsExactly(
RawProducerConfigParser.DEFAULT_MAX_CONCURRENT_REQUESTS,
RawProducerConfigParser.DEFAULT_SERVERS,
RawProducerConfigParser.DEFAULT_CLIENT_ID,
RawProducerConfigParser.DEFAULT_CLOSE_TIMEOUT,
Expand All @@ -58,7 +56,6 @@ public void shouldUseDefaultValuesForMissingProperties() {
public void shouldParse() {
// given
final RawProducerConfig config = new RawProducerConfig();
config.maxConcurrentRequests = 1;
config.servers = "localhost:3000";
config.clientId = "client";
config.closeTimeoutMs = 3000L;
Expand All @@ -71,14 +68,12 @@ public void shouldParse() {
// then
assertThat(parsed)
.extracting(
"maxConcurrentRequests",
"servers",
"clientId",
"closeTimeout",
"requestTimeout",
"config")
.containsExactly(
1,
Collections.singletonList("localhost:3000"),
"client",
Duration.ofSeconds(3),
Expand Down

0 comments on commit 761ef2e

Please sign in to comment.