Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refine replace into pruning for table with cluster keys #12147

Merged
merged 12 commits into from
Jul 24, 2023

Conversation

dantengsky
Copy link
Member

@dantengsky dantengsky commented Jul 19, 2023

I hereby agree to the terms of the CLA available at: https://databend.rs/dev/policies/cla/

Summary

  • enable table level pruning

    rows of input data blocks that definitely not conflicts with table data, will be filtered out from being checked for conflicts.

  • partition input data by the leftmost cluster key

    if the table defines cluster keys, the input data (after table-level pruning) will be partitioned by the left-most cluster key expression, in the later phase, tighter bounds will be used in the range prunings. hopefully, fewer data blocks will need to be loaded.

  • new setting enable_replace_into_partitioning

    set it to 0 will disable the partitioning of input data, in the execution of the replace-into statement, for the table has cluster keys.

  • new metrics

    replace_into_original_row_number
    replace_into_row_number_after_table_level_pruning
    replace_into_partition_number

this PR has also been tested using https://github.com/dantengsky/rr (ec2 + gp2)

demo scenario

  • tables
create table t (
id bigint,
ts datetime,
c string
) CLUSTER BY(to_yyyymmdd(ts), id) row_per_block = 100;


create table s like t engine=random;

  • demo run

mysql> insert into t select * from s  limit (1000);
Query OK, 1000 rows affected (0.39 sec)

mysql> optimize table t compact;
Query OK, 1000 rows affected (0.16 sec)

mysql> alter table t recluster final;
Query OK, 0 rows affected (0.09 sec)

mysql> select min(ts), max(ts), count(), _block_name from t group by _block_name ;
+----------------------------+----------------------------+---------+----------------------------------------------------------+
| min(ts)                    | max(ts)                    | count() | _block_name                                              |
+----------------------------+----------------------------+---------+----------------------------------------------------------+
| 4606-09-17 10:25:52.954931 | 5529-07-12 07:42:09.731885 |     100 | 7057/7064/_b/d46a3fb3b12e4f4cb435e11895456e10_v2.parquet |
| 7435-12-10 11:05:35.203458 | 8286-06-19 09:51:13.565204 |     100 | 7057/7064/_b/543d1059c8264a10aa100afce5d56ae6_v2.parquet |
| 1004-06-24 19:52:10.887161 | 1868-04-27 03:05:01.070040 |     100 | 7057/7064/_b/6382a301f557442db08e52dca28b0a5c_v2.parquet |
| 1886-11-22 23:43:43.709034 | 2818-01-29 12:59:45.586043 |     100 | 7057/7064/_b/218b754f20ad489ba7deda2d46757dd0_v2.parquet |
| 9136-09-08 07:12:06.123010 | 9980-09-07 13:13:05.532933 |      99 | 7057/7064/_b/fdc853b8b6a94c11b9ef720097d3c19a_v2.parquet |
| 3739-07-10 03:41:10.556019 | 4600-09-09 17:12:48.120085 |     100 | 7057/7064/_b/ec9db108afd04cc3b641924ece71fa61_v2.parquet |
| 6451-10-10 04:26:38.348252 | 7435-03-24 10:12:36.646625 |     101 | 7057/7064/_b/7455b630c5834643afc8d7a77edc4a36_v2.parquet |
| 2824-02-06 07:06:10.343722 | 3730-08-18 15:52:21.451360 |     100 | 7057/7064/_b/792225ae5c2247aba2d57ea658e792e7_v2.parquet |
| 8295-10-27 01:31:45.521671 | 9116-10-01 17:07:37.439840 |     100 | 7057/7064/_b/a31e6dacb8e8405c92894042c0a1d2c9_v2.parquet |
| 5536-03-11 17:58:05.001055 | 6434-02-18 03:50:30.843138 |     100 | 7057/7064/_b/591d233c1d0c4d678acd7a0191a7fe08_v2.parquet |
+----------------------------+----------------------------+---------+----------------------------------------------------------+
10 rows in set (0.05 sec)
Read 1000 rows, 7.81 KiB in 0.016 sec., 64.15 thousand rows/sec., 501.17 KiB/sec.

mysql> truncate table system.metrics;
Query OK, 0 rows affected (0.03 sec)

mysql >  -- update the whole block which contains 101 rows
mysql> replace into t on(ts, id) (select id, ts, 'n' as c from t where _block_name = '7057/7064/_b/7455b630c5834643afc8d7a77edc4a36_v2.parquet'); 
Query OK, 202 rows affected (0.19 sec)

