From b16638aad7dfee7023dab122149e193a31920d12 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Mon, 27 Nov 2023 16:19:16 +0800 Subject: [PATCH 1/5] Add a new kind of transform: `TransformBlocking`. --- .../src/processors/transforms/mod.rs | 2 + .../transforms/transform_blocking.rs | 115 ++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs diff --git a/src/query/pipeline/transforms/src/processors/transforms/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/mod.rs index 81a6bf965ed55..2e6b3f9fabcb5 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/mod.rs @@ -19,6 +19,7 @@ mod transform_accumulating_async; mod transform_async; mod transform_block_compact; mod transform_block_compact_for_copy; +mod transform_blocking; mod transform_compact; mod transform_dummy; mod transform_multi_sort_merge; @@ -33,6 +34,7 @@ pub use transform_accumulating_async::*; pub use transform_async::*; pub use transform_block_compact::*; pub use transform_block_compact_for_copy::*; +pub use transform_blocking::*; pub use transform_compact::*; pub use transform_dummy::*; pub use transform_sort::*; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs new file mode 100644 index 0000000000000..2260ca76dfe1c --- /dev/null +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs @@ -0,0 +1,115 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use common_exception::Result; +use common_expression::DataBlock; +use common_pipeline_core::processors::Event; +use common_pipeline_core::processors::InputPort; +use common_pipeline_core::processors::OutputPort; +use common_pipeline_core::processors::Processor; + +pub trait BlockingTransform: Send { + const NAME: &'static str; + + fn consume(&mut self, block: DataBlock) -> Result<()>; + + fn transform(&mut self) -> Result>; +} + +/// A transform may be blocked on a certain input. +/// +/// This transform will not pull new data from the input until the inner transform returns [None]. 
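+///
+/// An implementor is wrapped into a processor via [`BlockingTransformer::create`].
+/// Below is a minimal sketch of an implementor (the name `PassThrough` and its
+/// field are hypothetical, for illustration only):
+///
+/// ```ignore
+/// struct PassThrough {
+///     buffered: Option<DataBlock>,
+/// }
+///
+/// impl BlockingTransform for PassThrough {
+///     const NAME: &'static str = "PassThrough";
+///
+///     // Called only after the previously consumed block is fully transformed.
+///     fn consume(&mut self, block: DataBlock) -> Result<()> {
+///         self.buffered = Some(block);
+///         Ok(())
+///     }
+///
+///     // Emit at most one block per call; returning `None` tells the
+///     // wrapper to pull the next input block.
+///     fn transform(&mut self) -> Result<Option<DataBlock>> {
+///         Ok(self.buffered.take())
+///     }
+/// }
+/// ```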
+pub struct BlockingTransformer { + inner: T, + input: Arc, + output: Arc, + input_data: Option, + output_data: Option, + need_data: bool, +} + +impl BlockingTransformer { + pub fn create(input: Arc, output: Arc, inner: T) -> Box { + Box::new(Self { + inner, + input, + output, + input_data: None, + output_data: None, + need_data: true, + }) + } +} + +#[async_trait::async_trait] +impl Processor for BlockingTransformer { + fn name(&self) -> String { + String::from(T::NAME) + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + + if !self.need_data { + // There is data needed to be transformed. + return Ok(Event::Sync); + } + + // The data is fully consumed, we can begin to consume new data. + if self.input.has_data() { + let data = self.input.pull_data().unwrap()?; + self.input_data = Some(data); + return Ok(Event::Sync); + } + + if self.input.is_finished() { + self.output.finish(); + return Ok(Event::Finished); + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + fn process(&mut self) -> Result<()> { + if let Some(input) = self.input_data.take() { + debug_assert!(self.need_data); + self.inner.consume(input)?; + } + + if let Some(block) = self.inner.transform()? { + self.output_data = Some(block); + self.need_data = false; + } else { + self.need_data = true; + } + + Ok(()) + } +} From a38cdf1b9873a3b232ed48bd4115cc7914a0dd4d Mon Sep 17 00:00:00 2001 From: hezheyu Date: Mon, 27 Nov 2023 21:07:18 +0800 Subject: [PATCH 2/5] Add a new transform `TransformSRF` to process SRFs. Output blocks by determined batch size. 
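Previously, SRFs were expanded by `BlockOperator::FlatMap`, which
materialized the whole expansion of an input block as a single output
block; since one input row may expand to arbitrarily many rows, the
output block size was unbounded. `TransformSRF` is built on the new
`BlockingTransform`: it consumes one input block, evaluates all SRFs
over it, and then emits the buffered results in blocks of about
`max_block_size` rows (the expansion of a single input row is never
split across blocks, so a block may exceed the limit), pulling new
input only after the buffer is drained.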
--- .../src/pipelines/builders/builder_project.rs | 37 +-- .../pipelines/processors/transforms/mod.rs | 2 + .../processors/transforms/transform_srf.rs | 286 ++++++++++++++++++ src/query/sql/src/evaluator/block_operator.rs | 192 ------------ 4 files changed, 304 insertions(+), 213 deletions(-) create mode 100644 src/query/service/src/pipelines/processors/transforms/transform_srf.rs diff --git a/src/query/service/src/pipelines/builders/builder_project.rs b/src/query/service/src/pipelines/builders/builder_project.rs index 195cf84b1cb80..354a5575578be 100644 --- a/src/query/service/src/pipelines/builders/builder_project.rs +++ b/src/query/service/src/pipelines/builders/builder_project.rs @@ -19,14 +19,14 @@ use common_functions::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::ProcessorPtr; use common_pipeline_core::Pipeline; use common_pipeline_sinks::EmptySink; -use common_pipeline_transforms::processors::TransformProfileWrapper; -use common_pipeline_transforms::processors::Transformer; +use common_pipeline_transforms::processors::ProcessorProfileWrapper; use common_sql::evaluator::BlockOperator; use common_sql::evaluator::CompoundBlockOperator; use common_sql::executor::physical_plans::Project; use common_sql::executor::physical_plans::ProjectSet; use common_sql::ColumnBinding; +use crate::pipelines::processors::transforms::TransformSRF; use crate::pipelines::PipelineBuilder; impl PipelineBuilder { @@ -82,36 +82,31 @@ impl PipelineBuilder { pub(crate) fn build_project_set(&mut self, project_set: &ProjectSet) -> Result<()> { self.build_pipeline(&project_set.input)?; - let op = BlockOperator::FlatMap { - projections: project_set.projections.clone(), - srf_exprs: project_set - .srf_exprs - .iter() - .map(|(expr, _)| expr.as_expr(&BUILTIN_FUNCTIONS)) - .collect(), - }; - - let num_input_columns = project_set.input.output_schema()?.num_fields(); + let srf_exprs = project_set + .srf_exprs + .iter() + .map(|(expr, _)| expr.as_expr(&BUILTIN_FUNCTIONS)) + .collect::>(); + let max_block_size = self.settings.get_max_block_size()? 
as usize; self.main_pipeline.add_transform(|input, output| { - let transform = CompoundBlockOperator::new( - vec![op.clone()], + let transform = TransformSRF::try_create( + input, + output, self.func_ctx.clone(), - num_input_columns, + project_set.projections.clone(), + srf_exprs.clone(), + max_block_size, ); if self.enable_profiling { - Ok(ProcessorPtr::create(TransformProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, - input, - output, project_set.plan_id, self.proc_profs.clone(), ))) } else { - Ok(ProcessorPtr::create(Transformer::create( - input, output, transform, - ))) + Ok(ProcessorPtr::create(transform)) } }) } diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 02174a432a088..74639dc06c69f 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -32,6 +32,7 @@ mod transform_resort_addon; mod transform_resort_addon_without_source_schema; mod transform_runtime_cast_schema; mod transform_runtime_filter; +mod transform_srf; mod transform_udf; mod window; @@ -57,6 +58,7 @@ pub use transform_resort_addon_without_source_schema::TransformResortAddOnWithou pub use transform_runtime_cast_schema::TransformRuntimeCastSchema; pub use transform_runtime_filter::SinkRuntimeFilterSource; pub use transform_runtime_filter::TransformRuntimeFilter; +pub use transform_srf::TransformSRF; pub use transform_udf::TransformUdf; pub use window::FrameBound; pub use window::TransformWindow; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs new file mode 100644 index 0000000000000..6e400617d49e6 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs @@ -0,0 +1,286 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; +use std::sync::Arc; + +use common_exception::Result; +use common_expression::types::nullable::NullableColumnBuilder; +use common_expression::types::AnyType; +use common_expression::types::DataType; +use common_expression::types::VariantType; +use common_expression::BlockEntry; +use common_expression::Column; +use common_expression::ColumnBuilder; +use common_expression::DataBlock; +use common_expression::Evaluator; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::ScalarRef; +use common_expression::Value; +use common_functions::BUILTIN_FUNCTIONS; +use common_pipeline_core::processors::InputPort; +use common_pipeline_core::processors::OutputPort; +use common_pipeline_core::processors::Processor; +use common_pipeline_transforms::processors::BlockingTransform; +use common_pipeline_transforms::processors::BlockingTransformer; +use common_sql::ColumnSet; + +/// Expand the input [`DataBlock`] with set-returning functions. 
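+///
+/// Each SRF may return a different number of rows for the same input row.
+/// The input row is replicated to the maximum count among all SRFs, and the
+/// shorter result sets are padded with NULLs. An illustrative example:
+///
+/// ```text
+/// -- select number, unnest([1, 2]), unnest(['a']) from numbers(1);
+/// 0    1    'a'
+/// 0    2    NULL
+/// ```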
+pub struct TransformSRF { + func_ctx: FunctionContext, + projections: ColumnSet, + srf_exprs: Vec, + num_rows: VecDeque, + srf_results: Vec, usize)>>, + input: Option, + max_block_size: usize, +} + +impl TransformSRF { + pub fn try_create( + input: Arc, + output: Arc, + func_ctx: FunctionContext, + projections: ColumnSet, + srf_exprs: Vec, + max_block_size: usize, + ) -> Box { + BlockingTransformer::create(input, output, TransformSRF { + func_ctx, + projections, + srf_exprs, + num_rows: VecDeque::new(), + srf_results: Vec::new(), + input: None, + max_block_size, + }) + } +} + +impl BlockingTransform for TransformSRF { + const NAME: &'static str = "TransformSRF"; + + fn consume(&mut self, input: DataBlock) -> Result<()> { + let eval = Evaluator::new(&input, &self.func_ctx, &BUILTIN_FUNCTIONS); + + // [ + // srf1: [ + // result_set1: [ + // col1, col2, ... + // ], + // ... + // ], + // ... + // ] + let input_num_rows = input.num_rows(); + let mut max_nums_per_row = vec![0; input_num_rows]; + let srf_results = self + .srf_exprs + .iter() + .map(|srf_expr| { + let res = eval.run_srf(srf_expr, &mut max_nums_per_row)?; + debug_assert_eq!(res.len(), input_num_rows); + Ok(VecDeque::from(res)) + }) + .collect::>>()?; + + debug_assert_eq!(max_nums_per_row.len(), input_num_rows); + debug_assert!(self.num_rows.is_empty()); + debug_assert!(self.srf_results.is_empty()); + debug_assert!(self.input.is_none()); + + self.num_rows = VecDeque::from(max_nums_per_row); + self.srf_results = srf_results; + self.input = Some(input.project(&self.projections)); + + Ok(()) + } + + fn transform(&mut self) -> Result> { + if self.input.is_none() { + return Ok(None); + } + let input = self.input.take().unwrap(); + + let mut result_size = 0; + let mut used = 0; + + for num_rows in self.num_rows.iter() { + result_size += num_rows; + // TBD: if we need to limit `result_size` under `max_block_size`. + if result_size >= self.max_block_size { + break; + } + used += 1; + } + + // TODO: if there is only one row can be used, we can use `Value::Scalar` directly. + // Condition: `used == 1` and the rows of all the `srf_results` is equal to `max_nums_per_row[0]`. + + let mut result = DataBlock::empty(); + let mut block_is_empty = true; + for column in input.columns() { + let mut builder = ColumnBuilder::with_capacity(&column.data_type, result_size); + for (i, max_nums) in self.num_rows.iter().take(used).enumerate() { + let scalar_ref = unsafe { column.value.index_unchecked(i) }; + for _ in 0..*max_nums { + builder.push(scalar_ref.clone()); + } + } + let block_entry = + BlockEntry::new(column.data_type.clone(), Value::Column(builder.build())); + if block_is_empty { + result = DataBlock::new(vec![block_entry], result_size); + block_is_empty = false; + } else { + result.add_column(block_entry); + } + } + + for (srf_expr, srf_results) in self.srf_exprs.iter().zip(self.srf_results.iter_mut()) { + if let Expr::FunctionCall { function, .. 
} = srf_expr { + match function.signature.name.as_str() { + "json_path_query" => { + let mut builder: NullableColumnBuilder = + NullableColumnBuilder::with_capacity(result_size, &[]); + for (i, (row_result, repeat_times)) in + srf_results.drain(0..used).enumerate() + { + if let Value::Column(Column::Tuple(fields)) = row_result { + debug_assert!(fields.len() == 1); + match &fields[0] { + Column::Nullable(box nullable_column) => { + match &nullable_column.column { + Column::Variant(string_column) => { + for idx in 0..repeat_times { + builder.push(unsafe { + string_column.index_unchecked(idx) + }); + } + for _ in 0..(self.num_rows[i] - repeat_times) { + builder.push_null(); + } + } + _ => unreachable!( + "json_path_query's return type is: `DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Variant))])`" + ), + } + } + _ => unreachable!( + "json_path_query's return type is: `DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Variant))])`" + ), + }; + } + } + let column = builder.build().upcast(); + let block_entry = BlockEntry::new( + DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Variant))]), + Value::Column(Column::Tuple(vec![Column::Nullable(Box::new(column))])), + ); + if block_is_empty { + result = DataBlock::new(vec![block_entry], result_size); + block_is_empty = false; + } else { + result.add_column(block_entry); + } + } + _ => { + let mut result_data_blocks = Vec::with_capacity(used); + for (i, (mut row_result, repeat_times)) in + srf_results.drain(0..used).enumerate() + { + if let Value::Column(Column::Tuple(fields)) = &mut row_result { + // If the current result set has less rows than the max number of rows, + // we need to pad the result set with null values. + // TODO(leiysky): this can be optimized by using a `zip` array function + if repeat_times < self.num_rows[i] { + for field in fields { + match field { + Column::Null { .. } => { + *field = ColumnBuilder::repeat( + &ScalarRef::Null, + self.num_rows[i], + &DataType::Null, + ) + .build(); + } + Column::Nullable(box nullable_column) => { + let mut column_builder = + NullableColumnBuilder::from_column( + (*nullable_column).clone(), + ); + (0..(self.num_rows[i] - repeat_times)).for_each( + |_| { + column_builder.push_null(); + }, + ); + *field = Column::Nullable(Box::new( + column_builder.build(), + )); + } + _ => unreachable!(), + } + } + } + } else { + row_result = Value::Column( + ColumnBuilder::repeat( + &ScalarRef::Tuple(vec![ScalarRef::Null]), + self.num_rows[i], + srf_expr.data_type(), + ) + .build(), + ); + } + + let block_entry = + BlockEntry::new(srf_expr.data_type().clone(), row_result); + result_data_blocks + .push(DataBlock::new(vec![block_entry], self.num_rows[i])) + } + let data_block = DataBlock::concat(&result_data_blocks)?; + debug_assert!(data_block.num_rows() == result_size); + let block_entry = BlockEntry::new( + data_block.get_by_offset(0).data_type.clone(), + data_block.get_by_offset(0).value.clone(), + ); + if block_is_empty { + result = DataBlock::new(vec![block_entry], result_size); + block_is_empty = false; + } else { + result.add_column(block_entry); + } + } + } + } else { + unreachable!("expr is not a set returning function: {srf_expr}"); + } + } + + // Release consumed rows. + self.num_rows.drain(0..used); + // `self.srf_results` is already drained. 
+ let input = input.slice(0..used); + if input.num_rows() == 0 { + debug_assert!(self.num_rows.is_empty()); + debug_assert!(self.srf_results.iter().all(|res| res.is_empty())); + self.input = None; + } else { + self.input = Some(input); + } + + Ok(Some(result)) + } +} diff --git a/src/query/sql/src/evaluator/block_operator.rs b/src/query/sql/src/evaluator/block_operator.rs index a5a5d2a288b8b..b4311a0e4e639 100644 --- a/src/query/sql/src/evaluator/block_operator.rs +++ b/src/query/sql/src/evaluator/block_operator.rs @@ -18,21 +18,17 @@ use common_catalog::plan::AggIndexMeta; use common_exception::Result; use common_expression::types::array::ArrayColumn; use common_expression::types::nullable::NullableColumn; -use common_expression::types::nullable::NullableColumnBuilder; use common_expression::types::BooleanType; use common_expression::types::DataType; -use common_expression::types::VariantType; use common_expression::BlockEntry; use common_expression::BlockMetaInfoDowncast; use common_expression::Column; -use common_expression::ColumnBuilder; use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; use common_expression::FieldIndex; use common_expression::FunctionContext; use common_expression::Scalar; -use common_expression::ScalarRef; use common_expression::Value; use common_functions::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::InputPort; @@ -60,12 +56,6 @@ pub enum BlockOperator { /// Reorganize the input [`DataBlock`] with `projection`. Project { projection: Vec }, - /// Expand the input [`DataBlock`] with set-returning functions. - FlatMap { - projections: ColumnSet, - srf_exprs: Vec, - }, - /// Execute lambda function on input [`DataBlock`]. LambdaMap { funcs: Vec }, } @@ -131,187 +121,6 @@ impl BlockOperator { Ok(result) } - BlockOperator::FlatMap { - projections, - srf_exprs, - } => { - let eval = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS); - - // [ - // srf1: [ - // result_set1: [ - // col1, col2, ... - // ], - // ... - // ], - // ... - // ] - let input_num_rows = input.num_rows(); - let mut max_nums_per_row = vec![0; input_num_rows]; - let srf_results = srf_exprs - .iter() - .map(|srf_expr| eval.run_srf(srf_expr, &mut max_nums_per_row)) - .collect::>>()?; - let mut total_num_rows = 0; - for max_nums in max_nums_per_row.iter().take(input_num_rows) { - total_num_rows += *max_nums; - } - - let input_num_columns = input.num_columns(); - let mut result = DataBlock::empty(); - let mut block_is_empty = true; - for index in 0..input_num_columns { - if !projections.contains(&index) { - continue; - } - let column = input.get_by_offset(index); - let mut builder = - ColumnBuilder::with_capacity(&column.data_type, total_num_rows); - for (i, max_nums) in max_nums_per_row.iter().take(input_num_rows).enumerate() { - let scalar_ref = unsafe { column.value.index_unchecked(i) }; - for _ in 0..*max_nums { - builder.push(scalar_ref.clone()); - } - } - let block_entry = - BlockEntry::new(column.data_type.clone(), Value::Column(builder.build())); - if block_is_empty { - result = DataBlock::new(vec![block_entry], total_num_rows); - block_is_empty = false; - } else { - result.add_column(block_entry); - } - } - - for (srf_expr, srf_results) in srf_exprs.iter().zip(srf_results) { - if let Expr::FunctionCall { function, .. 
} = srf_expr { - match function.signature.name.as_str() { - "json_path_query" => { - let mut builder: NullableColumnBuilder = - NullableColumnBuilder::with_capacity(total_num_rows, &[]); - for (i, (row_result, repeat_times)) in - srf_results.into_iter().enumerate() - { - if let Value::Column(Column::Tuple(fields)) = row_result { - debug_assert!(fields.len() == 1); - match &fields[0] { - Column::Nullable(box nullable_column) => { - match &nullable_column.column { - Column::Variant(string_column) => { - for idx in 0..repeat_times { - builder.push(unsafe { - string_column.index_unchecked(idx) - }); - } - for _ in - 0..(max_nums_per_row[i] - repeat_times) - { - builder.push_null(); - } - } - _ => unreachable!( - "json_path_query's return type is: `DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Variant))])`" - ), - } - } - _ => unreachable!( - "json_path_query's return type is: `DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Variant))])`" - ), - }; - } - } - let column = builder.build().upcast(); - let block_entry = BlockEntry::new( - DataType::Tuple(vec![DataType::Nullable(Box::new( - DataType::Variant, - ))]), - Value::Column(Column::Tuple(vec![Column::Nullable(Box::new( - column, - ))])), - ); - if block_is_empty { - result = DataBlock::new(vec![block_entry], total_num_rows); - block_is_empty = false; - } else { - result.add_column(block_entry); - } - } - _ => { - let mut result_data_blocks = Vec::with_capacity(input.num_rows()); - for (i, (mut row_result, repeat_times)) in - srf_results.into_iter().enumerate() - { - if let Value::Column(Column::Tuple(fields)) = &mut row_result { - // If the current result set has less rows than the max number of rows, - // we need to pad the result set with null values. - // TODO(leiysky): this can be optimized by using a `zip` array function - if repeat_times < max_nums_per_row[i] { - for field in fields { - match field { - Column::Null { .. } => { - *field = ColumnBuilder::repeat( - &ScalarRef::Null, - max_nums_per_row[i], - &DataType::Null, - ) - .build(); - } - Column::Nullable(box nullable_column) => { - let mut column_builder = - NullableColumnBuilder::from_column( - (*nullable_column).clone(), - ); - (0..(max_nums_per_row[i] - repeat_times)) - .for_each(|_| { - column_builder.push_null(); - }); - *field = Column::Nullable(Box::new( - column_builder.build(), - )); - } - _ => unreachable!(), - } - } - } - } else { - row_result = Value::Column( - ColumnBuilder::repeat( - &ScalarRef::Tuple(vec![ScalarRef::Null]), - max_nums_per_row[i], - srf_expr.data_type(), - ) - .build(), - ); - } - - let block_entry = - BlockEntry::new(srf_expr.data_type().clone(), row_result); - result_data_blocks.push(DataBlock::new( - vec![block_entry], - max_nums_per_row[i], - )) - } - let data_block = DataBlock::concat(&result_data_blocks)?; - debug_assert!(data_block.num_rows() == total_num_rows); - let block_entry = BlockEntry::new( - data_block.get_by_offset(0).data_type.clone(), - data_block.get_by_offset(0).value.clone(), - ); - if block_is_empty { - result = DataBlock::new(vec![block_entry], total_num_rows); - block_is_empty = false; - } else { - result.add_column(block_entry); - } - } - } - } else { - unreachable!("expr is not a set returning function: {srf_expr}"); - } - } - Ok(result) - } - BlockOperator::LambdaMap { funcs } => { for func in funcs { let expr = func.lambda_expr.as_expr(&BUILTIN_FUNCTIONS); @@ -522,7 +331,6 @@ impl Transform for CompoundBlockOperator { BlockOperator::Map { .. } => "Map", BlockOperator::Filter { .. 
} => "Filter", BlockOperator::Project { .. } => "Project", - BlockOperator::FlatMap { .. } => "FlatMap", BlockOperator::LambdaMap { .. } => "LambdaMap", } .to_string() From 4c14b2ccc823c27e35237f47a137dfa833add108 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Mon, 27 Nov 2023 21:17:26 +0800 Subject: [PATCH 3/5] Set block size when testing SRFs. --- .../02_0051_function_semi_structureds_get.test | 6 ++++++ .../query/02_function/02_0062_function_unnest.test | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/tests/sqllogictests/suites/query/02_function/02_0051_function_semi_structureds_get.test b/tests/sqllogictests/suites/query/02_function/02_0051_function_semi_structureds_get.test index ebf403d4dee25..def821012e6ea 100644 --- a/tests/sqllogictests/suites/query/02_function/02_0051_function_semi_structureds_get.test +++ b/tests/sqllogictests/suites/query/02_function/02_0051_function_semi_structureds_get.test @@ -509,6 +509,9 @@ select id, get(arr, 5) from t4 statement error 1006 select id, get(arr, 'a') from t4 +statement ok +set max_block_size = 2; + query ITS select id, json_path_query(arr, '$[2, 1 to last -1]'), typeof(json_path_query(arr, '$[2, 1 to last -1]')) from t1 ---- @@ -671,6 +674,9 @@ select json_path_query_first(obj, '$.测试\\"\\uD83D\\uDC8E') from t5 "a" NULL +statement ok +set max_block_size = 65535; + query T SELECT arr #> '{3}' FROM t1; ---- diff --git a/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test b/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test index 52cdffb4e7113..75c65ed4af3ba 100644 --- a/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test +++ b/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test @@ -1,3 +1,6 @@ +statement ok +set max_block_size = 1; + query I select unnest([]); ---- @@ -128,6 +131,9 @@ select unnest([1,2,3]) + unnest([3]); NULL NULL +statement ok +set max_block_size = 2; + query IIT select unnest([1,2,3]), number, unnest([3]) from numbers(2); ---- @@ -469,3 +475,6 @@ select unnest(parse_json('[1,2,"a",[3,4]]')) query T select unnest(parse_json('"a"')) ---- + +statement ok +set max_block_size = 65535; \ No newline at end of file From 66601f8a87d338951a14398d692f34f816e8b53a Mon Sep 17 00:00:00 2001 From: hezheyu Date: Tue, 28 Nov 2023 11:01:38 +0800 Subject: [PATCH 4/5] Fix bugs. --- .../transforms/transform_blocking.rs | 5 ++++ .../processors/transforms/transform_srf.rs | 26 +++++++++---------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs index 2260ca76dfe1c..61d8695b8608d 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_blocking.rs @@ -76,6 +76,11 @@ impl Processor for BlockingTransformer { return Ok(Event::NeedConsume); } + if let Some(output) = self.output_data.take() { + self.output.push_data(Ok(output)); + return Ok(Event::NeedConsume); + } + if !self.need_data { // There is data needed to be transformed. 
return Ok(Event::Sync); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs index 6e400617d49e6..e7c9bf001d2b0 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_srf.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_srf.rs @@ -42,7 +42,9 @@ pub struct TransformSRF { func_ctx: FunctionContext, projections: ColumnSet, srf_exprs: Vec, + /// The output number of rows for each input row. num_rows: VecDeque, + /// The output of each set-returning function for each input row. srf_results: Vec, usize)>>, input: Option, max_block_size: usize, @@ -57,12 +59,13 @@ impl TransformSRF { srf_exprs: Vec, max_block_size: usize, ) -> Box { + let srf_results = vec![VecDeque::new(); srf_exprs.len()]; BlockingTransformer::create(input, output, TransformSRF { func_ctx, projections, srf_exprs, num_rows: VecDeque::new(), - srf_results: Vec::new(), + srf_results, input: None, max_block_size, }) @@ -86,23 +89,18 @@ impl BlockingTransform for TransformSRF { // ] let input_num_rows = input.num_rows(); let mut max_nums_per_row = vec![0; input_num_rows]; - let srf_results = self - .srf_exprs - .iter() - .map(|srf_expr| { - let res = eval.run_srf(srf_expr, &mut max_nums_per_row)?; - debug_assert_eq!(res.len(), input_num_rows); - Ok(VecDeque::from(res)) - }) - .collect::>>()?; + for (i, expr) in self.srf_exprs.iter().enumerate() { + let res = eval.run_srf(expr, &mut max_nums_per_row)?; + debug_assert_eq!(res.len(), input_num_rows); + debug_assert!(self.srf_results[i].is_empty()); + self.srf_results[i] = VecDeque::from(res); + } debug_assert_eq!(max_nums_per_row.len(), input_num_rows); debug_assert!(self.num_rows.is_empty()); - debug_assert!(self.srf_results.is_empty()); debug_assert!(self.input.is_none()); self.num_rows = VecDeque::from(max_nums_per_row); - self.srf_results = srf_results; self.input = Some(input.project(&self.projections)); Ok(()) @@ -119,11 +117,11 @@ impl BlockingTransform for TransformSRF { for num_rows in self.num_rows.iter() { result_size += num_rows; + used += 1; // TBD: if we need to limit `result_size` under `max_block_size`. if result_size >= self.max_block_size { break; } - used += 1; } // TODO: if there is only one row can be used, we can use `Value::Scalar` directly. @@ -272,7 +270,7 @@ impl BlockingTransform for TransformSRF { // Release consumed rows. self.num_rows.drain(0..used); // `self.srf_results` is already drained. - let input = input.slice(0..used); + let input = input.slice(used..input.num_rows()); if input.num_rows() == 0 { debug_assert!(self.num_rows.is_empty()); debug_assert!(self.srf_results.iter().all(|res| res.is_empty())); From 8f6d534ce4ff7da8dbf1af686dfd16733be93f7a Mon Sep 17 00:00:00 2001 From: hezheyu Date: Tue, 28 Nov 2023 12:13:11 +0800 Subject: [PATCH 5/5] Ensure tests can pass. 
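With a small `max_block_size`, the expanded results can arrive in
multiple blocks, so the suite also pins `max_threads` to 1 to keep the
output order deterministic, and restores both settings at the end of
the file.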
--- .../suites/query/02_function/02_0062_function_unnest.test | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test b/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test index 75c65ed4af3ba..a9ad4c8058a58 100644 --- a/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test +++ b/tests/sqllogictests/suites/query/02_function/02_0062_function_unnest.test @@ -1,3 +1,6 @@ +statement ok +set max_threads = 1; + statement ok set max_block_size = 1; @@ -477,4 +480,7 @@ select unnest(parse_json('"a"')) ---- statement ok -set max_block_size = 65535; \ No newline at end of file +set max_block_size = 65535; + +statement ok +set max_threads = 16; \ No newline at end of file