feat(cdc): allow decimal to rw_int256 and varchar #16346

Merged 9 commits on Apr 19, 2024

14 changes: 14 additions & 0 deletions Cargo.lock


23 changes: 23 additions & 0 deletions e2e_test/source/cdc/cdc.check.slt
@@ -46,3 +46,26 @@ query I
select count(*) from tt3_rw;
----
2

query II
select * from numeric_to_varchar order by id;
----
1 3.14
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 57896044618658097711785492504343953926634992332820282019728792003956564819968
4 115792089237316195423570985008687907853269984665640564039457584007913129639936
5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
6 NAN
7 POSITIVE_INFINITY

# The invalid data for rw_int256 is converted to NULL
query II
select * from numeric_to_rw_int256 order by id;
----
1 NULL
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 NULL
4 NULL
5 NULL
6 NULL
7 NULL
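
The NULL rows above are consistent with rw_int256 being a signed 256-bit integer, whose range is [-2^255, 2^255 - 1]: a value with a fractional part, a value outside that range, or a special value (NaN, Infinity) has no valid representation. The following is a minimal sketch of such a check, not RisingWave's actual implementation; the class `Int256Sketch` and the method `toInt256OrNull` are hypothetical names, and accepting a numeric with a zero fractional part (for example `6.0`) is an assumption of this sketch.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper for illustration only; not part of this PR.
class Int256Sketch {
    // Bounds of a signed 256-bit integer: [-2^255, 2^255 - 1].
    static final BigInteger MIN = BigInteger.ONE.shiftLeft(255).negate();
    static final BigInteger MAX = BigInteger.ONE.shiftLeft(255).subtract(BigInteger.ONE);

    // Returns the integer value if the text fits a signed 256-bit integer, else null.
    static BigInteger toInt256OrNull(String pgNumericText) {
        BigDecimal decimal;
        try {
            decimal = new BigDecimal(pgNumericText);
        } catch (NumberFormatException e) {
            return null; // BigDecimal cannot parse NaN or Infinity
        }
        BigInteger integer;
        try {
            integer = decimal.toBigIntegerExact();
        } catch (ArithmeticException e) {
            return null; // non-zero fractional part, e.g. 3.14
        }
        boolean inRange = integer.compareTo(MIN) >= 0 && integer.compareTo(MAX) <= 0;
        return inRange ? integer : null;
    }

    public static void main(String[] args) {
        String[] samples = {"3.14", MAX.toString(), MAX.add(BigInteger.ONE).toString(), "NaN", "Infinity"};
        for (String s : samples) {
            System.out.println(s + " -> " + toInt256OrNull(s));
        }
    }
}
```

Under these assumptions only row 2 (2^255 - 1) survives the conversion, which matches the expected output of the test.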
37 changes: 36 additions & 1 deletion e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -77,4 +77,39 @@ SELECT * FROM rw.products_test order by id limit 3
query TTTT
select order_date,customer_name,product_id,order_status from orders_no_backfill order by order_id;
----
2022-12-01 15:08:22 Sam 110 0

query II
select * from numeric_to_varchar order by id;
----
1 3.14
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 57896044618658097711785492504343953926634992332820282019728792003956564819968
4 115792089237316195423570985008687907853269984665640564039457584007913129639936
5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
6 NAN
7 POSITIVE_INFINITY
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 57896044618658097711785492504343953926634992332820282019728792003956564819968
104 115792089237316195423570985008687907853269984665640564039457584007913129639936
105 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
106 NAN
107 POSITIVE_INFINITY

Review comments on lines +96 to +97:

Contributor: Aren't these supposed to be NaN and Infinity?

Contributor Author: Yes. I'm trying to resolve this. If it turns out to be non-trivial, I'll open another PR to fix it. I also found that in our shared source CDC, the conversion from pg-numeric to rw-numeric loses Infinity and -Infinity as well. I'll fix that soon.

Contributor Author: 🫠 This is too complicated to fix here, because it requires knowing the column types of the upstream tables when parsing the data in the JSON payload. Otherwise, we cannot distinguish whether a string was converted from a numeric or is a real varchar.

So in this PR, when mapping pg-numeric to varchar, NaN becomes NAN, inf becomes POSITIVE_INFINITY, and -inf becomes NEGATIVE_INFINITY. At least this behavior is consistent between backfilling and normal data updates. Do you think that's acceptable?

For the bug in the conversion from pg-numeric to rw-numeric, I'll create a separate PR later.

Contributor Author: I've created an issue to track this.
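
The mapping described in the review thread (NaN to NAN, inf to POSITIVE_INFINITY, -inf to NEGATIVE_INFINITY, everything else kept as text) can be sketched as follows. This is an illustration only, not the code in this PR; `NumericToVarcharSketch` and `toVarchar` are hypothetical names, and matching the special values case-insensitively by their text form is an assumption of this sketch.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not part of this PR.
class NumericToVarcharSketch {
    // Maps the text form of a Postgres numeric to the varchar value
    // described in the review thread. Finite values pass through unchanged.
    static String toVarchar(String pgNumericText) {
        String normalized = pgNumericText.trim().toLowerCase(Locale.ROOT);
        switch (normalized) {
            case "nan":
                return "NAN";
            case "infinity":
                return "POSITIVE_INFINITY";
            case "-infinity":
                return "NEGATIVE_INFINITY";
            default:
                return pgNumericText; // finite value: keep the original digits
        }
    }

    public static void main(String[] args) {
        for (String s : new String[] {"3.14", "NaN", "Infinity", "-Infinity"}) {
            System.out.println(s + " -> " + toVarchar(s));
        }
    }
}
```

As the thread notes, the cost of this scheme is that a varchar column can no longer tell a numeric NaN apart from a literal string with the same spelling.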

# The invalid data for rw_int256 is converted to NULL
query II
select * from numeric_to_rw_int256 order by id;
----
1 NULL
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 NULL
4 NULL
5 NULL
6 NULL
7 NULL
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 NULL
104 NULL
105 NULL
106 NULL
107 NULL
33 changes: 33 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
@@ -193,3 +193,36 @@ create table person_rw (
publication.name='dumb_publicaton',
publication.create.enable='false'
);

statement ok
create table numeric_to_rw_int256 (
id int,
num rw_int256,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
table.name = 'numeric_table',
slot.name = 'numeric_to_rw_int256'
);

statement ok
create table numeric_to_varchar (
id int,
num varchar,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'numeric_table',
slot.name = 'numeric_to_varchar'
);
95 changes: 95 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
@@ -268,3 +268,98 @@ SELECT * from person_new order by id;
1100 noris [email protected] 1864 2539 enne
1101 white [email protected] 8157 6974 se
1102 spencer [email protected] 9481 6270 angeles

statement ok
CREATE TABLE numeric_to_rw_int256_shared (
id int,
num rw_int256,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_table';

statement ok
CREATE TABLE numeric_to_varchar_shared (
id int,
num varchar,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_table';

statement ok
CREATE TABLE numeric_list_to_rw_int256_list_shared (
id int,
num rw_int256[],
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_list';

statement ok
CREATE TABLE numeric_list_to_varchar_list_shared (
id int,
num varchar[],
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.numeric_list';


system ok
psql -c "
insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
--- 2^256
insert into numeric_table values(104, 115792089237316195423570985008687907853269984665640564039457584007913129639936);
insert into numeric_table values(105, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555);
insert into numeric_table values(106, 'NaN'::numeric);
insert into numeric_table values(107, 'Infinity'::numeric);
"

sleep 3s

query II
select * from numeric_to_varchar_shared order by id;
----
1 3.14
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 57896044618658097711785492504343953926634992332820282019728792003956564819968
4 115792089237316195423570985008687907853269984665640564039457584007913129639936
5 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
6 NAN
7 POSITIVE_INFINITY
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 57896044618658097711785492504343953926634992332820282019728792003956564819968
104 115792089237316195423570985008687907853269984665640564039457584007913129639936
105 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555
106 NAN
107 POSITIVE_INFINITY

# The invalid data for rw_int256 is converted to NULL
query II
select * from numeric_to_rw_int256_shared order by id;
----
1 NULL
2 57896044618658097711785492504343953926634992332820282019728792003956564819967
3 NULL
4 NULL
5 NULL
6 NULL
7 NULL
102 57896044618658097711785492504343953926634992332820282019728792003956564819967
103 NULL
104 NULL
105 NULL
106 NULL
107 NULL

system ok
psql -c "
DELETE FROM numeric_table WHERE id IN (102, 103, 104, 105, 106, 107);
"

query II
select * from numeric_list_to_varchar_list_shared order by id;
----
1 {3.14,6,57896044618658097711785492504343953926634992332820282019728792003956564819967,57896044618658097711785492504343953926634992332820282019728792003956564819968,115792089237316195423570985008687907853269984665640564039457584007913129639936.555555}
2 {NAN,POSITIVE_INFINITY,NEGATIVE_INFINITY}

query II
select * from numeric_list_to_rw_int256_list_shared order by id;
----
1 {NULL,6,57896044618658097711785492504343953926634992332820282019728792003956564819967,NULL,NULL}
2 {NULL,NULL,NULL}
34 changes: 34 additions & 0 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
@@ -167,3 +167,37 @@ create table shipments (
table.name = 'shipments',
slot.name = 'shipments'
) format canal encode csv;

statement ok
explain create table numeric_to_rw_int256 (
id int,
num rw_int256,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = 'cdc_test',
schema.name = 'public',
table.name = 'numeric_table',
slot.name = 'numeric_to_rw_int256'
);

statement ok
explain create table numeric_to_varchar (
id int,
num varchar,
PRIMARY KEY (id)
) with (
connector = 'postgres-cdc',
hostname = 'db',
port = '5432',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = 'cdc_test',
schema.name = 'public',
table.name = 'numeric_table',
slot.name = 'numeric_to_varchar'
);
16 changes: 16 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
@@ -72,3 +72,19 @@ CREATE TABLE IF NOT EXISTS postgres_all_types(
);
INSERT INTO postgres_all_types VALUES ( False, 0, 0, 0, 0, 0, 0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[]::boolean[], array[]::smallint[], array[]::integer[], array[]::bigint[], array[]::decimal[], array[]::real[], array[]::double precision[], array[]::varchar[], array[]::bytea[], array[]::date[], array[]::time[], array[]::timestamp[], array[]::timestamptz[], array[]::interval[], array[]::jsonb[], null);
INSERT INTO postgres_all_types VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, '', '\x00', '0001-01-01', '00:00:00', '0001-01-01 00:00:00'::timestamp, '0001-01-01 00:00:00'::timestamptz, interval '0 second', '{}', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['\x00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['0001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['0001-01-01 00:00:00'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], 'bb488f9b-330d-4012-b849-12adeb49e57e');

create table numeric_table(id int PRIMARY KEY, num numeric);
insert into numeric_table values(1, 3.14);
--- 2^255 - 1
insert into numeric_table values(2, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(3, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
--- 2^256
insert into numeric_table values(4, 115792089237316195423570985008687907853269984665640564039457584007913129639936);
insert into numeric_table values(5, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555);
insert into numeric_table values(6, 'NaN'::numeric);
insert into numeric_table values(7, 'Infinity'::numeric);

create table numeric_list(id int primary key, num numeric[]);
insert into numeric_list values(1, '{3.14, 6, 57896044618658097711785492504343953926634992332820282019728792003956564819967, 57896044618658097711785492504343953926634992332820282019728792003956564819968, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555}');
insert into numeric_list values(2, '{nan, infinity, -infinity}');
9 changes: 9 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
@@ -11,3 +11,12 @@ insert into abs.t1 values (2, 2.2, 'bbb', '1234.5431');
SELECT pg_current_wal_lsn();
select * from pg_publication_tables where pubname='rw_publication';
select * from public.person order by id;

insert into numeric_table values(102, 57896044618658097711785492504343953926634992332820282019728792003956564819967);
--- 2^255
insert into numeric_table values(103, 57896044618658097711785492504343953926634992332820282019728792003956564819968);
--- 2^256
insert into numeric_table values(104, 115792089237316195423570985008687907853269984665640564039457584007913129639936);
insert into numeric_table values(105, 115792089237316195423570985008687907853269984665640564039457584007913129639936.555555);
insert into numeric_table values(106, 'NaN'::numeric);
insert into numeric_table values(107, 'Infinity'::numeric);
@@ -540,7 +540,11 @@ private boolean isDataTypeCompatible(String pgDataType, Data.DataType.TypeName t
return val == Data.DataType.TypeName.DOUBLE_VALUE;
case "decimal":
case "numeric":
- return val == Data.DataType.TypeName.DECIMAL_VALUE;
+ return val == Data.DataType.TypeName.DECIMAL_VALUE
+ // We allow user to map numeric into rw_int256 or varchar to avoid precision
+ // loss in the conversion from pg-numeric to rw-numeric
+ || val == Data.DataType.TypeName.INT256_VALUE
+ || val == Data.DataType.TypeName.VARCHAR_VALUE;
case "varchar":
case "character varying":
return val == Data.DataType.TypeName.VARCHAR_VALUE;
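
For readers without the full validator at hand, the effect of this hunk can be shown in a self-contained sketch. It is not the real validator: the enum `RwType` stands in for the `Data.DataType.TypeName` constants, `NumericCompatSketch` and `isCompatible` are hypothetical names, only the two cases visible in the hunk are modeled, and returning `false` for every other Postgres type is an assumption made to keep the example short.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the validator; for illustration only.
class NumericCompatSketch {
    // Stand-in for Data.DataType.TypeName (assumption of this sketch).
    enum RwType { DECIMAL, INT256, VARCHAR, DOUBLE }

    // After this PR, a Postgres numeric may map to decimal, rw_int256, or varchar.
    static final Set<RwType> NUMERIC_TARGETS =
            EnumSet.of(RwType.DECIMAL, RwType.INT256, RwType.VARCHAR);

    static boolean isCompatible(String pgDataType, RwType rwType) {
        switch (pgDataType) {
            case "decimal":
            case "numeric":
                return NUMERIC_TARGETS.contains(rwType);
            case "varchar":
            case "character varying":
                return rwType == RwType.VARCHAR;
            default:
                return false; // other types are omitted from this sketch
        }
    }

    public static void main(String[] args) {
        for (RwType t : RwType.values()) {
            System.out.println("numeric -> " + t + ": " + isCompatible("numeric", t));
        }
    }
}
```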
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -80,6 +80,7 @@ opendal = "0.45"
openssl = "0.10"
parking_lot = { workspace = true }
paste = "1"
pg_bigdecimal = { git = "https://github.com/risingwavelabs/rust-pg_bigdecimal", rev = "0b7893d88894ca082b4525f94f812da034486f7c" }
postgres-openssl = "0.5.0"
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.12", features = ["no-recursion-limit"] }