mysql> select * from system.metrics where metric like '%replace%' order by metric;
+------------------------+--------------------------------------------------------+-------+--------+-------+
| node                   | metric                                                 | kind  | labels | value |
+------------------------+--------------------------------------------------------+-------+--------+-------+
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_block_number_after_pruning           | gauge | {}     | 1.0   |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_block_of_zero_row_deleted            | gauge | {}     | 0.0   |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_original_row_number                  | gauge | {}     | 101.0 |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_partition_number                     | gauge | {}     | 101.0 |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_row_number_after_pruning             | gauge | {}     | 101.0 |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_row_number_after_table_level_pruning | gauge | {}     | 101.0 |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_whole_block_deletion                 | gauge | {}     | 1.0   |
+------------------------+--------------------------------------------------------+-------+--------+-------+
7 rows in set (0.03 sec)
Read 189 rows, 27.26 KiB in 0.019 sec., 9.98 thousand rows/sec., 1.41 MiB/sec.

mysql> -- `fuse_replace_into_block_number_after_pruning = 1` indicates that only one block left after pruning

mysql> select min(ts), max(ts), count(), _block_name from t group by _block_name ;
+----------------------------+----------------------------+---------+----------------------------------------------------------+
| min(ts)                    | max(ts)                    | count() | _block_name                                              |
+----------------------------+----------------------------+---------+----------------------------------------------------------+
| 7435-12-10 11:05:35.203458 | 8286-06-19 09:51:13.565204 |     100 | 7057/7064/_b/543d1059c8264a10aa100afce5d56ae6_v2.parquet |
| 4606-09-17 10:25:52.954931 | 5529-07-12 07:42:09.731885 |     100 | 7057/7064/_b/d46a3fb3b12e4f4cb435e11895456e10_v2.parquet |
| 1004-06-24 19:52:10.887161 | 1868-04-27 03:05:01.070040 |     100 | 7057/7064/_b/6382a301f557442db08e52dca28b0a5c_v2.parquet |
| 6451-10-10 04:26:38.348252 | 7435-03-24 10:12:36.646625 |     101 | 7057/7064/_b/1f3f2c9bef5d4dcc8496ee59404a1f0a_v2.parquet |
| 1886-11-22 23:43:43.709034 | 2818-01-29 12:59:45.586043 |     100 | 7057/7064/_b/218b754f20ad489ba7deda2d46757dd0_v2.parquet |
| 9136-09-08 07:12:06.123010 | 9980-09-07 13:13:05.532933 |      99 | 7057/7064/_b/fdc853b8b6a94c11b9ef720097d3c19a_v2.parquet |
| 3739-07-10 03:41:10.556019 | 4600-09-09 17:12:48.120085 |     100 | 7057/7064/_b/ec9db108afd04cc3b641924ece71fa61_v2.parquet |
| 2824-02-06 07:06:10.343722 | 3730-08-18 15:52:21.451360 |     100 | 7057/7064/_b/792225ae5c2247aba2d57ea658e792e7_v2.parquet |
| 8295-10-27 01:31:45.521671 | 9116-10-01 17:07:37.439840 |     100 | 7057/7064/_b/a31e6dacb8e8405c92894042c0a1d2c9_v2.parquet |
| 5536-03-11 17:58:05.001055 | 6434-02-18 03:50:30.843138 |     100 | 7057/7064/_b/591d233c1d0c4d678acd7a0191a7fe08_v2.parquet |
+----------------------------+----------------------------+---------+----------------------------------------------------------+
10 rows in set (0.04 sec)
Read 1000 rows, 7.81 KiB in 0.016 sec., 63.97 thousand rows/sec., 499.75 KiB/sec.

mysql> truncate table system.metrics;
Query OK, 0 rows affected (0.02 sec)

mysql> -- partially update a block (10/101)
mysql> replace into t on(ts, id) (select id, ts, 'n' as c from t where _block_name = '7057/7064/_b/1f3f2c9bef5d4dcc8496ee59404a1f0a_v2.parquet' limit 10);
Query OK, 121 rows affected (0.43 sec)

mysql> select * from system.metrics where metric like '%replace%' order by metric;
+------------------------+--------------------------------------------------------+-------+--------+-------+
| node                   | metric                                                 | kind  | labels | value |
+------------------------+--------------------------------------------------------+-------+--------+-------+
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_block_number_after_pruning           | gauge | {}     | 1.0   |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_block_number_totally_loaded          | gauge | {}     | 1.0   |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_block_number_write                   | gauge | {}     | 1.0   |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_block_of_zero_row_deleted            | gauge | {}     | 0.0   |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_original_row_number                  | gauge | {}     | 10.0  |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_partition_number                     | gauge | {}     | 10.0  |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_row_number_after_pruning             | gauge | {}     | 101.0 |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_row_number_after_table_level_pruning | gauge | {}     | 10.0  |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_row_number_totally_loaded            | gauge | {}     | 101.0 |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_row_number_write                     | gauge | {}     | 91.0  |
| 9qwWbY2IK3HvLcP5yxo6o7 | fuse_replace_into_whole_block_deletion                 | gauge | {}     | 0.0   |
+------------------------+--------------------------------------------------------+-------+--------+-------+
11 rows in set (0.03 sec)
Read 196 rows, 28.01 KiB in 0.018 sec., 10.91 thousand rows/sec., 1.52 MiB/sec.

