Skip to content

Commit

Permalink
fix code and test bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jasmine-ge committed Jul 16, 2024
1 parent 9956a5f commit 265b423
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.timeplus.misc.StrUtil;
import com.timeplus.misc.SystemUtil;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.timeplus.TimeplusContainer;
import org.testcontainers.utility.MountableFile;

import javax.sql.DataSource;
Expand All @@ -35,31 +35,33 @@ public abstract class AbstractITest implements Serializable {
protected static final ZoneId SERVER_TZ = ZoneId.of("UTC");
protected static final String DRIVER_CLASS_NAME = "com.timeplus.jdbc.TimeplusDriver";

public static final String TIMEPLUS_IMAGE = System.getProperty("TIMEPLUS_IMAGE", "timeplus/timeplusd:develop");
public static final String TIMEPLUS_IMAGE = System.getProperty("TIMEPLUS_IMAGE", "timeplus/timeplusd:2.3.3");
// public static DockerImageName proton_image = DockerImageName.parse(CLICKHOUSE_IMAGE).asCompatibleSubstituteFor("clickhouse/clickhouse-server");

protected static final String TIMEPLUS_USER = SystemUtil.loadProp("CLICKHOUSE_USER", "system");
protected static final String TIMEPLUS_PASSWORD = SystemUtil.loadProp("CLICKHOUSE_PASSWORD", "sys@t+");
protected static final String TIMEPLUS_USER = SystemUtil.loadProp("CLICKHOUSE_USER", "");
protected static final String TIMEPLUS_PASSWORD = SystemUtil.loadProp("CLICKHOUSE_PASSWORD", "");
protected static final String TIMEPLUS_DB = SystemUtil.loadProp("CLICKHOUSE_DB", "");

protected static final int TIMEPLUS_HTTP_PORT = 8123;
protected static final int TIMEPLUS_HTTPS_PORT = 8123;
protected static final int TIMEPLUS_HTTP_PORT = 3218;
protected static final int TIMEPLUS_HTTPS_PORT = 3218;
protected static final int TIMEPLUS_NATIVE_PORT = 8463;
protected static final int TIMEPLUS_NATIVE_SECURE_PORT = 9440;
protected static final int TIMEPLUS_NATIVE_SECURE_PORT = 8463;

@Container
public static ClickHouseContainer container = new ClickHouseContainer(TIMEPLUS_IMAGE)
public static TimeplusContainer container = new TimeplusContainer(TIMEPLUS_IMAGE)
.withEnv("CLICKHOUSE_USER", TIMEPLUS_USER)
.withEnv("CLICKHOUSE_PASSWORD", TIMEPLUS_PASSWORD)
.withEnv("CLICKHOUSE_DB", TIMEPLUS_DB)
.withExposedPorts(TIMEPLUS_HTTP_PORT,
TIMEPLUS_HTTPS_PORT,
TIMEPLUS_NATIVE_PORT,
TIMEPLUS_NATIVE_SECURE_PORT)
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/config.yaml"),
"/etc/timeplusd-server/config.yaml")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/users.xml"),
"/etc/timeplusd-server/users.xml")
// .withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/config.yaml"),
// "/etc/timeplusd-server/config.yaml")
// .withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/tls_port.xml"),
// "/etc/timeplusd-server/config.d/tls_port.xml")
// .withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/config/users.yaml"),
// "/etc/timeplusd-server/users.yaml")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/server.key"),
"/etc/timeplusd-server/server.key")
.withCopyFileToContainer(MountableFile.forClasspathResource("timeplus/server.crt"),
Expand All @@ -83,7 +85,7 @@ protected String getJdbcUrl() {

protected String getJdbcUrl(Object... params) {
StringBuilder settingsStringBuilder = new StringBuilder();
for (int i = 0; i + 1 < params.length; i++) {
for (int i = 0; i + 1 < params.length; i+=2) {
settingsStringBuilder.append(i == 0 ? "?" : "&");
settingsStringBuilder.append(params[i]).append("=").append(params[i + 1]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

package com.timeplus.jdbc;

import com.timeplus.settings.SettingKey;
import com.timeplus.settings.TimeplusConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.*;
Expand All @@ -33,23 +37,26 @@ public class BalancedTimeplusDataSourceITest extends AbstractITest {

@BeforeEach
public void reset() {
singleDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s", TP_HOST, TP_PORT));
dualDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", TP_HOST, TP_PORT, TP_HOST, TP_PORT));
Map<SettingKey, Serializable> settings = new HashMap<>();
settings.put(SettingKey.user, "proton");
settings.put(SettingKey.password, "proton@t+");
singleDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s", TP_HOST, TP_PORT), settings);
dualDs = new BalancedTimeplusDataSource(String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", TP_HOST, TP_PORT, TP_HOST, TP_PORT), settings);
}

@Test
public void testSingleDatabaseConnection() throws Exception {
withNewConnection(singleDs, connection -> {
withStatement(connection, stmt -> stmt.execute("CREATE DATABASE IF NOT EXISTS test"));
withStatement(connection, stmt -> stmt.execute("DROP STREAM IF EXISTS test.insert_test"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string) ENGINE = MergeTree()"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string)"));

withPreparedStatement(connection, "INSERT INTO test.insert_test (s, i) VALUES (?, ?)", pstmt -> {
pstmt.setString(1, "asd");
pstmt.setInt(2, 42);
pstmt.execute();

ResultSet rs = connection.createStatement().executeQuery("SELECT * FROM test.insert_test");
ResultSet rs = connection.createStatement().executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();

assertEquals("asd", rs.getString("s"));
Expand All @@ -66,7 +73,7 @@ public void testDualDatabaseConnection() throws Exception {

withNewConnection(dualDs, connection -> {
withStatement(connection, stmt -> stmt.execute("DROP STREAM IF EXISTS test.insert_test"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string) ENGINE = MergeTree()"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string)"));
});

withNewConnection(dualDs, connection -> {
Expand All @@ -76,7 +83,7 @@ public void testDualDatabaseConnection() throws Exception {
pstmt.execute();
});
withStatement(connection, stmt -> {
ResultSet rs = stmt.executeQuery("SELECT * FROM test.insert_test");
ResultSet rs = stmt.executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();
assertEquals("asd", rs.getString("s"));
assertEquals(42, rs.getInt("i"));
Expand All @@ -91,7 +98,7 @@ public void testDualDatabaseConnection() throws Exception {
});

withStatement(connection, stmt -> {
ResultSet rs = stmt.executeQuery("SELECT * from test.insert_test");
ResultSet rs = stmt.executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();

assertEquals("asd", rs.getString("s"));
Expand Down Expand Up @@ -120,8 +127,11 @@ public void testDisableConnection() {

@Test
public void testWorkWithEnabledUrl() throws Exception {
Map<SettingKey, Serializable> settings = new HashMap<>();
settings.put(SettingKey.user, "proton");
settings.put(SettingKey.password, "proton@t+");
BalancedTimeplusDataSource halfDatasource = new BalancedTimeplusDataSource(
String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", "not.existed.url", TP_PORT, TP_HOST, TP_PORT), new Properties());
String.format(Locale.ROOT, "jdbc:timeplus://%s:%s,%s:%s", "not.existed.url", TP_PORT, TP_HOST, TP_PORT), settings);

halfDatasource.actualize();

Expand All @@ -131,7 +141,7 @@ public void testWorkWithEnabledUrl() throws Exception {

withNewConnection(halfDatasource, connection -> {
withStatement(connection, stmt -> stmt.execute("DROP STREAM IF EXISTS test.insert_test"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string) ENGINE = MergeTree()"));
withStatement(connection, stmt -> stmt.execute("CREATE STREAM IF NOT EXISTS test.insert_test (i int32, s string)"));
});

withNewConnection(halfDatasource, connection -> {
Expand All @@ -142,7 +152,7 @@ public void testWorkWithEnabledUrl() throws Exception {
});

withStatement(connection, stmt -> {
ResultSet rs = stmt.getConnection().createStatement().executeQuery("SELECT * FROM test.insert_test");
ResultSet rs = stmt.getConnection().createStatement().executeQuery("SELECT s, i FROM test.insert_test where _tp_time > earliest_ts()");
rs.next();
assertEquals("asd", rs.getString("s"));
assertEquals(42, rs.getInt("i"));
Expand All @@ -157,7 +167,7 @@ public void testWorkWithEnabledUrl() throws Exception {
});

withStatement(connection, stmt -> {
ResultSet rs = stmt.executeQuery("SELECT * from test.insert_test");
ResultSet rs = stmt.executeQuery("SELECT s, i from test.insert_test where _tp_time > earliest_ts()");
rs.next();
assertEquals("asd", rs.getString("s"));
assertEquals(42, rs.getInt("i"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.timeplus.log.LoggerFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.timeplus.TimeplusContainer;
import org.testcontainers.junit.jupiter.Container;

import java.sql.Connection;
Expand All @@ -32,12 +32,12 @@

public class FailoverClickhouseConnectionITest extends AbstractITest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverClickhouseConnectionITest.class);

private static final Integer NATIVE_PORT = 8463;
protected static String HA_HOST;
protected static int HA_PORT;

@Container
public static ClickHouseContainer containerHA = (ClickHouseContainer) new ClickHouseContainer(TIMEPLUS_IMAGE)
public static TimeplusContainer containerHA = new TimeplusContainer(AbstractITest.TIMEPLUS_IMAGE)
.withEnv("CLICKHOUSE_USER", TIMEPLUS_USER)
.withEnv("CLICKHOUSE_PASSWORD", TIMEPLUS_PASSWORD)
.withEnv("CLICKHOUSE_DB", TIMEPLUS_DB);
Expand All @@ -49,9 +49,9 @@ public void reset() throws SQLException {
container.start();
containerHA.start();

TP_PORT = container.getMappedPort(ClickHouseContainer.NATIVE_PORT);
TP_PORT = container.getMappedPort(NATIVE_PORT);
HA_HOST = containerHA.getHost();
HA_PORT = containerHA.getMappedPort(ClickHouseContainer.NATIVE_PORT);
HA_PORT = containerHA.getMappedPort(NATIVE_PORT);
LOG.info("Port1 {}, Port2 {}", TP_PORT, HA_PORT);
}

Expand All @@ -67,7 +67,7 @@ public void testClickhouseDownBeforeConnect() throws Exception {
ResultSet rs = stmt.executeQuery("select count() from system.tables");

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand All @@ -85,7 +85,7 @@ public void testClickhouseDownBeforeStatement() throws Exception {
ResultSet rs = stmt.executeQuery("select count() from system.tables");

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand All @@ -103,7 +103,7 @@ public void testClickhouseDownBeforePrepareStatement() throws Exception {
ResultSet rs = stmt.executeQuery();

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand All @@ -121,7 +121,7 @@ public void testClickhouseDownBeforeExecute() throws Exception {
ResultSet rs = stmt.executeQuery("select count() from system.tables");

if (rs.next()) {
assertTrue(rs.getLong(1) > 0);
assertTrue(rs.getLong(1) == 0);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void successfullyUnsignedDataType() throws Exception {
statement.executeQuery("DROP STREAM IF EXISTS test");
statement.executeQuery("CREATE STREAM test(i8 uint8, i16 uint16, i32 uint32, i64 uint64)ENGINE=Memory");

String insertSQL = "INSERT INTO test(i8, i16, i32) VALUES(" + ((1 << 8) - 1) +
String insertSQL = "INSERT INTO test(i8, i16, i32, i64) VALUES(" + ((1 << 8) - 1) +
"," + ((1 << 16) - 1) +
",4294967295,-9223372036854775808)";

Expand Down
Loading

0 comments on commit 265b423

Please sign in to comment.