Skip to content

Commit

Permalink
fix code and test bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jasmine-ge committed Jul 16, 2024
1 parent 9956a5f commit 9f7401c
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 269 deletions.
5 changes: 5 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@
<artifactId>clickhouse</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>timeplus</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 1 addition & 1 deletion examples/src/test/java/examples/DataSourceITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected void runSql(DataSource ds) throws Exception {
Statement statement = connection.createStatement();

statement.executeQuery("DROP STREAM IF EXISTS test");
statement.executeQuery("CREATE STREAM test(test_uInt16 UInt16, test_Int16 Int16)ENGINE=Log");
statement.executeQuery("CREATE STREAM test(test_uInt16 uint16, test_Int16 int16)ENGINE=Memory");
statement.executeQuery("INSERT INTO test VALUES(" + Short.MAX_VALUE + "," + Short.MIN_VALUE + ")");
ResultSet rs = statement.executeQuery("SELECT * FROM test ORDER BY test_uInt16");
assertTrue(rs.next());
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@
<artifactId>clickhouse</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>timeplus</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions timeplus-native-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@
<artifactId>clickhouse</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>timeplus</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,5 @@ public byte[] readBytes(int size) throws IOException {
switcher.get().readBinary(bytes);
return bytes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public BinarySerializer(BuffedWriter writer, boolean enableCompress) {
compressWriter = new CompressedBuffedWriter(TimeplusDefines.SOCKET_SEND_BUFFER_BYTES, writer);
}
switcher = new Switcher<>(compressWriter, writer);
// max num of byte is 8 for double and long
writeBuffer = new byte[8];
// max num of byte is 32 for Int256
writeBuffer = new byte[32];
}

public void writeVarInt(long x) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.timeplus.misc.StrUtil;
import com.timeplus.misc.SystemUtil;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.timeplus.TimeplusContainer;
import org.testcontainers.utility.MountableFile;

import javax.sql.DataSource;
Expand All @@ -35,31 +35,33 @@ public abstract class AbstractITest implements Serializable {
protected static final ZoneId SERVER_TZ = ZoneId.of("UTC");
protected static final String DRIVER_CLASS_NAME = "com.timeplus.jdbc.TimeplusDriver";

public static final String TIMEPLUS_IMAGE = System.getProperty("TIMEPLUS_IMAGE", "timeplus/timeplusd:develop");
public static final String TIMEPLUS_IMAGE = System.getProperty("TIMEPLUS_IMAGE", "timeplus/timeplusd:2.3.3");
// public static DockerImageName proton_image = DockerImageName.parse(CLICKHOUSE_IMAGE).asCompatibleSubstituteFor("clickhouse/clickhouse-server");

protected static final String TIMEPLUS_USER = SystemUtil.loadProp("CLICKHOUSE_USER", "system");
protected static final String TIMEPLUS_PASSWORD = SystemUtil.loadProp("CLICKHOUSE_PASSWORD", "sys@t+");
protected static final String TIMEPLUS_USER = SystemUtil.loadProp("CLICKHOUSE_USER", "");
protected static final String TIMEPLUS_PASSWORD = SystemUtil.loadProp("CLICKHOUSE_PASSWORD", "");
protected static final String TIMEPLUS_DB = SystemUtil.loadProp("CLICKHOUSE_DB", "");

protected static final int TIMEPLUS_HTTP_PORT = 8123;
protected static final int TIMEPLUS_HTTPS_PORT = 8123;
protected static final int TIMEPLUS_HTTP_PORT = 3218;
protected static final int TIMEPLUS_HTTPS_PORT = 3218;
protected static final int TIMEPLUS_NATIVE_PORT = 8463;
protected static final int TIMEPLUS_NATIVE_SECURE_PORT = 9440;
protected static final int TIMEPLUS_NATIVE_SECURE_PORT = 8463;

@Container
public static ClickHouseContainer container = new ClickHouseContainer(TIMEPLUS_IMAGE)
public static TimeplusContainer container = new TimeplusContainer(TIMEPLUS_IMAGE)
.withEnv("CLICKHOUSE_USER", TIMEPLUS_USER)
.withEnv("CLICKHOUSE_PASSWORD", TIMEPLUS_PASSWORD)
.withEnv("CLICKHOUSE_DB", TIMEPLUS_DB)
.withExposedPorts(TIMEPLUS_HTTP_PORT,
TIMEPLUS_HTTPS_PORT,
TIMEPLUS_NATIVE_PORT,
TIMEPLUS_NATIVE_SECURE_PORT)
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/config.yaml"),
"/etc/timeplusd-server/config.yaml")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/users.xml"),
"/etc/timeplusd-server/users.xml")
// .withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/config.yaml"),
// "/etc/timeplusd-server/config.yaml")
// .withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/tls_port.xml"),
// "/etc/timeplusd-server/config.d/tls_port.xml")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/users.yaml"),
"/etc/timeplusd-server/users.yaml")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/server.key"),
"/etc/timeplusd-server/server.key")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/server.crt"),
Expand All @@ -83,7 +85,7 @@ protected String getJdbcUrl() {

protected String getJdbcUrl(Object... params) {
StringBuilder settingsStringBuilder = new StringBuilder();
for (int i = 0; i + 1 < params.length; i++) {
for (int i = 0; i + 1 < params.length; i+=2) {
settingsStringBuilder.append(i == 0 ? "?" : "&");
settingsStringBuilder.append(params[i]).append("=").append(params[i + 1]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

package com.timeplus.jdbc;

import com.timeplus.settings.SettingKey;
import com.timeplus.settings.TimeplusConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.*;
Expand All @@ -33,23 +37,26 @@ public class BalancedTimeplusDataSourceITest extends AbstractITest {

@BeforeEach
public void reset() {
singleDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s", TP_HOST, TP_PORT));
dualDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", TP_HOST, TP_PORT, TP_HOST, TP_PORT));
Map<SettingKey, Serializable> settings = new HashMap<>();
settings.put(SettingKey.user, "proton");
settings.put(SettingKey.password, "proton@t+");
singleDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s", TP_HOST, TP_PORT), settings);
dualDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", TP_HOST, TP_PORT, TP_HOST, TP_PORT), settings);
}

@Test
public void testSingleDatabaseConnection() throws Exception {
withNewConnection(singleDs, connection -> {
withStatement(connection, stmt -> stmt.execute("CREATE DATABASE IF NOT EXISTS test"));
withStatement(connection, stmt -> stmt.execute("DROP STREAM IF EXISTS test.insert_test"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string) ENGINE = MergeTree()"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string)"));

withPreparedStatement(connection, "INSERT INTO test.insert_test (s, i) VALUES (?, ?)", pstmt -> {
pstmt.setString(1, "asd");
pstmt.setInt(2, 42);
pstmt.execute();

ResultSet rs = connection.createStatement().executeQuery("SELECT * FROM test.insert_test");
ResultSet rs = connection.createStatement().executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();

assertEquals("asd", rs.getString("s"));
Expand All @@ -66,7 +73,7 @@ public void testDualDatabaseConnection() throws Exception {

withNewConnection(dualDs, connection -> {
withStatement(connection, stmt -> stmt.execute("DROP STREAM IF EXISTS test.insert_test"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string) ENGINE = MergeTree()"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string)"));
});

withNewConnection(dualDs, connection -> {
Expand All @@ -76,7 +83,7 @@ public void testDualDatabaseConnection() throws Exception {
pstmt.execute();
});
withStatement(connection, stmt -> {
ResultSet rs = stmt.executeQuery("SELECT * FROM test.insert_test");
ResultSet rs = stmt.executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();
assertEquals("asd", rs.getString("s"));
assertEquals(42, rs.getInt("i"));
Expand All @@ -91,7 +98,7 @@ public void testDualDatabaseConnection() throws Exception {
});

withStatement(connection, stmt -> {
ResultSet rs = stmt.executeQuery("SELECT * from test.insert_test");
ResultSet rs = stmt.executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();

assertEquals("asd", rs.getString("s"));
Expand Down Expand Up @@ -120,8 +127,11 @@ public void testDisableConnection() {

@Test
public void testWorkWithEnabledUrl() throws Exception {
Map<SettingKey, Serializable> settings = new HashMap<>();
settings.put(SettingKey.user, "proton");
settings.put(SettingKey.password, "proton@t+");
BalancedTimeplusDataSource halfDatasource = new BalancedTimeplusDataSource(
String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", "not.existed.url", TP_PORT, TP_HOST, TP_PORT), new Properties());
String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", "not.existed.url", TP_PORT, TP_HOST, TP_PORT), settings);

halfDatasource.actualize();

Expand All @@ -131,7 +141,7 @@ public void testWorkWithEnabledUrl() throws Exception {

withNewConnection(halfDatasource, connection -> {
withStatement(connection, stmt -> stmt.execute("DROP STREAM IF EXISTS test.insert_test"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string) ENGINE = MergeTree()"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string)"));
});

withNewConnection(halfDatasource, connection -> {
Expand All @@ -142,7 +152,7 @@ public void testWorkWithEnabledUrl() throws Exception {
});

withStatement(connection, stmt -> {
ResultSet rs = stmt.getConnection().createStatement().executeQuery("SELECT * FROM test.insert_test");
ResultSet rs = stmt.getConnection().createStatement().executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();
assertEquals("asd", rs.getString("s"));
assertEquals(42, rs.getInt("i"));
Expand All @@ -157,7 +167,7 @@ public void testWorkWithEnabledUrl() throws Exception {
});

withStatement(connection, stmt -> {
ResultSet rs = stmt.executeQuery("SELECT * from test.insert_test");
ResultSet rs = stmt.executeQuery("SELECT s, i from test.insert_test where _tp_time > earliest_ts()");
rs.next();
assertEquals("asd", rs.getString("s"));
assertEquals(42, rs.getInt("i"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.timeplus.log.LoggerFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.timeplus.TimeplusContainer;
import org.testcontainers.junit.jupiter.Container;

import java.sql.Connection;
Expand All @@ -32,12 +32,12 @@

public class FailoverClickhouseConnectionITest extends AbstractITest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverClickhouseConnectionITest.class);

private static final Integer NATIVE_PORT = 8463;
protected static String HA_HOST;
protected static int HA_PORT;

@Container
public static ClickHouseContainer containerHA = (ClickHouseContainer) new ClickHouseContainer(TIMEPLUS_IMAGE)
public static TimeplusContainer containerHA = new TimeplusContainer(AbstractITest.TIMEPLUS_IMAGE)
.withEnv("CLICKHOUSE_USER", TIMEPLUS_USER)
.withEnv("CLICKHOUSE_PASSWORD", TIMEPLUS_PASSWORD)
.withEnv("CLICKHOUSE_DB", TIMEPLUS_DB);
Expand All @@ -49,9 +49,9 @@ public void reset() throws SQLException {
container.start();
containerHA.start();

TP_PORT = container.getMappedPort(ClickHouseContainer.NATIVE_PORT);
TP_PORT = container.getMappedPort(NATIVE_PORT);
HA_HOST = containerHA.getHost();
HA_PORT = containerHA.getMappedPort(ClickHouseContainer.NATIVE_PORT);
HA_PORT = containerHA.getMappedPort(NATIVE_PORT);
LOG.info("Port1 {}, Port2 {}", TP_PORT, HA_PORT);
}

Expand All @@ -67,7 +67,7 @@ public void testClickhouseDownBeforeConnect() throws Exception {
ResultSet rs = stmt.executeQuery("select count() from system.tables");

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand All @@ -85,7 +85,7 @@ public void testClickhouseDownBeforeStatement() throws Exception {
ResultSet rs = stmt.executeQuery("select count() from system.tables");

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand All @@ -103,7 +103,7 @@ public void testClickhouseDownBeforePrepareStatement() throws Exception {
ResultSet rs = stmt.executeQuery();

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand All @@ -121,7 +121,7 @@ public void testClickhouseDownBeforeExecute() throws Exception {
ResultSet rs = stmt.executeQuery("select count() from system.tables");

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void successfullyUnsignedDataType() throws Exception {
statement.executeQuery("DROP STREAM IF EXISTS test");
statement.executeQuery("CREATE STREAM test(i8 uint8, i16 uint16, i32 uint32, i64 uint64)ENGINE=Memory");

String insertSQL = "INSERT INTO test(i8, i16, i32) VALUES(" + ((1 << 8) - 1) +
String insertSQL = "INSERT INTO test(i8, i16, i32, i64) VALUES(" + ((1 << 8) - 1) +
"," + ((1 << 16) - 1) +
",4294967295,-9223372036854775808)";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

package com.timeplus.jdbc;

import com.timeplus.jdbc.tool.LocalKeyStoreConfig;
import com.timeplus.settings.KeyStoreConfig;
// import com.timeplus.jdbc.tool.LocalKeyStoreConfig;
// import com.timeplus.settings.KeyStoreConfig;
import org.junit.jupiter.api.Test;

import java.sql.ResultSet;
Expand All @@ -36,29 +36,29 @@ void ping() throws Exception {

@Test
void pingWithSecureConnection() throws Exception {
withNewConnection(connection -> {
withStatement(connection, stmt -> {
ResultSet resultSet = stmt.executeQuery("SELECT 1");
assertTrue(resultSet.next());
});
}, "ssl", "true",
"ssl_mode", "disabled");
// withNewConnection(connection -> {
// withStatement(connection, stmt -> {
// ResultSet resultSet = stmt.executeQuery("SELECT 1");
// assertTrue(resultSet.next());
// });
// }, "ssl", "true",
// "ssl_mode", "disabled");
}

@Test
void pingWithSecureConnectionAndVerification() throws Exception {
KeyStoreConfig keyStoreConfig = new LocalKeyStoreConfig();
//KeyStoreConfig keyStoreConfig = new LocalKeyStoreConfig();

withNewConnection(connection -> {
withStatement(connection, stmt -> {
ResultSet resultSet = stmt.executeQuery("SELECT 1");
assertTrue(resultSet.next());
});
}, "ssl", "true",
"ssl_mode", "verify_ca",
"key_store_type", keyStoreConfig.getKeyStoreType(),
"key_store_path", keyStoreConfig.getKeyStorePath(),
"key_store_password", keyStoreConfig.getKeyStorePassword());
// withNewConnection(connection -> {
// withStatement(connection, stmt -> {
// ResultSet resultSet = stmt.executeQuery("SELECT 1");
// assertTrue(resultSet.next());
// });
// }, "ssl", "true",
// "ssl_mode", "verify_ca",
// "key_store_type", keyStoreConfig.getKeyStoreType(),
// "key_store_path", keyStoreConfig.getKeyStorePath(),
// "key_store_password", keyStoreConfig.getKeyStorePassword());
}

@Test
Expand Down
Loading

0 comments on commit 9f7401c

Please sign in to comment.