mysql> -- `fuse_replace_into_block_number_after_pruning = 1` indicates one block left after pruning
mysql> -- `fuse_replace_into_block_number_write = 1` since one block is partially updated
mysql> -- `fuse_replace_into_row_number_write = 91` (101 -10) rows are written into a new block

@SkyFan2002

hope this PR will not conflict too much with #12119, if anything needs to be adjusted, please let me know.

  • Closes #issue

@vercel
Copy link

vercel bot commented Jul 19, 2023

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Ignored Deployment
Name Status Preview Comments Updated (UTC)
databend ⬜️ Ignored (Inspect) Visit Preview Jul 24, 2023 8:11am

@dantengsky dantengsky changed the title refactor: refine replace into pruning refactor: refine replace into pruning for table with cluster keys Jul 24, 2023
@dantengsky dantengsky marked this pull request as ready for review July 24, 2023 05:36
@dantengsky dantengsky requested a review from zhyass July 24, 2023 05:36
@github-actions github-actions bot added the pr-refactor this PR changes the code base without new features or bugfix label Jul 24, 2023
@dantengsky dantengsky requested a review from SkyFan2002 July 24, 2023 05:37
@BohuTANG BohuTANG merged commit fd255f8 into databendlabs:main Jul 24, 2023
Copy link
Member

@zhyass zhyass left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

SkyFan2002 added a commit to SkyFan2002/databend that referenced this pull request Jul 26, 2023
BohuTANG pushed a commit that referenced this pull request Aug 10, 2023
* refactor copy into

* fix panic

* fix

* fix

* fix

* make lint

* fix logic error

* replace into values

* fix

* fix

* fix render result

* fix schema cast

* temp

* respect #12147

* respect #12100

* make lint

* respect #12130

* fix merge

* add exchange

* fix conflict

* fix schema cast

* fix conlfict

* fix

* fix copy plan

* clear log

* fix copy

* fix copy

* run ci

* fix purge

* make lint

* add exchange

* disable dist for value source

* adjust exchange

* remove top exchange

* adjust replace into

* reshuffle

* fix

* fix reshuffle

* move segment_partition_num

* resolve conflicts

* add need insert flag

* unbranched_replace_into_processor

* merge only pipeline

* fix segment index

* fix conflict

* remove log

* fix empty table

* fix stateful test

* fix stateful test

* modify test

* fix typo

* fix random source

* add setting

* remove empty file

* remove dead code

* add default setting

* Update src/query/service/src/interpreters/interpreter_replace.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan_display.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan_display.rs

Co-authored-by: dantengsky <[email protected]>

* rename struct

* default 0

* regen golden file

* set enable_distributed_replace_into = 1 in slt

* make lint

---------

Co-authored-by: dantengsky <[email protected]>
Co-authored-by: JackTan25 <[email protected]>
andylokandy pushed a commit to andylokandy/databend that referenced this pull request Nov 27, 2023
…tabendlabs#12147)

* refactor: refine replace into pruning

* parition rows (WIP)

* partition by left most cluster key

* more metric

* add new setting enable_replace_into_partitioning

* refine merge_into_mutator

* only un-compact the segment info when necessary

* minor gc

* chore

* adjust metric

* fix typos
andylokandy pushed a commit to andylokandy/databend that referenced this pull request Nov 27, 2023
…2119)

* refactor copy into

* fix panic

* fix

* fix

* fix

* make lint

* fix logic error

* replace into values

* fix

* fix

* fix render result

* fix schema cast

* temp

* respect databendlabs#12147

* respect databendlabs#12100

* make lint

* respect databendlabs#12130

* fix merge

* add exchange

* fix conflict

* fix schema cast

* fix conlfict

* fix

* fix copy plan

* clear log

* fix copy

* fix copy

* run ci

* fix purge

* make lint

* add exchange

* disable dist for value source

* adjust exchange

* remove top exchange

* adjust replace into

* reshuffle

* fix

* fix reshuffle

* move segment_partition_num

* resolve conflicts

* add need insert flag

* unbranched_replace_into_processor

* merge only pipeline

* fix segment index

* fix conflict

* remove log

* fix empty table

* fix stateful test

* fix stateful test

* modify test

* fix typo

* fix random source

* add setting

* remove empty file

* remove dead code

* add default setting

* Update src/query/service/src/interpreters/interpreter_replace.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan_display.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan.rs

Co-authored-by: dantengsky <[email protected]>

* Update src/query/sql/src/executor/physical_plan_display.rs

Co-authored-by: dantengsky <[email protected]>

* rename struct

* default 0

* regen golden file

* set enable_distributed_replace_into = 1 in slt

* make lint

---------

Co-authored-by: dantengsky <[email protected]>
Co-authored-by: JackTan25 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
pr-refactor this PR changes the code base without new features or bugfix
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants