Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client-v2] V2 implement writer api #2034

Merged
merged 7 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 67 additions & 59 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.data_formats.NativeFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryFormatSerializer;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesAndTypesFormatReader;
import com.clickhouse.client.api.data_formats.RowBinaryWithNamesFormatReader;
import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader;
Expand Down Expand Up @@ -43,7 +44,6 @@
import com.clickhouse.client.api.query.Records;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
Expand All @@ -59,6 +59,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ConnectException;
Expand Down Expand Up @@ -623,6 +624,11 @@ public Builder useHttpCompression(boolean enabled) {
return this;
}

/**
 * Stores the {@link ClientConfigProperties#APP_COMPRESSED_DATA} flag in the builder configuration.
 * The value is kept in its string form ("true" or "false").
 *
 * @param enabled - flag value to store
 * @return this builder, to allow call chaining
 */
public Builder appCompressedData(boolean enabled) {
    final String flagValue = Boolean.toString(enabled);
    this.configuration.put(ClientConfigProperties.APP_COMPRESSED_DATA.getKey(), flagValue);
    return this;
}

/**
* Sets buffer size for uncompressed data in LZ4 compression.
* For outgoing data it is the size of a buffer that will be compressed.
Expand Down Expand Up @@ -1066,6 +1072,11 @@ public Client build() {

private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000;

/**
* Default size for buffers used in networking.
*/
public static final int DEFAULT_BUFFER_SIZE = 8192;

private void setDefaults() {

// set default database name if not specified
Expand Down Expand Up @@ -1154,6 +1165,10 @@ private void setDefaults() {
if (!configuration.containsKey(ClientConfigProperties.USE_HTTP_COMPRESSION.getKey())) {
useHttpCompression(false);
}

if (!configuration.containsKey(ClientConfigProperties.APP_COMPRESSED_DATA.getKey())) {
appCompressedData(false);
}
}
}

Expand Down Expand Up @@ -1236,45 +1251,9 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
schemaSerializers.put(column.getColumnName(), (obj, stream) -> {
Object value = getterMethod.invoke(obj);

if (defaultsSupport) {
if (value != null) {//Because we now support defaults, we have to send nonNull
SerializerUtils.writeNonNull(stream);//Write 0 for no default

if (column.isNullable()) {//If the column is nullable
SerializerUtils.writeNonNull(stream);//Write 0 for not null
}
} else {//So if the object is null
if (column.hasDefault()) {
SerializerUtils.writeNull(stream);//Send 1 for default
return;
} else if (column.isNullable()) {//And the column is nullable
SerializerUtils.writeNonNull(stream);
SerializerUtils.writeNull(stream);//Then we send null, write 1
return;//And we're done
} else if (column.getDataType() == ClickHouseDataType.Array) {//If the column is an array
SerializerUtils.writeNonNull(stream);//Then we send nonNull
} else {
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
}
}
} else {
if (column.isNullable()) {
if (value == null) {
SerializerUtils.writeNull(stream);
return;
}
SerializerUtils.writeNonNull(stream);
} else if (value == null) {
if (column.getDataType() == ClickHouseDataType.Array) {
SerializerUtils.writeNonNull(stream);
} else {
throw new IllegalArgumentException(String.format("An attempt to write null into not nullable column '%s'", column.getColumnName()));
}
}
if (RowBinaryFormatSerializer.writeValuePreamble(stream, defaultsSupport, column, value)) {
SerializerUtils.serializeData(stream, value, column);
}

//Handle the different types
SerializerUtils.serializeData(stream, value, column);
});
} else {
LOG.warn("No getter method found for column: {}", propertyName);
Expand Down Expand Up @@ -1473,7 +1452,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, InputStream da
}

/**
* <p>Sends write request to database. Input data is read from the input stream.</p>
* Sends write request to database. Input data is read from the input stream.
*
* @param tableName - destination table name
* @param data - data stream to insert
Expand All @@ -1482,7 +1461,49 @@ public CompletableFuture<InsertResponse> insert(String tableName, InputStream da
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
*/
public CompletableFuture<InsertResponse> insert(String tableName,
InputStream data,
InputStream data,
ClickHouseFormat format,
InsertSettings settings) {

final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ?
Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(),
ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getDefaultValue())) :
settings.getInputStreamCopyBufferSize();

if (writeBufferSize <= 0) {
throw new IllegalArgumentException("Buffer size must be greater than 0");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we do that in InsertSettings and throw the exception from there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value can be set by two methods, setInputStreamCopyBufferSize and setOption, and here I'm doing a specific calculation.
One more reason: I keep the validation here because it is implementation-specific code for this operation. InsertSettings should not be aware of this validation because it may be used for other purposes.

}

