Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] Retries on failures #1768

Merged
merged 6 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 101 additions & 56 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -82,6 +83,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.SECONDS;

/**
Expand Down Expand Up @@ -728,8 +730,31 @@ public Builder setClientNetworkBufferSize(int size) {
return this;
}


/**
 * Configures the failure causes on which a request is retried.
 * The causes are stored as their enum names, joined with {@code VALUES_LIST_DELIMITER},
 * under the {@code "client_retry_on_failures"} configuration key.
 * When this option is not set, the builder defaults to
 * {@code [NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout]}.
 * Pass {@link ClientFaultCause#None} to disable retries.
 *
 * @param causes causes that should be retried on
 * @return this builder instance
 */
public Builder retryOnFailures(ClientFaultCause ...causes) {
    String[] causeNames = new String[causes.length];
    int idx = 0;
    for (ClientFaultCause cause : causes) {
        causeNames[idx++] = cause.name();
    }
    String joinedCauses = String.join(VALUES_LIST_DELIMITER, causeNames);
    this.configuration.put("client_retry_on_failures", joinedCauses);
    return this;
}

/**
 * Stores the maximum number of retries in the client configuration.
 * The value is kept in string form under the {@code ClickHouseClientOption.RETRY} key.
 * When this option is not set, the builder applies a default of {@code 3}.
 *
 * @param maxRetries number of retries to store
 * @return this builder instance
 */
public Builder setMaxRetries(int maxRetries) {
    final String retryKey = ClickHouseClientOption.RETRY.getKey();
    this.configuration.put(retryKey, Integer.toString(maxRetries));
    return this;
}

public Client build() {
this.configuration = setDefaults(this.configuration);
setDefaults();

// check if endpoint are empty. so can not initiate client
if (this.endpoints.isEmpty()) {
Expand Down Expand Up @@ -776,65 +801,69 @@ public Client build() {
return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor);
}

private Map<String, String> setDefaults(Map<String, String> userConfig) {
private void setDefaults() {

// set default database name if not specified
if (!userConfig.containsKey("database")) {
userConfig.put("database", (String) ClickHouseDefaults.DATABASE.getDefaultValue());
if (!configuration.containsKey("database")) {
setDefaultDatabase((String) ClickHouseDefaults.DATABASE.getDefaultValue());
}

if (!userConfig.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) {
userConfig.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(),
String.valueOf(ClickHouseClientOption.MAX_EXECUTION_TIME.getDefaultValue()));
if (!configuration.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) {
setExecutionTimeout(0, MILLIS);
}

if (!userConfig.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) {
userConfig.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(),
if (!configuration.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) {
configuration.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(),
String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue()));
}

if (!userConfig.containsKey("compression.lz4.uncompressed_buffer_size")) {
userConfig.put("compression.lz4.uncompressed_buffer_size",
String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE));
if (!configuration.containsKey("compression.lz4.uncompressed_buffer_size")) {
setLZ4UncompressedBufferSize(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE);
}

if (!configuration.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) {
useServerTimeZone(true);
}

if (!userConfig.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) {
userConfig.put(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey(), "true");
if (!configuration.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) {
setServerTimeZone("UTC");
}

if (!userConfig.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) {
userConfig.put(ClickHouseClientOption.SERVER_TIME_ZONE.getKey(), "UTC");
if (!configuration.containsKey(ClickHouseClientOption.ASYNC.getKey())) {
useAsyncRequests(false);
}

if (!userConfig.containsKey(ClickHouseClientOption.ASYNC.getKey())) {
userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false");
if (!configuration.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) {
setMaxConnections(10);
}

if (!userConfig.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) {
userConfig.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), "10");
if (!configuration.containsKey("connection_request_timeout")) {
setConnectionRequestTimeout(10, SECONDS);
}

if (!userConfig.containsKey("connection_request_timeout")) {
userConfig.put("connection_request_timeout", "10000");
if (!configuration.containsKey("connection_reuse_strategy")) {
setConnectionReuseStrategy(ConnectionReuseStrategy.FIFO);
}

if (!userConfig.containsKey("connection_reuse_strategy")) {
userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name());
if (!configuration.containsKey("connection_pool_enabled")) {
enableConnectionPool(true);
}

if (!userConfig.containsKey("connection_pool_enabled")) {
userConfig.put("connection_pool_enabled", "true");
if (!configuration.containsKey("connection_ttl")) {
setConnectionTTL(-1, MILLIS);
}

if (!userConfig.containsKey("connection_ttl")) {
userConfig.put("connection_ttl", "-1");
if (!configuration.containsKey("client_retry_on_failures")) {
retryOnFailures(ClientFaultCause.NoHttpResponse, ClientFaultCause.ConnectTimeout, ClientFaultCause.ConnectionRequestTimeout);
}

if (!userConfig.containsKey("client_network_buffer_size")) {
if (!configuration.containsKey("client_network_buffer_size")) {
setClientNetworkBufferSize(8192);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add it as a constant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

return userConfig;
if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) {
setMaxRetries(3);
}
}
}

Expand Down Expand Up @@ -1008,6 +1037,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

ClientException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
Expand Down Expand Up @@ -1046,16 +1076,19 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
metrics.operationComplete();
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (NoHttpResponseException e) {
LOG.warn("Failed to get response. Retrying.", e);
selectedNode = getNextAliveNode();
continue;
} catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) {
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (IOException e) {
LOG.info("Interrupted while waiting for response.");
throw new ClientException("Failed to get query response", e);
throw new ClientException("Insert request failed", e);
}
}
throw new ClientException("Failed to get table schema: too many retries");
throw new ClientException("Insert request failed after retries", lastException);
};

return runAsyncOperation(supplier, settings.getAllSettings());
Expand All @@ -1075,7 +1108,6 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
}

globalClientStats.get(operationId).stop(ClientMetrics.OP_SERIALIZATION);
LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization"));
return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), format, settings);
}
}
Expand Down Expand Up @@ -1132,6 +1164,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

