From f91b74aef3e252d173921d81dd045380bd301ea6 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Mon, 25 Nov 2024 14:59:46 +0800
Subject: [PATCH 1/3] flaky test

---
 .../pipelines/builders/builder_mutation.rs    | 25 ++++++++++++++++---
 .../09_0011_change_tracking.test              |  6 ++---
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/src/query/service/src/pipelines/builders/builder_mutation.rs b/src/query/service/src/pipelines/builders/builder_mutation.rs
index 18b64ffe59f0a..6535c798a074c 100644
--- a/src/query/service/src/pipelines/builders/builder_mutation.rs
+++ b/src/query/service/src/pipelines/builders/builder_mutation.rs
@@ -20,6 +20,7 @@ use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataSchema;
 use databend_common_expression::DataSchemaRef;
+use databend_common_pipeline_core::processors::create_resize_item;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -237,8 +238,18 @@ impl PipelineBuilder {
         transform_len: usize,
         block_thresholds: BlockThresholds,
     ) -> Result<()> {
-        // we should avoid too much little block write, because for s3 write, there are too many
-        // little blocks, it will cause high latency.
+        let mut origin_len = transform_len;
+        let mut resize_len = 1;
+        let mut pipe_items = Vec::with_capacity(2);
+        if need_match {
+            origin_len += 1;
+            resize_len += 1;
+            pipe_items.push(create_dummy_item());
+        }
+        pipe_items.push(create_resize_item(transform_len, 1));
+        self.main_pipeline
+            .add_pipe(Pipe::create(origin_len, resize_len, pipe_items));
+
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 Ok(ProcessorPtr::create(AccumulatingTransformer::create(
@@ -247,13 +258,21 @@ impl PipelineBuilder {
                     BlockCompactBuilder::new(block_thresholds),
                 )))
             },
-            transform_len,
+            1,
         )?;
         if need_match {
             builder.add_items_prepend(vec![create_dummy_item()]);
         }
         self.main_pipeline.add_pipe(builder.finalize());
 
+        let mut pipe_items = Vec::with_capacity(2);
+        if need_match {
+            pipe_items.push(create_dummy_item());
+        }
+        pipe_items.push(create_resize_item(1, transform_len));
+        self.main_pipeline
+            .add_pipe(Pipe::create(resize_len, origin_len, pipe_items));
+
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 Ok(ProcessorPtr::create(BlockMetaTransformer::create(
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test
index 5d1c807d4c0ae..ad40ab40c9476 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test
@@ -97,7 +97,7 @@ statement ok
 set enable_experimental_merge_into = 1
 
 query TTT
-settings (max_threads = 8) merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
+merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
 ----
 1 1 1
 
@@ -109,7 +109,7 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
 5 0 0 0 0
 6 0 0 0 0
 7 0 0 1 0
-8 0 0 0 0
+8 0 0 1 0
 
 statement ok
 create table t1(a int) change_tracking = true
@@ -136,7 +136,7 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
 5 0 0 0 0
 6 0 0 0 0
 7 0 0 1 0
-8 0 0 0 0
+8 0 0 1 0
 
 ###############
 # issue 14955 #
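
Note on patch 1: the builder previously ran BlockCompactBuilder once per transform pipe in parallel. The patch funnels all upstream outputs into a single AccumulatingTransformer (resize transform_len -> 1 via create_resize_item), compacts there, then fans the compacted blocks back out (resize 1 -> transform_len); when need_match is set, an extra dummy item lets the matched branch bypass the compaction stage untouched. The sketch below is a dependency-free analogy of that resize-compact-resize shape using plain vectors; the function, names, and numbers are illustrative only and are not Databend APIs.

    fn main() {
        let transform_len = 4; // number of parallel transform pipes upstream

        // Each upstream pipe emits several small "blocks" (modeled as row vecs).
        let small_blocks: Vec<Vec<u32>> = (0..transform_len as u32)
            .map(|i| vec![i; 3])
            .collect();

        // Resize N -> 1: funnel every block into a single stream,
        // like create_resize_item(transform_len, 1) in the patch.
        let gathered: Vec<u32> = small_blocks.into_iter().flatten().collect();

        // Single compaction step, standing in for the one AccumulatingTransformer
        // running BlockCompactBuilder: merge rows until a size threshold is hit.
        let rows_per_block = 6;
        let compacted: Vec<Vec<u32>> = gathered
            .chunks(rows_per_block)
            .map(|chunk| chunk.to_vec())
            .collect();

        // Resize 1 -> N: hand compacted blocks back to transform_len downstream
        // pipes, like create_resize_item(1, transform_len).
        let mut outputs: Vec<Vec<Vec<u32>>> = vec![Vec::new(); transform_len];
        for (i, block) in compacted.into_iter().enumerate() {
            outputs[i % transform_len].push(block);
        }

        for (pipe, blocks) in outputs.iter().enumerate() {
            println!("pipe {pipe}: {blocks:?}");
        }
    }

Serializing the compaction step trades some parallelism for deterministic block composition, which appears to be the point of the change given the "flaky test" subject line and the pinned expectations in the test file.
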
From 8e7ddda7a44fae3128a1cd947ebfe4c03377f1c2 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Mon, 25 Nov 2024 15:07:47 +0800
Subject: [PATCH 2/3] fix

---
 src/query/service/src/pipelines/builders/builder_mutation.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/query/service/src/pipelines/builders/builder_mutation.rs b/src/query/service/src/pipelines/builders/builder_mutation.rs
index 6535c798a074c..3cd64c367fc5f 100644
--- a/src/query/service/src/pipelines/builders/builder_mutation.rs
+++ b/src/query/service/src/pipelines/builders/builder_mutation.rs
@@ -238,6 +238,8 @@ impl PipelineBuilder {
         transform_len: usize,
         block_thresholds: BlockThresholds,
     ) -> Result<()> {
+        // we should avoid too much little block write, because for s3 write, there are too many
+        // little blocks, it will cause high latency.
         let mut origin_len = transform_len;
         let mut resize_len = 1;
         let mut pipe_items = Vec::with_capacity(2);

From 38cbf8eeaf8c12c94c7b3c23435fcb994bb55835 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Mon, 25 Nov 2024 17:03:59 +0800
Subject: [PATCH 3/3] fix test

---
 .../base/09_fuse_engine/09_0011_change_tracking.test | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test
index ad40ab40c9476..6025b15c33f43 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test
@@ -91,25 +91,25 @@ statement ok
 create table t2(a int)
 
 statement ok
-insert into t2 values(1),(2),(8)
+insert into t2 values(0),(2),(1)
 
 statement ok
 set enable_experimental_merge_into = 1
 
 query TTT
-merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 0 when matched and t2.a = 2 then delete when not matched then insert *
+merge into t using t2 on t.a = t2.a when matched and t2.a = 1 then update set t.a = 8 when matched and t2.a = 2 then delete when not matched then insert *
 ----
 1 1 1
 
 query IBBII
 select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
 ----
-0 0 0 0 1
+0 0 0 0 0
 3 0 0 1 1
 5 0 0 0 0
 6 0 0 0 0
 7 0 0 1 0
-8 0 0 1 0
+8 0 0 0 1
 
 statement ok
 create table t1(a int) change_tracking = true
@@ -131,12 +131,12 @@ merge into t using t1 on t.a = t1.a when matched and t1.a = 0 then update set t.
 query IBBII
 select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
 ----
-1 0 0 0 2
+1 0 0 0 1
 2 0 0 1 2
 5 0 0 0 0
 6 0 0 0 0
 7 0 0 1 0
-8 0 0 1 0
+8 0 0 0 1
 
 ###############
 # issue 14955 #
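
Note on patches 2 and 3: patch 2 restores the comment explaining why the compaction stage exists at all (many tiny blocks make S3 writes slow), and patch 3 pins the seed data and the expected _origin_block_row_num / _row_version values, which become stable once a single compactor decides how rows are packed into blocks. As a toy illustration of the kind of nondeterminism the series removes, the std-only snippet below shows that the order in which parallel workers deliver their blocks depends on scheduling. This is an analogy, not Databend code, and reading the original flakiness this way is an inference from the patches rather than something they state.

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        let (tx, rx) = mpsc::channel();
        let mut handles = Vec::new();
        for worker in 0..4 {
            let tx = tx.clone();
            handles.push(thread::spawn(move || {
                // Each worker emits one small "block".
                tx.send(format!("block-from-worker-{worker}")).unwrap();
            }));
        }
        drop(tx); // close the original sender so the receiver loop can finish

        // The arrival order below depends on thread scheduling and can change
        // from run to run; with a single producer it would be fixed.
        for (position, block) in rx.iter().enumerate() {
            println!("{position}: {block}");
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }
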