Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support clickhouse jdbc datasource(#40894) #42930

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion be/src/exec/jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,42 @@ void JDBCScanner::_init_profile() {

StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_class, SlotDescriptor* slot_desc) {
auto type = slot_desc->type().type;
if (java_class == "java.lang.Short") {
if (java_class == "java.lang.Byte") {
if (type != TYPE_BOOLEAN && type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT &&
type != TYPE_BIGINT) {
return Status::NotSupported(
fmt::format("Type mismatches on column[{}], JDBC result type is Byte, please set the type to "
"one of boolean,tinyint,smallint,int,bigint",
slot_desc->col_name()));
}
if (type == TYPE_BOOLEAN) {
return TYPE_BOOLEAN;
}
return TYPE_TINYINT;
} else if (java_class == "com.clickhouse.data.value.UnsignedByte") {
if (type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedByte, please set the type to "
"one of smallint,int,bigint",
slot_desc->col_name()));
}
return TYPE_SMALLINT;
} else if (java_class == "java.lang.Short") {
if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(
fmt::format("Type mismatches on column[{}], JDBC result type is Short, please set the type to "
"one of tinyint,smallint,int,bigint",
slot_desc->col_name()));
}
return TYPE_SMALLINT;
} else if (java_class == "com.clickhouse.data.value.UnsignedShort") {
if (type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedShort, please set the type to "
"one of int,bigint",
slot_desc->col_name()));
}
return TYPE_INT;
} else if (java_class == "java.lang.Integer") {
if (type != TYPE_TINYINT && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(
Expand All @@ -218,6 +246,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "com.clickhouse.data.value.UnsignedInteger") {
if (type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedInteger, please set the type to bigint",
slot_desc->col_name()));
}
return TYPE_BIGINT;
} else if (java_class == "java.lang.Long") {
if (type != TYPE_BIGINT) {
return Status::NotSupported(fmt::format(
Expand All @@ -232,6 +267,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "com.clickhouse.data.value.UnsignedLong") {
if (type != TYPE_LARGEINT) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is UnsignedLong, please set the type to largeint",
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "java.lang.Boolean") {
if (type != TYPE_BOOLEAN && type != TYPE_SMALLINT && type != TYPE_INT && type != TYPE_BIGINT) {
return Status::NotSupported(
Expand Down Expand Up @@ -282,6 +324,13 @@ StatusOr<LogicalType> JDBCScanner::_precheck_data_type(const std::string& java_c
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "java.time.LocalDate") {
if (type != TYPE_DATE) {
return Status::NotSupported(fmt::format(
"Type mismatches on column[{}], JDBC result type is LocalDate, please set the type to date",
slot_desc->col_name()));
}
return TYPE_VARCHAR;
} else if (java_class == "java.math.BigDecimal") {
if (type != TYPE_DECIMAL32 && type != TYPE_DECIMAL64 && type != TYPE_DECIMAL128 && type != TYPE_VARCHAR) {
return Status::NotSupported(
Seaven marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,13 @@ under the License.
<artifactId>postgresql</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc -->
<!-- we need clickhouse driver for jdbc connector -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.mockrunner/mockrunner-jdbc -->
<dependency>
<groupId>com.mockrunner</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public enum ProtocolType {
MYSQL,
POSTGRES,
ORACLE,
MARIADB
MARIADB,

CLICKHOUSE
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.jdbc;

import com.google.common.collect.ImmutableSet;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Type;
import com.starrocks.connector.exception.StarRocksConnectorException;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ClickhouseSchemaResolver extends JDBCSchemaResolver {
Map<String, String> properties;

public static final Set<String> SUPPORTED_TABLE_TYPES = new HashSet<>(
Arrays.asList("LOG TABLE", "MEMORY TABLE", "TEMPORARY TABLE", "VIEW", "DICTIONARY", "SYSTEM TABLE",
"REMOTE TABLE", "TABLE"));

public ClickhouseSchemaResolver(Map<String, String> properties) {
this.properties = properties;
}

@Override
public Collection<String> listSchemas(Connection connection) {
try (ResultSet resultSet = connection.getMetaData().getSchemas()) {
ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
while (resultSet.next()) {
String schemaName = resultSet.getString("TABLE_SCHEM");
// skip internal schemas
if (!schemaName.equalsIgnoreCase("INFORMATION_SCHEMA") && !schemaName.equalsIgnoreCase("system")) {
schemaNames.add(schemaName);
}
}
return schemaNames.build();
} catch (SQLException e) {
throw new StarRocksConnectorException(e.getMessage());
}
}


@Override
public ResultSet getTables(Connection connection, String dbName) throws SQLException {
String tableTypes = properties.get("table_types");
if (null != tableTypes) {
String[] tableTypesArray = tableTypes.split(",");
if (tableTypesArray.length == 0) {
throw new StarRocksConnectorException("table_types should be populated with table types separated by " +
"comma, e.g. 'TABLE,VIEW'. Currently supported type includes:" +
String.join(",", SUPPORTED_TABLE_TYPES));
}

for (String tt : tableTypesArray) {
if (!SUPPORTED_TABLE_TYPES.contains(tt)) {
throw new StarRocksConnectorException("Unsupported table type found: " + tt,
",Currently supported table types includes:" + String.join(",", SUPPORTED_TABLE_TYPES));
}
}
return connection.getMetaData().getTables(connection.getCatalog(), dbName, null, tableTypesArray);
}
return connection.getMetaData().getTables(connection.getCatalog(), dbName, null,
SUPPORTED_TABLE_TYPES.toArray(new String[SUPPORTED_TABLE_TYPES.size()]));

}

@Override
public ResultSet getColumns(Connection connection, String dbName, String tblName) throws SQLException {
return connection.getMetaData().getColumns(connection.getCatalog(), dbName, tblName, "%");
}


@Override
public Type convertColumnType(int dataType, String typeName, int columnSize, int digits) {
PrimitiveType primitiveType;
switch (dataType) {
case Types.TINYINT:
primitiveType = PrimitiveType.TINYINT;
break;
case Types.SMALLINT:
primitiveType = PrimitiveType.SMALLINT;
break;
case Types.INTEGER:
primitiveType = PrimitiveType.INT;
break;
case Types.BIGINT:
primitiveType = PrimitiveType.BIGINT;
break;
case Types.NUMERIC:
primitiveType = PrimitiveType.LARGEINT;
break;
case Types.FLOAT:
primitiveType = PrimitiveType.FLOAT;
break;
case Types.DOUBLE:
primitiveType = PrimitiveType.DOUBLE;
break;
case Types.BOOLEAN:
primitiveType = PrimitiveType.BOOLEAN;
break;
case Types.VARCHAR:
return ScalarType.createVarcharType(65533);
case Types.DATE:
primitiveType = PrimitiveType.DATE;
break;
case Types.TIMESTAMP:
primitiveType = PrimitiveType.DATETIME;
break;
case Types.DECIMAL:
// Decimal(9,9), first 9 is precision, second 9 is scale
String[] precisionAndScale =
typeName.replace("Decimal", "").replace("(", "")
.replace(")", "").replace(" ", "")
.split(",");
if (precisionAndScale.length != 2) {
// should not go here, but if it does, we make it DECIMALV2.
throw new StarRocksConnectorException(
"Cannot extract precision and scale from Decimal typename:" + typeName);
} else {
int precision = Integer.parseInt(precisionAndScale[0]);
int scale = Integer.parseInt(precisionAndScale[1]);
return ScalarType.createUnifiedDecimalType(precision, scale);
}
default:
primitiveType = PrimitiveType.UNKNOWN_TYPE;
break;
}
return ScalarType.createType(primitiveType);
}


}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can I help you today?

Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public JDBCMetadata(Map<String, String> properties, String catalogName, HikariDa
schemaResolver = new PostgresSchemaResolver();
} else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("mariadb")) {
schemaResolver = new MysqlSchemaResolver();
} else if (properties.get(JDBCResource.DRIVER_CLASS).toLowerCase().contains("clickhouse")) {
schemaResolver = new ClickhouseSchemaResolver(properties);
} else {
LOG.warn("{} not support yet", properties.get(JDBCResource.DRIVER_CLASS));
throw new StarRocksConnectorException(properties.get(JDBCResource.DRIVER_CLASS) + " not support yet");
Expand Down
Loading
Loading