Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce concurrent requests and remove explicit configuration parameter #34

Merged
merged 1 commit into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,18 @@ public final class ProducerConfig {
private final String clientId;
private final Duration closeTimeout;
private final Map<String, Object> config;
private final int maxConcurrentRequests;
private final Duration requestTimeout;
private final List<String> servers;

public ProducerConfig(
final String clientId,
final Duration closeTimeout,
final Map<String, Object> config,
final int maxConcurrentRequests,
final Duration requestTimeout,
final List<String> servers) {
this.clientId = Objects.requireNonNull(clientId);
this.closeTimeout = Objects.requireNonNull(closeTimeout);
this.config = Objects.requireNonNull(config);
this.maxConcurrentRequests = maxConcurrentRequests;
this.requestTimeout = Objects.requireNonNull(requestTimeout);
this.servers = Objects.requireNonNull(servers);
}
Expand All @@ -67,10 +64,6 @@ public ProducerConfig(
return config;
}

public int getMaxConcurrentRequests() {
return maxConcurrentRequests;
}

public @NonNull Duration getRequestTimeout() {
return requestTimeout;
}
Expand All @@ -81,8 +74,7 @@ public int getMaxConcurrentRequests() {

@Override
public int hashCode() {
return Objects.hash(
clientId, closeTimeout, config, maxConcurrentRequests, requestTimeout, servers);
return Objects.hash(clientId, closeTimeout, config, requestTimeout, servers);
}

@Override
Expand All @@ -94,8 +86,7 @@ public boolean equals(final Object o) {
return false;
}
final ProducerConfig that = (ProducerConfig) o;
return getMaxConcurrentRequests() == that.getMaxConcurrentRequests()
&& Objects.equals(getClientId(), that.getClientId())
return Objects.equals(getClientId(), that.getClientId())
&& Objects.equals(getCloseTimeout(), that.getCloseTimeout())
&& Objects.equals(getConfig(), that.getConfig())
&& Objects.equals(getRequestTimeout(), that.getRequestTimeout())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* to overwrite the parsing for nested types.
*/
public final class RawConfigParser implements ConfigParser<RawConfig, Config> {
static final int DEFAULT_MAX_IN_FLIGHT_RECORDS = 3;
static final int DEFAULT_MAX_IN_FLIGHT_RECORDS = 100;
static final Duration DEFAULT_IN_FLIGHT_RECORD_CHECK_INTERVAL = Duration.ofSeconds(1);

private final ConfigParser<RawRecordsConfig, RecordsConfig> recordsConfigParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,11 @@ public class RawProducerConfigParser implements ConfigParser<RawProducerConfig,
static final String DEFAULT_CLIENT_ID = "zeebe";
static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(20);
static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5);
static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 3;

@Override
public @NonNull ProducerConfig parse(final @Nullable RawProducerConfig config) {
Objects.requireNonNull(config);

final Integer maxConcurrentRequests =
get(config.maxConcurrentRequests, DEFAULT_MAX_CONCURRENT_REQUESTS);
final List<String> servers =
get(config.servers, DEFAULT_SERVERS, ConfigParserUtil::splitCommaSeparatedString);
final String clientId = get(config.clientId, DEFAULT_CLIENT_ID);
Expand All @@ -63,8 +60,7 @@ public class RawProducerConfigParser implements ConfigParser<RawProducerConfig,
final Map<String, Object> producerConfig =
get(config.config, new HashMap<>(), this::parseProperties);

return new ProducerConfig(
clientId, closeTimeout, producerConfig, maxConcurrentRequests, requestTimeout, servers);
return new ProducerConfig(clientId, closeTimeout, producerConfig, requestTimeout, servers);
}

private @NonNull Map<String, Object> parseProperties(final @NonNull String propertiesString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ public class RawProducerConfig {
*/
public String config;

/**
* Max concurrent requests to the Kafka broker; note that in flight records are batched such that
* in one request you can easily have a thousand records, depending on the producer's batch
* configuration.
*/
public Integer maxConcurrentRequests;

/**
* Controls how long the producer will wait for a request to be acknowledged by the Kafka broker
* before retrying it. Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public final class DefaultKafkaProducerFactory implements KafkaProducerFactory {
final Map<String, Object> options = new HashMap<>();

options.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
options.put(
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
config.getProducer().getMaxConcurrentRequests());
// since we're using "infinite" retries/delivery with an idempotent producer, setting the max
// in flight requests to 1 ensures batches are delivered in order.
options.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
options.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
options.put(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ public void shouldUseDefaultValuesForMissingProperties() {
// then
assertThat(parsed)
.extracting(
"maxConcurrentRequests",
"servers",
"clientId",
"closeTimeout",
"requestTimeout",
"config")
.containsExactly(
RawProducerConfigParser.DEFAULT_MAX_CONCURRENT_REQUESTS,
RawProducerConfigParser.DEFAULT_SERVERS,
RawProducerConfigParser.DEFAULT_CLIENT_ID,
RawProducerConfigParser.DEFAULT_CLOSE_TIMEOUT,
Expand All @@ -58,7 +56,6 @@ public void shouldUseDefaultValuesForMissingProperties() {
public void shouldParse() {
// given
final RawProducerConfig config = new RawProducerConfig();
config.maxConcurrentRequests = 1;
config.servers = "localhost:3000";
config.clientId = "client";
config.closeTimeoutMs = 3000L;
Expand All @@ -71,14 +68,12 @@ public void shouldParse() {
// then
assertThat(parsed)
.extracting(
"maxConcurrentRequests",
"servers",
"clientId",
"closeTimeout",
"requestTimeout",
"config")
.containsExactly(
1,
Collections.singletonList("localhost:3000"),
"client",
Duration.ofSeconds(3),
Expand Down