ClientException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
Expand Down Expand Up @@ -1166,25 +1199,27 @@ public CompletableFuture<InsertResponse> insert(String tableName,
metrics.operationComplete();
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (NoHttpResponseException e) {
if (i < maxRetries) {
try {
data.reset();
} catch (IOException ioe) {
throw new ClientException("Failed to get response", e);
}
LOG.warn("Failed to get response. Retrying.", e);
} catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this duplicated code? (I see it in the other inserts as well.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I left it duplicated because handling it in a dedicated method would not be much shorter. I will review this code later to see whether it can be extracted into a single method.

lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying", e);
selectedNode = getNextAliveNode();
} else {
throw new ClientException("Server did not respond", e);
throw lastException;
}
continue;
} catch (IOException e) {
LOG.info("Interrupted while waiting for response.");
throw new ClientException("Failed to get query response", e);
throw new ClientException("Insert request failed", e);
}

if (i < maxRetries) {
try {
data.reset();
} catch (IOException ioe) {
throw new ClientException("Failed to reset stream before next attempt", ioe);
}
}
}
throw new ClientException("Failed to insert data: too many retries");
throw new ClientException("Insert request failed after retries", lastException);
};
} else {
responseSupplier = () -> {
Expand All @@ -1211,7 +1246,6 @@ public CompletableFuture<InsertResponse> insert(String tableName,
clickHouseResponse = future.get();
}
InsertResponse response = new InsertResponse(clickHouseResponse, clientStats);
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
return response;
} catch (ExecutionException e) {
throw new ClientException("Failed to get insert response", e.getCause());
Expand Down Expand Up @@ -1300,6 +1334,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
responseSupplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();
ClientException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
try {
ClassicHttpResponse httpResponse =
Expand All @@ -1325,15 +1360,23 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
metrics.operationComplete();

return new QueryResponse(httpResponse, finalSettings.getFormat(), finalSettings, metrics);

} catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) {
lastException = httpClientHelper.wrapException("Query request initiation failed", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (ClientException e) {
throw e;
} catch (ConnectionRequestTimeoutException | ConnectTimeoutException e) {
throw new ConnectionInitiationException("Failed to get connection", e);
} catch (Exception e) {
throw new ClientException("Failed to execute query", e);
throw new ClientException("Query request failed", e);
}
}
throw new ClientException("Failed to get table schema: too many retries");

throw new ClientException("Query request failed after retries", lastException);
};
} else {
ClickHouseRequest<?> request = oldClient.read(getServerNode());
Expand Down Expand Up @@ -1610,4 +1653,6 @@ public Set<String> getEndpoints() {
/**
 * Picks the node that the next request is sent to.
 * Currently this is always the first entry of {@code serverNodes}.
 *
 * @return the first node in {@code serverNodes}
 */
private ClickHouseNode getNextAliveNode() {
    final ClickHouseNode firstNode = serverNodes.get(0);
    return firstNode;
}

/**
 * Delimiter placed between individual values when a list is stored
 * as a single configuration string (used by {@code retryOnFailures}).
 */
public static final String VALUES_LIST_DELIMITER = ",";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.clickhouse.client.api;

/**
 * Causes of client-side request failures that can be selected for retrying.
 */
public enum ClientFaultCause {

    /** Marker value: when present in the configured set, no failure is retried. */
    None,

    /** The request failed because no HTTP response was received. */
    NoHttpResponse,

    /** The request failed while establishing a connection. */
    ConnectTimeout,

    /** The request failed while waiting for a connection to be leased. */
    ConnectionRequestTimeout
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import com.clickhouse.client.ClickHouseSslContextProvider;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.ClientFaultCause;
import com.clickhouse.client.api.ClientMisconfigurationException;
import com.clickhouse.client.api.ConnectionInitiationException;
import com.clickhouse.client.api.ConnectionReuseStrategy;
import com.clickhouse.client.api.ServerException;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.http.ClickHouseHttpProto;
import com.clickhouse.client.http.config.ClickHouseHttpOption;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
Expand Down Expand Up @@ -61,7 +64,11 @@
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand All @@ -78,6 +85,8 @@ public class HttpAPIClientHelper {

private String proxyAuthHeaderValue;

private final Set<ClientFaultCause> defaultRetryCauses;

public HttpAPIClientHelper(Map<String, String> configuration) {
this.chConfiguration = configuration;
this.httpClient = createHttpClient();
Expand All @@ -93,6 +102,11 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
boolean usingServerCompression= chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression);

defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class);
if (defaultRetryCauses.contains(ClientFaultCause.None)) {
defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None);
}
}

/**
Expand Down Expand Up @@ -426,4 +440,39 @@ public static <T> T getHeaderVal(Header header, T defaultValue, Function<String,

return converter.apply(header.getValue());
}

/**
 * Decides whether a request that failed with the given exception should be retried.
 * The set of retryable causes is read from the {@code "retry_on_failures"} entry of the
 * request settings; when that entry is absent, the client-level defaults parsed from
 * {@code "client_retry_on_failures"} are used.
 *
 * @param ex exception raised while executing the request
 * @param requestSettings settings of the current request
 * @return {@code true} if the exception maps to a cause contained in the retry set;
 *         {@code false} if the set contains {@link ClientFaultCause#None} or the exception
 *         does not map to any known cause
 */
public boolean shouldRetry(Exception ex, Map<String, Object> requestSettings) {
    Set<ClientFaultCause> retryCauses = (Set<ClientFaultCause>)
            requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses);

    // None wins over any other cause that may be listed next to it.
    if (retryCauses.contains(ClientFaultCause.None)) {
        return false;
    }

    if (ex instanceof NoHttpResponseException) {
        return retryCauses.contains(ClientFaultCause.NoHttpResponse);
    }

    // ConnectTimeoutException is not a subclass of java.net.ConnectException, so it has to be
    // checked explicitly; otherwise connect timeouts would never be retried.
    if (ex instanceof ConnectException || ex instanceof ConnectTimeoutException) {
        return retryCauses.contains(ClientFaultCause.ConnectTimeout);
    }

    if (ex instanceof ConnectionRequestTimeoutException) {
        return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout);
    }

    return false;
}

/**
 * Wraps an exception raised by the HTTP client into a {@code ClientException}.
 * Failures to obtain or establish a connection become a {@code ConnectionInitiationException};
 * everything else, including a {@code ClientException} passed as cause, is wrapped into
 * a plain {@code ClientException}.
 *
 * @param message message for the resulting exception
 * @param cause original exception
 * @return exception wrapping {@code cause}
 */
public ClientException wrapException(String message, Exception cause) {
    boolean connectionInitiationFailed = cause instanceof ConnectionRequestTimeoutException
            || cause instanceof ConnectTimeoutException
            || cause instanceof ConnectException;

    return connectionInitiationFailed
            ? new ConnectionInitiationException(message, cause)
            : new ClientException(message, cause);
}
}
Loading
Loading