return insert(tableName, new DataStreamWriter() {
/**
 * Copies the source input stream into the request output stream
 * using a buffer of {@code writeBufferSize} bytes.
 *
 * @param out - output stream provided by the client
 * @throws IOException - when reading the source or writing to {@code out} fails
 */
@Override
public void onOutput(OutputStream out) throws IOException {
    byte[] buffer = new byte[writeBufferSize];
    int bytesRead;
    // The buffer is never empty (size is validated above), so read() returns 0 only never and -1 at end of stream.
    while ((bytesRead = data.read(buffer)) > 0) {
        out.write(buffer, 0, bytesRead);
    }
    // 'out' is intentionally not closed here: the caller closes it right after onOutput returns.
}

// Rewinds the source stream before the next attempt so it can be read again.
// NOTE(review): reset() presumably fails with IOException when the stream was not marked
// or does not support mark/reset - confirm callers pass a resettable stream when retries are enabled.
@Override
public void onRetry() throws IOException {
data.reset();
}
},
format, settings);
}

/**
* Does an insert request to a server. Data is pushed when a {@link DataStreamWriter#onOutput(OutputStream)} is called.
*
* @param tableName - target table name
* @param writer - {@link DataStreamWriter} implementation
* @param format - source format
* @param settings - operation settings
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
*/
public CompletableFuture<InsertResponse> insert(String tableName,
DataStreamWriter writer,
ClickHouseFormat format,
InsertSettings settings) {

Expand Down Expand Up @@ -1513,6 +1534,8 @@ public CompletableFuture<InsertResponse> insert(String tableName,

settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name());
final InsertSettings finalSettings = settings;
final String sqlStmt = "INSERT INTO \"" + tableName + "\" FORMAT " + format.name();
finalSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt);
responseSupplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();
Expand All @@ -1523,17 +1546,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
try (ClassicHttpResponse httpResponse =
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(),
out -> {
out.write("INSERT INTO ".getBytes());
out.write(tableName.getBytes());
out.write(" FORMAT ".getBytes());
out.write(format.name().getBytes());
out.write(" \n".getBytes());

byte[] buffer = new byte[writeBufferSize];
int bytesRead;
while ((bytesRead = data.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
writer.onOutput(out);
out.close();
})) {

Expand Down Expand Up @@ -1566,7 +1579,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,

if (i < maxRetries) {
try {
data.reset();
writer.onRetry();
} catch (IOException ioe) {
throw new ClientException("Failed to reset stream before next attempt", ioe);
}
Expand All @@ -1581,12 +1594,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,

CompletableFuture<ClickHouseResponse> future = null;
future = request.data(output -> {
//Copy the data from the input stream to the output stream
byte[] buffer = new byte[settings.getInputStreamCopyBufferSize()];
int bytesRead;
while ((bytesRead = data.read(buffer)) != -1) {
output.write(buffer, 0, bytesRead);
}
writer.onOutput(output);
output.close();
}).option(ClickHouseClientOption.ASYNC, false).execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public enum ClientConfigProperties {

QUERY_ID("query_id"), // actually a server setting, but has client effect too

CLIENT_NETWORK_BUFFER_SIZE("client_network_buffer_size"),

CLIENT_NETWORK_BUFFER_SIZE("client_network_buffer_size", String.valueOf(Client.Builder.DEFAULT_BUFFER_SIZE)),

ACCESS_TOKEN("access_token"), SSL_AUTH("ssl_authentication"),

Expand All @@ -123,7 +122,12 @@ public enum ClientConfigProperties {
@Deprecated
PRODUCT_NAME("product_name"),

BEARERTOKEN_AUTH ("bearer_token")
BEARERTOKEN_AUTH ("bearer_token"),
/**
* Indicates that data provided for write operation is compressed by application.
*/
APP_COMPRESSED_DATA("app_compressed_data"),

;

private String key;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.clickhouse.client.api;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Callback used by insert operations: the client calls {@link #onOutput(OutputStream)}
 * when the implementation should push its data.
 */
public interface DataStreamWriter {

/**
* Called by the client when the output stream is ready for user data.
* This method is called once per operation, so all data should be written during the call.
* The output stream will be closed by the client.
* When client compression is enabled, the output stream will be a compressing one.
* If {@link ClientConfigProperties#APP_COMPRESSED_DATA} is set for an operation,
* then {@code out} will be a raw IO stream without compression.
* @param out - output stream
* @throws IOException - when any IO exception happens.
*/
void onOutput(OutputStream out) throws IOException;

/**
* Called when the client is going to perform a retry.
* Implementing this method is optional because in most cases there is nothing to reset.
* Useful to reset a wrapped stream or to throw an exception indicating that retry is not supported for a data source.
* @throws IOException - when any IO exception happens.
*/
default void onRetry() throws IOException {}
}
Loading
Loading