Skip to content

Commit

Permalink
fix(jdbc-sink): fix deliver DELETE event for row with uuid as primary…
Browse files Browse the repository at this point in the history
… key (#16447)
  • Loading branch information
StrikeW authored Apr 25, 2024
1 parent c97c972 commit dfd5449
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 21 deletions.
6 changes: 6 additions & 0 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@ query IT
select * from t1_uuid;
----
221 74605c5a-a7bb-4b3b-8742-2a12e9709dea hello world


query T
select * from sk_t1_uuid
----
21189447-8736-44bd-b254-26b5dec91da9
20 changes: 20 additions & 0 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,26 @@ CREATE SINK s1_uuid FROM t1_uuid WITH (
statement ok
INSERT INTO t1_uuid values (221, '74605c5a-a7bb-4b3b-8742-2a12e9709dea', 'hello world');


statement ok
CREATE TABLE t1_test_uuid_delete (id varchar, primary key(id));

statement ok
INSERT INTO t1_test_uuid_delete VALUES ('fb48ecc1-917f-4f4b-ab6d-d8e37809caf8'), ('21189447-8736-44bd-b254-26b5dec91da9');

statement ok
CREATE SINK sk_t1_uuid FROM t1_test_uuid_delete WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='sk_t1_uuid',
primary_key='id',
type='upsert'
);

statement ok
DELETE FROM t1_test_uuid_delete WHERE ID='fb48ecc1-917f-4f4b-ab6d-d8e37809caf8';


statement ok
INSERT INTO tt2 VALUES
(1),
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,5 @@ CREATE TABLE biz.t_types (
CREATE TABLE biz.t2 (
"aBc" INTEGER PRIMARY KEY
);

CREATE TABLE sk_t1_uuid (id uuid, primary key(id));
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.grpc.Status;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,20 +59,27 @@ public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
// column name -> java.sql.Types
Map<String, Integer> columnTypeMapping =
getColumnTypeMapping(conn, config.getTableName(), config.getSchemaName());
// create an array that each slot corresponding to each column in TableSchema
var columnSqlTypes = new int[tableSchema.getNumColumns()];
for (int columnIdx = 0; columnIdx < tableSchema.getNumColumns(); columnIdx++) {
var columnName = tableSchema.getColumnNames()[columnIdx];
columnSqlTypes[columnIdx] = columnTypeMapping.get(columnName);
}

// A vector of upstream column types
List<Integer> columnSqlTypes =
Arrays.stream(tableSchema.getColumnNames())
.map(columnTypeMapping::get)
.collect(Collectors.toList());

List<Integer> pkIndices =
tableSchema.getPrimaryKeys().stream()
.map(tableSchema::getColumnIndex)
.collect(Collectors.toList());

LOG.info(
"schema = {}, table = {}: columnSqlTypes = {}",
"schema = {}, table = {}, columnSqlTypes = {}, pkIndices = {}",
config.getSchemaName(),
config.getTableName(),
Arrays.toString(columnSqlTypes));
columnSqlTypes,
pkIndices);

if (factory.isPresent()) {
this.jdbcDialect = factory.get().create(columnSqlTypes);
this.jdbcDialect = factory.get().create(columnSqlTypes, pkIndices);
} else {
throw Status.INVALID_ARGUMENT
.withDescription("Unsupported jdbc url: " + jdbcUrl)
Expand Down Expand Up @@ -303,11 +311,7 @@ public void prepareDelete(SinkRow row) {
.asRuntimeException();
}
try {
int placeholderIdx = 1;
for (String primaryKey : pkColumnNames) {
Object fromRow = tableSchema.getFromRow(primaryKey, row);
deleteStatement.setObject(placeholderIdx++, fromRow);
}
jdbcDialect.bindDeleteStatement(deleteStatement, row);
deleteStatement.addBatch();
} catch (SQLException e) {
throw Status.INTERNAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,7 @@ void bindUpsertStatement(
void bindInsertIntoStatement(
PreparedStatement stmt, Connection conn, TableSchema tableSchema, SinkRow row)
throws SQLException;

/** Bind the values of primary key fields to the {@code DELETE} statement. */
void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package com.risingwave.connector.jdbc;

import java.util.List;

public interface JdbcDialectFactory {

JdbcDialect create(int[] columnSqlTypes);
JdbcDialect create(List<Integer> columnSqlTypes, List<Integer> pkIndices);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@

public class MySqlDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public MySqlDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}

@Override
public SchemaTableName createSchemaTableName(String schemaName, String tableName) {
return new SchemaTableName(schemaName, tableName);
Expand Down Expand Up @@ -100,4 +116,14 @@ public void bindInsertIntoStatement(
}
}
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package com.risingwave.connector.jdbc;

import java.util.List;

public class MySqlDialectFactory implements JdbcDialectFactory {

@Override
public JdbcDialect create(int[] columnSqlTypes) {
return new MySqlDialect();
public JdbcDialect create(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
return new MySqlDialect(columnSqlTypes, pkIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,19 @@
public class PostgresDialect implements JdbcDialect {

private final int[] columnSqlTypes;
private final int[] pkIndices;
private final int[] pkColumnSqlTypes;

public PostgresDialect(int[] columnSqlTypes) {
this.columnSqlTypes = columnSqlTypes;
public PostgresDialect(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
this.columnSqlTypes = columnSqlTypes.stream().mapToInt(i -> i).toArray();
this.pkIndices = pkIndices.stream().mapToInt(i -> i).toArray();

// derive sql types for pk columns
var pkColumnSqlTypes = new int[pkIndices.size()];
for (int i = 0; i < pkIndices.size(); i++) {
pkColumnSqlTypes[i] = this.columnSqlTypes[this.pkIndices[i]];
}
this.pkColumnSqlTypes = pkColumnSqlTypes;
}

private static final HashMap<TypeName, String> RW_TYPE_TO_JDBC_TYPE_NAME;
Expand Down Expand Up @@ -154,4 +164,14 @@ public void bindInsertIntoStatement(
}
}
}

@Override
public void bindDeleteStatement(PreparedStatement stmt, SinkRow row) throws SQLException {
// set the values of primary key fields
int placeholderIdx = 1;
for (int idx : pkIndices) {
Object pkField = row.get(idx);
stmt.setObject(placeholderIdx++, pkField, pkColumnSqlTypes[idx]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

package com.risingwave.connector.jdbc;

import java.util.List;

public class PostgresDialectFactory implements JdbcDialectFactory {

@Override
public JdbcDialect create(int[] columnSqlTypes) {
return new PostgresDialect(columnSqlTypes);
public JdbcDialect create(List<Integer> columnSqlTypes, List<Integer> pkIndices) {
return new PostgresDialect(columnSqlTypes, pkIndices);
}
}

0 comments on commit dfd5449

Please sign in to comment.