From acbee0415b7005427b52c93f29a2ac77db493a4f Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Tue, 25 Oct 2022 20:22:10 +0800 Subject: [PATCH 1/8] Implement the basic streaming sort. --- Cargo.lock | 2 + .../datablocks/src/kernels/data_block_sort.rs | 4 +- .../datablocks/src/kernels/data_block_take.rs | 24 +- src/query/datablocks/src/kernels/mod.rs | 1 + .../pipeline/core/src/pipeline_display.rs | 43 +- src/query/pipeline/transforms/Cargo.toml | 2 + .../src/processors/transforms/mod.rs | 2 + .../transforms/transform_multi_sort_merge.rs | 543 ++++++++++++++++++ .../transforms/transform_sort_merge.rs | 21 +- .../service/src/pipelines/pipeline_builder.rs | 20 +- .../storages/fuse/src/operations/recluster.rs | 5 +- 11 files changed, 636 insertions(+), 31 deletions(-) create mode 100644 src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs diff --git a/Cargo.lock b/Cargo.lock index d4bac3ca1ddce..fb39b99759f86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1875,6 +1875,8 @@ name = "common-pipeline-transforms" version = "0.1.0" dependencies = [ "async-trait-fn", + "common-arrow", + "common-catalog", "common-datablocks", "common-exception", "common-pipeline-core", diff --git a/src/query/datablocks/src/kernels/data_block_sort.rs b/src/query/datablocks/src/kernels/data_block_sort.rs index 9127dc9bee06e..0c676e2f4f301 100644 --- a/src/query/datablocks/src/kernels/data_block_sort.rs +++ b/src/query/datablocks/src/kernels/data_block_sort.rs @@ -133,7 +133,7 @@ impl DataBlock { .map(|f| { let left = lhs.try_column_by_name(f.name())?; let right = rhs.try_column_by_name(f.name())?; - Self::take_columns_by_slices_limit( + Self::take_column_by_slices_limit( f.data_type(), &[left.clone(), right.clone()], &slices, @@ -225,7 +225,7 @@ fn compare_array(left: &dyn Array, right: &dyn Array) -> ArrowResult ArrowResult { +pub fn build_compare(left: &dyn Array, right: &dyn Array) -> ArrowResult { match left.data_type() { ArrowType::LargeList(_) => 
compare_array(left, right), ArrowType::Extension(name, _, _) => { diff --git a/src/query/datablocks/src/kernels/data_block_take.rs b/src/query/datablocks/src/kernels/data_block_take.rs index 57777e705f16c..f6b8ff44d4a23 100644 --- a/src/query/datablocks/src/kernels/data_block_take.rs +++ b/src/query/datablocks/src/kernels/data_block_take.rs @@ -61,7 +61,29 @@ impl DataBlock { Ok(DataBlock::create(schema.clone(), result_columns)) } - pub fn take_columns_by_slices_limit( + pub fn block_take_by_slices_limit( + raw: &DataBlock, + slice: (usize, usize), + limit: Option, + ) -> Result { + let fields = raw.schema().fields(); + let columns = fields + .iter() + .map(|f| { + let column = raw.try_column_by_name(f.name())?.clone(); + Self::take_column_by_slices_limit( + f.data_type(), + &[column], + &[(0, slice.0, slice.1)], + limit, + ) + }) + .collect::>>()?; + let data = DataBlock::create(raw.schema().clone(), columns); + Ok(data) + } + + pub fn take_column_by_slices_limit( data_type: &DataTypeImpl, columns: &[ColumnRef], slices: &[MergeSlice], diff --git a/src/query/datablocks/src/kernels/mod.rs b/src/query/datablocks/src/kernels/mod.rs index 5e635860da1ea..e06c9ca7d698f 100644 --- a/src/query/datablocks/src/kernels/mod.rs +++ b/src/query/datablocks/src/kernels/mod.rs @@ -23,4 +23,5 @@ mod data_block_sort; mod data_block_take; pub use data_block_group_by_hash::*; +pub use data_block_sort::build_compare; pub use data_block_sort::SortColumnDescription; diff --git a/src/query/pipeline/core/src/pipeline_display.rs b/src/query/pipeline/core/src/pipeline_display.rs index 8353731d04c9a..f2aa1083c443f 100644 --- a/src/query/pipeline/core/src/pipeline_display.rs +++ b/src/query/pipeline/core/src/pipeline_display.rs @@ -71,21 +71,34 @@ impl<'a> Display for PipelineIndentDisplayWrapper<'a> { .. 
} => { let prev_name = Self::pipe_name(&pipes[pipes.len() - index - 2]); - let post_name = Self::pipe_name(&pipes[pipes.len() - index]); - - write!( - f, - "Merge ({} × {} {}) to ({} × {})", - prev_name, - inputs_port.len(), - if inputs_port.len() == 1 { - "processor" - } else { - "processors" - }, - post_name, - outputs_port.len(), - )?; + if index > 0 { + let post_name = Self::pipe_name(&pipes[pipes.len() - index]); + write!( + f, + "Merge ({} × {} {}) to ({} × {})", + prev_name, + inputs_port.len(), + if inputs_port.len() == 1 { + "processor" + } else { + "processors" + }, + post_name, + outputs_port.len(), + )?; + } else { + write!( + f, + "Merge ({} × {} {})", + prev_name, + inputs_port.len(), + if inputs_port.len() == 1 { + "processor" + } else { + "processors" + }, + )?; + } } } } diff --git a/src/query/pipeline/transforms/Cargo.toml b/src/query/pipeline/transforms/Cargo.toml index aa4a915aea60a..0f7103981437a 100644 --- a/src/query/pipeline/transforms/Cargo.toml +++ b/src/query/pipeline/transforms/Cargo.toml @@ -9,6 +9,8 @@ doctest = false test = false [dependencies] +common-arrow = { path = "../../../common/arrow" } +common-catalog = { path = "../../catalog" } common-datablocks = { path = "../../datablocks" } common-exception = { path = "../../../common/exception" } common-pipeline-core = { path = "../core" } diff --git a/src/query/pipeline/transforms/src/processors/transforms/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/mod.rs index 84475f6c28f1d..61b1396c7415c 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/mod.rs @@ -16,6 +16,7 @@ pub mod transform; pub mod transform_block_compact; pub mod transform_compact; pub mod transform_limit; +pub mod transform_multi_sort_merge; pub mod transform_sort_merge; pub mod transform_sort_partial; @@ -23,5 +24,6 @@ pub use transform::*; pub use transform_block_compact::*; pub use transform_compact::*; pub use 
transform_limit::*; +pub use transform_multi_sort_merge::*; pub use transform_sort_merge::*; pub use transform_sort_partial::*; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs new file mode 100644 index 0000000000000..16837e2fa594c --- /dev/null +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -0,0 +1,543 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; +use std::cell::RefCell; +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::sync; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +use common_arrow::arrow::array::ord::DynComparator; +use common_arrow::ArrayRef; +use common_datablocks::build_compare; +use common_datablocks::DataBlock; +use common_datablocks::SortColumnDescription; +use common_datavalues::ColumnRef; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use common_pipeline_core::Pipe; +use common_pipeline_core::Pipeline; + +pub fn try_add_multi_sort_merge( + pipeline: &mut Pipeline, + output_schema: DataSchemaRef, + block_size: usize, + limit: Option, + sort_columns_descriptions: Vec, +) -> Result<()> { + match pipeline.pipes.last() { + None => Err(ErrorCode::LogicalError("Cannot resize empty pipe.")), + Some(pipe) if pipe.output_size() == 0 => { + Err(ErrorCode::LogicalError("Cannot resize empty pipe.")) + } + Some(pipe) if pipe.output_size() == 1 => Ok(()), + Some(pipe) => { + let input_size = pipe.output_size(); + let mut inputs_port = Vec::with_capacity(input_size); + for _ in 0..input_size { + inputs_port.push(InputPort::create()); + } + let output_port = OutputPort::create(); + let processor = MultiSortMergeProcessor::create( + inputs_port.clone(), + output_port.clone(), + output_schema, + block_size, + limit, + sort_columns_descriptions, + ); + pipeline.pipes.push(Pipe::ResizePipe { + inputs_port, + outputs_port: vec![output_port], + processor: ProcessorPtr::create(Box::new(processor)), + }); + Ok(()) + } + } +} + +/// A cursor point to a certain row 
in a data block. +struct Cursor { + pub input_index: usize, + pub row_index: usize, + + /// block_id is a global unique id. Not the sequence number of its input. + pub block_id: usize, + num_rows: usize, + + sort_columns: Vec, + sort_columns_descriptions: Vec, + + /// A collection of comparators that compare rows in this cursors' block + /// to the cursors in other blocks. + /// + /// Other blocks are uniquely identified by their input_index and block_index. + comparators: RefCell)>>, +} + +impl Cursor { + pub fn try_create( + input_index: usize, + block_index: usize, + block: &DataBlock, + sort_columns_descriptions: Vec, + ) -> Result { + let sort_columns = sort_columns_descriptions + .iter() + .map(|f| { + let c = block.try_column_by_name(&f.column_name)?; + Ok(c.clone()) + }) + .collect::>>()?; + Ok(Cursor { + input_index, + block_id: block_index, + row_index: 0, + sort_columns_descriptions, + sort_columns, + num_rows: block.num_rows(), + comparators: RefCell::new(HashMap::new()), + }) + } + + #[inline] + pub fn advance(&mut self) -> usize { + let res = self.row_index; + self.row_index += 1; + res + } + + #[inline] + pub fn is_finished(&self) -> bool { + self.num_rows == self.row_index + } + + pub fn compare(&self, other: &Cursor) -> Result { + if self.sort_columns.len() != other.sort_columns.len() { + return Err(ErrorCode::LogicalError(format!( + "Sort columns length not match: {} != {}", + self.sort_columns.len(), + other.sort_columns.len() + ))); + } + let zipped = self + .sort_columns + .iter() + .map(|c| c.as_arrow_array(c.data_type())) + .zip( + other + .sort_columns + .iter() + .map(|c| c.as_arrow_array(c.data_type())), + ) + .zip(self.sort_columns_descriptions.iter()) + .collect::>(); + self.add_block_comparators(other, &zipped)?; + let comparators = self.comparators.borrow(); + let comparators = &comparators + .get(&other.input_index) + .ok_or_else(|| { + ErrorCode::LogicalError(format!( + "Cannot find comparators to compare {}.{} with {}.{}", + 
self.input_index, self.block_id, other.input_index, other.block_id + )) + })? + .1; + + for (i, ((l, r), option)) in zipped.iter().enumerate() { + match (l.is_valid(self.row_index), r.is_valid(other.row_index)) { + (false, true) if option.nulls_first => return Ok(Ordering::Less), + (false, true) => return Ok(Ordering::Greater), + (true, false) if option.nulls_first => return Ok(Ordering::Greater), + (true, false) => return Ok(Ordering::Less), + (false, false) => {} + (true, true) => match comparators[i](self.row_index, other.row_index) { + Ordering::Equal => {} + o if !option.asc => return Ok(o.reverse()), + o => return Ok(o), + }, + } + } + + // If all columns are equal, compare the input index. + Ok(self.input_index.cmp(&other.input_index)) + } + + /// update comparators if there comes a new block. + pub fn add_block_comparators( + &self, + other: &Cursor, + zipped: &[((ArrayRef, ArrayRef), &SortColumnDescription)], + ) -> Result<()> { + let mut comparators = self.comparators.borrow_mut(); + if let Some(cmps) = comparators.get_mut(&other.input_index) { + // comparator of this block is already exisits. + if cmps.0 == other.block_id { + return Ok(()); + } + let mut compares = Vec::with_capacity(other.sort_columns.len()); + for ((l, r), _) in zipped.iter() { + compares.push(build_compare(&**l, &**r)?) + } + cmps.1 = compares; + } else { + let mut compares = Vec::with_capacity(other.sort_columns.len()); + for ((l, r), _) in zipped.iter() { + compares.push(build_compare(&**l, &**r)?) 
+ } + comparators.insert(other.input_index, (other.block_id, compares)); + } + Ok(()) + } +} + +impl Ord for Cursor { + fn cmp(&self, other: &Self) -> Ordering { + self.compare(other).unwrap() + } +} + +impl PartialEq for Cursor { + fn eq(&self, other: &Self) -> bool { + other.compare(self).unwrap() == Ordering::Equal + } +} + +impl Eq for Cursor {} + +impl PartialOrd for Cursor { + fn partial_cmp(&self, other: &Self) -> Option { + other.compare(self).ok() + } +} + +/// TransformMultiSortMerge is a processor with multiple input ports; +pub struct MultiSortMergeProcessor { + /// Data from inputs (every input is sorted) + inputs: Vec>, + output: Arc, + output_schema: DataSchemaRef, + + // Parameters + block_size: usize, + limit: Option, + sort_columns_descriptions: Vec, + + /// For each input port, maintain a dequeue of data blocks. + blocks: Vec>, + /// Maintain a flag for each input denoting if the current cursor has finished + /// and needs to pull data from input. + cursor_finished: Vec, + /// The accumulated rows for the next output data block. + /// + /// Data format: (input_index, block_index, row_index) + in_progess_rows: Vec<(usize, usize, usize)>, + /// Heap that yields [`Cursor`] in increasing order. + heap: BinaryHeap, + /// Assign every input block a unique id. + cur_block_index: usize, + /// If the input port is finished. 
+ input_finished: Vec, + + state: ProcessorState, + + aborting: Arc, +} + +impl MultiSortMergeProcessor { + pub fn create( + inputs: Vec>, + output: Arc, + output_schema: DataSchemaRef, + block_size: usize, + limit: Option, + sort_columns_descriptions: Vec, + ) -> Self { + let intput_size = inputs.len(); + Self { + inputs, + output, + output_schema, + block_size, + limit, + sort_columns_descriptions, + blocks: vec![VecDeque::new(); intput_size], + heap: BinaryHeap::with_capacity(intput_size), + in_progess_rows: vec![], + cursor_finished: vec![true; intput_size], + cur_block_index: 0, + input_finished: vec![false; intput_size], + state: ProcessorState::Consume, + aborting: Arc::new(AtomicBool::new(false)), + } + } + + fn get_data_blocks(&mut self) -> Result> { + let mut data = Vec::new(); + for (i, input) in self.inputs.iter().enumerate() { + if input.is_finished() { + self.input_finished[i] = true; + continue; + } + input.set_need_data(); + if self.cursor_finished[i] { + if input.has_data() { + data.push((i, input.pull_data().unwrap()?)); + } + } + } + Ok(data) + } + + fn nums_active_inputs(&self) -> usize { + self.input_finished + .iter() + .filter(|&&finished| !finished) + .count() + } + + fn drain_heap(&mut self) { + let nums_active_inputs = self.nums_active_inputs(); + // Need to pop data to in_progess_rows. + // Use `>=` because some of the input ports may be finished, but the data is still in the heap. + while self.heap.len() >= nums_active_inputs { + match self.heap.pop() { + Some(mut cursor) => { + let input_index = cursor.input_index; + let row_index = cursor.advance(); + if !cursor.is_finished() { + self.heap.push(cursor); + } else { + // We have read all rows of this block, need to read a new one. + self.cursor_finished[input_index] = true; + } + let block_index = self.blocks[input_index].len() - 1; + self.in_progess_rows + .push((input_index, block_index, row_index)); + // Reach the block size, need to output. 
+ if self.in_progess_rows.len() >= self.block_size { + self.state = ProcessorState::Output; + break; + } + } + None => { + // Special case: self.heap.len() == 0 && nums_active_inputs == 0. + // `self.in_progress_rows` cannot be empty. + // If reach here, it means that all inputs are finished but `self.heap` is not empty before the while loop. + // Therefore, when reach here, data in `self.heap` is all drained into `self.in_progress_rows`. + debug_assert!(!self.in_progess_rows.is_empty()); + self.state = ProcessorState::Output; + break; + } + } + } + } + + /// Drain `self.in_progess_rows` to build a output data block. + fn build_block(&mut self) -> Result { + let num_rows = self.in_progess_rows.len(); + debug_assert!(num_rows > 0 && num_rows <= self.block_size); + + let mut blocks_num_pre_sum = Vec::with_capacity(self.blocks.len()); + let mut len = 0; + for block in self.blocks.iter() { + blocks_num_pre_sum.push(len); + len += block.len(); + } + + // Compute the indices of the output block. + let first_row = &self.in_progess_rows[0]; + let mut index = blocks_num_pre_sum[first_row.0] + first_row.1; + let mut start_row_index = first_row.2; + let mut end_row_index = start_row_index + 1; + let mut indices = Vec::new(); + for row in self.in_progess_rows.iter().skip(1) { + let next_index = blocks_num_pre_sum[row.0] + row.1; + if next_index == index { + // Within a same block. + end_row_index += 1; + continue; + } + // next_index != index + // Record a range in the block. + indices.push((index, start_row_index, end_row_index - start_row_index)); + // Start to record a new block. + index = next_index; + start_row_index = row.2; + end_row_index = start_row_index + 1; + } + indices.push((index, start_row_index, end_row_index - start_row_index)); + + let columns = self + .output_schema + .fields() + .iter() + .enumerate() + .map(|(column_index, field)| { + // Collect all rows for a ceterain column out of all preserved blocks. 
+ let candidate_cols = self + .blocks + .iter() + .flatten() + .map(|block| block.column(column_index).clone()) + .collect::>(); + DataBlock::take_column_by_slices_limit( + field.data_type(), + &candidate_cols, + &indices, + None, + ) + }) + .collect::>>()?; + + // Clear no need data. + self.in_progess_rows.clear(); + // A cursor pointing to a new block is created onlyh if the previous block is finished. + // This means that all blocks except the last one for each input port are drained into the output block. + // Therefore, the previous blocks can be cleared. + for blocks in self.blocks.iter_mut() { + if blocks.len() > 1 { + blocks.drain(0..(blocks.len() - 1)); + } + } + + Ok(DataBlock::create(self.output_schema.clone(), columns)) + } +} + +#[async_trait::async_trait] +impl Processor for MultiSortMergeProcessor { + fn name(&self) -> String { + "MultiSortMerge".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn interrupt(&self) { + self.aborting.store(true, sync::atomic::Ordering::Release); + } + + fn event(&mut self) -> Result { + let aborting = self.aborting.load(sync::atomic::Ordering::Relaxed); + if aborting { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + + if self.output.is_finished() { + for input in self.inputs.iter() { + input.finish(); + } + return Ok(Event::Finished); + } + + if !self.output.can_push() { + return Ok(Event::NeedConsume); + } + + if matches!(self.state, ProcessorState::Generated(_)) { + if let ProcessorState::Generated(data_block) = + std::mem::replace(&mut self.state, ProcessorState::Consume) + { + self.output.push_data(Ok(data_block)); + return Ok(Event::NeedConsume); + } + } + + match &self.state { + ProcessorState::Consume => { + let data_blocks = self.get_data_blocks()?; + if !data_blocks.is_empty() { + self.state = ProcessorState::Preserve(data_blocks); + return Ok(Event::Sync); + } + let all_finished = 
self.nums_active_inputs() == 0; + if all_finished { + if !self.heap.is_empty() { + // The heap is not drained yet. Need to drain data into in_progress_rows. + self.state = ProcessorState::Preserve(vec![]); + return Ok(Event::Sync); + } + if !self.in_progess_rows.is_empty() { + // The in_progess_rows is not drained yet. Need to drain data into output. + self.state = ProcessorState::Output; + return Ok(Event::Sync); + } + self.output.finish(); + return Ok(Event::Finished); + } else { + // `data_blocks` is empty + if !self.heap.is_empty() { + // The heap is not drained yet. Need to drain data into in_progress_rows. + self.state = ProcessorState::Preserve(vec![]); + Ok(Event::Sync) + } else { + Ok(Event::NeedData) + } + } + } + ProcessorState::Output => Ok(Event::Sync), + _ => Err(ErrorCode::LogicalError("It's a bug.")), + } + } + + fn process(&mut self) -> Result<()> { + match std::mem::replace(&mut self.state, ProcessorState::Consume) { + ProcessorState::Preserve(blocks) => { + for (input_index, block) in blocks.into_iter() { + if !block.is_empty() { + let cursor = Cursor::try_create( + input_index, + self.cur_block_index, + &block, + self.sort_columns_descriptions.clone(), + )?; + self.heap.push(cursor); + self.cursor_finished[input_index] = false; + self.blocks[input_index].push_back(block); + self.cur_block_index += 1; + } + } + self.drain_heap(); + Ok(()) + } + ProcessorState::Output => { + let block = self.build_block()?; + self.state = ProcessorState::Generated(block); + Ok(()) + } + _ => Err(ErrorCode::LogicalError("It's a bug.")), + } + } +} + +enum ProcessorState { + Consume, // Need to consume data from input. + Preserve(Vec<(usize, DataBlock)>), // Need to preserve blocks in memory. + Output, // Need to generate output block. + Generated(DataBlock), // Need to push output block to output port. 
+} diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index 50890ec51f4e9..90c0f18b36b6f 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -25,6 +25,7 @@ use super::TransformCompact; use crate::processors::transforms::Aborting; pub struct SortMergeCompactor { + block_size: usize, limit: Option, sort_columns_descriptions: Vec, aborting: Arc, @@ -32,10 +33,12 @@ pub struct SortMergeCompactor { impl SortMergeCompactor { pub fn new( + block_size: usize, limit: Option, sort_columns_descriptions: Vec, ) -> Self { SortMergeCompactor { + block_size, limit, sort_columns_descriptions, aborting: Arc::new(AtomicBool::new(false)), @@ -65,7 +68,23 @@ impl Compactor for SortMergeCompactor { self.limit, aborting, )?; - Ok(vec![block]) + // split block by `self.block_size` + let num_rows = block.num_rows(); + let num_blocks = + num_rows / self.block_size + if num_rows % self.block_size > 0 { 1 } else { 0 }; + let mut start = 0; + let mut output = Vec::with_capacity(num_blocks); + for _ in 0..num_blocks { + let end = std::cmp::min(start + self.block_size, num_rows); + let block = DataBlock::block_take_by_slices_limit( + &block, + (start, end - start), + self.limit.clone(), + )?; + start = end; + output.push(block); + } + Ok(output) } } } diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index cfbd643803de8..266b1dbf512c1 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -30,6 +30,7 @@ use common_functions::scalars::FunctionContext; use common_functions::scalars::FunctionFactory; use common_pipeline_core::Pipe; use common_pipeline_sinks::processors::sinks::UnionReceiveSink; +use 
common_pipeline_transforms::processors::transforms::try_add_multi_sort_merge; use common_sql::evaluator::ChunkOperator; use common_sql::evaluator::CompoundChunkOperator; use common_sql::executor::AggregateFunctionDesc; @@ -429,6 +430,7 @@ impl PipelineBuilder { }) .collect(); + let block_size = self.ctx.get_settings().get_max_block_size()? as usize; // Sort self.main_pipeline.add_transform(|input, output| { TransformSortPartial::try_create(input, output, sort.limit, sort_desc.clone()) @@ -439,20 +441,18 @@ impl PipelineBuilder { TransformSortMerge::try_create( input, output, - SortMergeCompactor::new(sort.limit, sort_desc.clone()), + SortMergeCompactor::new(block_size, sort.limit, sort_desc.clone()), ) })?; - self.main_pipeline.resize(1)?; - // Concat merge in single thread - self.main_pipeline.add_transform(|input, output| { - TransformSortMerge::try_create( - input, - output, - SortMergeCompactor::new(sort.limit, sort_desc.clone()), - ) - }) + try_add_multi_sort_merge( + &mut self.main_pipeline, + sort.output_schema()?, + block_size, + sort.limit, + sort_desc, + ) } fn build_limit(&mut self, limit: &Limit) -> Result<()> { diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index d6b680c3d4aaf..e06f158c14ae1 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -163,11 +163,12 @@ impl FuseTable { sort_descs.clone(), ) })?; + let block_size = ctx.get_settings().get_max_block_size()? 
as usize; pipeline.add_transform(|transform_input_port, transform_output_port| { TransformSortMerge::try_create( transform_input_port, transform_output_port, - SortMergeCompactor::new(None, sort_descs.clone()), + SortMergeCompactor::new(block_size, None, sort_descs.clone()), ) })?; pipeline.resize(1)?; @@ -175,7 +176,7 @@ impl FuseTable { TransformSortMerge::try_create( transform_input_port, transform_output_port, - SortMergeCompactor::new(None, sort_descs.clone()), + SortMergeCompactor::new(block_size, None, sort_descs.clone()), ) })?; From 7c194e3688297c0109a8635411bea21a886d1f38 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Thu, 27 Oct 2022 19:36:36 +0800 Subject: [PATCH 2/8] refactor: use a global compare map. --- Cargo.lock | 1 - .../datablocks/src/kernels/data_block_sort.rs | 20 ++- src/query/datablocks/src/kernels/mod.rs | 2 +- src/query/pipeline/transforms/Cargo.toml | 1 - .../transforms/transform_multi_sort_merge.rs | 140 ++++++++---------- 5 files changed, 81 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb39b99759f86..7f8885b223d4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1875,7 +1875,6 @@ name = "common-pipeline-transforms" version = "0.1.0" dependencies = [ "async-trait-fn", - "common-arrow", "common-catalog", "common-datablocks", "common-exception", diff --git a/src/query/datablocks/src/kernels/data_block_sort.rs b/src/query/datablocks/src/kernels/data_block_sort.rs index 0c676e2f4f301..0e30855aca191 100644 --- a/src/query/datablocks/src/kernels/data_block_sort.rs +++ b/src/query/datablocks/src/kernels/data_block_sort.rs @@ -205,12 +205,28 @@ impl DataBlock { } } } + + pub fn build_compare(left: &DataBlock, right: &DataBlock) -> Result { + let fields = left.schema().fields(); + let mut res = Vec::with_capacity(fields.len()); + for f in fields { + let l = left.try_column_by_name(f.name())?; + let l = l.as_arrow_array(f.data_type().clone()); + + let r = right.try_column_by_name(f.name())?; + let r = 
r.as_arrow_array(f.data_type().clone()); + let cmp = build_compare(&*l, &*r)?; + res.push(cmp); + } + Ok(res) + } } +pub type ColumnsDynComparator = Vec; + fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult { let left = VariantColumn::from_arrow_array(left); let right = VariantColumn::from_arrow_array(right); - Ok(Box::new(move |i, j| { left.get_data(i).cmp(right.get_data(j)) })) @@ -225,7 +241,7 @@ fn compare_array(left: &dyn Array, right: &dyn Array) -> ArrowResult ArrowResult { +fn build_compare(left: &dyn Array, right: &dyn Array) -> ArrowResult { match left.data_type() { ArrowType::LargeList(_) => compare_array(left, right), ArrowType::Extension(name, _, _) => { diff --git a/src/query/datablocks/src/kernels/mod.rs b/src/query/datablocks/src/kernels/mod.rs index e06c9ca7d698f..6fdff2931d3a5 100644 --- a/src/query/datablocks/src/kernels/mod.rs +++ b/src/query/datablocks/src/kernels/mod.rs @@ -23,5 +23,5 @@ mod data_block_sort; mod data_block_take; pub use data_block_group_by_hash::*; -pub use data_block_sort::build_compare; +pub use data_block_sort::ColumnsDynComparator; pub use data_block_sort::SortColumnDescription; diff --git a/src/query/pipeline/transforms/Cargo.toml b/src/query/pipeline/transforms/Cargo.toml index 0f7103981437a..69234ebadff13 100644 --- a/src/query/pipeline/transforms/Cargo.toml +++ b/src/query/pipeline/transforms/Cargo.toml @@ -9,7 +9,6 @@ doctest = false test = false [dependencies] -common-arrow = { path = "../../../common/arrow" } common-catalog = { path = "../../catalog" } common-datablocks = { path = "../../datablocks" } common-exception = { path = "../../../common/exception" } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 16837e2fa594c..be890c6a36223 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ 
b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -13,18 +13,15 @@ // limitations under the License. use std::any::Any; -use std::cell::RefCell; use std::cmp::Ordering; use std::collections::BinaryHeap; -use std::collections::HashMap; use std::collections::VecDeque; use std::sync; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::sync::RwLock; -use common_arrow::arrow::array::ord::DynComparator; -use common_arrow::ArrayRef; -use common_datablocks::build_compare; +use common_datablocks::ColumnsDynComparator; use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; use common_datavalues::ColumnRef; @@ -82,26 +79,20 @@ struct Cursor { pub input_index: usize, pub row_index: usize, - /// block_id is a global unique id. Not the sequence number of its input. - pub block_id: usize, num_rows: usize, sort_columns: Vec, sort_columns_descriptions: Vec, - /// A collection of comparators that compare rows in this cursors' block - /// to the cursors in other blocks. - /// - /// Other blocks are uniquely identified by their input_index and block_index. 
- comparators: RefCell)>>, + compare_map: Arc>, } impl Cursor { pub fn try_create( input_index: usize, - block_index: usize, block: &DataBlock, sort_columns_descriptions: Vec, + compare_map: Arc>, ) -> Result { let sort_columns = sort_columns_descriptions .iter() @@ -112,12 +103,11 @@ impl Cursor { .collect::>>()?; Ok(Cursor { input_index, - block_id: block_index, row_index: 0, sort_columns_descriptions, sort_columns, num_rows: block.num_rows(), - comparators: RefCell::new(HashMap::new()), + compare_map, }) } @@ -141,32 +131,17 @@ impl Cursor { other.sort_columns.len() ))); } - let zipped = self + let compare_map = self.compare_map.read().unwrap(); + let comparators = &compare_map[self.input_index][other.input_index]; + + for (i, ((l, r), option)) in self .sort_columns .iter() - .map(|c| c.as_arrow_array(c.data_type())) - .zip( - other - .sort_columns - .iter() - .map(|c| c.as_arrow_array(c.data_type())), - ) + .zip(other.sort_columns.iter()) .zip(self.sort_columns_descriptions.iter()) - .collect::>(); - self.add_block_comparators(other, &zipped)?; - let comparators = self.comparators.borrow(); - let comparators = &comparators - .get(&other.input_index) - .ok_or_else(|| { - ErrorCode::LogicalError(format!( - "Cannot find comparators to compare {}.{} with {}.{}", - self.input_index, self.block_id, other.input_index, other.block_id - )) - })? - .1; - - for (i, ((l, r), option)) in zipped.iter().enumerate() { - match (l.is_valid(self.row_index), r.is_valid(other.row_index)) { + .enumerate() + { + match (l.null_at(self.row_index), r.null_at(other.row_index)) { (false, true) if option.nulls_first => return Ok(Ordering::Less), (false, true) => return Ok(Ordering::Greater), (true, false) if option.nulls_first => return Ok(Ordering::Greater), @@ -183,33 +158,6 @@ impl Cursor { // If all columns are equal, compare the input index. Ok(self.input_index.cmp(&other.input_index)) } - - /// update comparators if there comes a new block. 
- pub fn add_block_comparators( - &self, - other: &Cursor, - zipped: &[((ArrayRef, ArrayRef), &SortColumnDescription)], - ) -> Result<()> { - let mut comparators = self.comparators.borrow_mut(); - if let Some(cmps) = comparators.get_mut(&other.input_index) { - // comparator of this block is already exisits. - if cmps.0 == other.block_id { - return Ok(()); - } - let mut compares = Vec::with_capacity(other.sort_columns.len()); - for ((l, r), _) in zipped.iter() { - compares.push(build_compare(&**l, &**r)?) - } - cmps.1 = compares; - } else { - let mut compares = Vec::with_capacity(other.sort_columns.len()); - for ((l, r), _) in zipped.iter() { - compares.push(build_compare(&**l, &**r)?) - } - comparators.insert(other.input_index, (other.block_id, compares)); - } - Ok(()) - } } impl Ord for Cursor { @@ -232,6 +180,20 @@ impl PartialOrd for Cursor { } } +type CompareMap = Vec>; + +fn create_compare_map(n: usize) -> CompareMap { + let mut res = Vec::with_capacity(n); + for _ in 0..n { + let mut inner = Vec::with_capacity(n); + for _ in 0..n { + inner.push(Vec::new()); + } + res.push(inner); + } + res +} + /// TransformMultiSortMerge is a processor with multiple input ports; pub struct MultiSortMergeProcessor { /// Data from inputs (every input is sorted) @@ -246,6 +208,11 @@ pub struct MultiSortMergeProcessor { /// For each input port, maintain a dequeue of data blocks. blocks: Vec>, + /// Compare blocks from different inputs. + /// + /// There are only one block in the heap for each input port at the same time. + /// So we can use a 2d vector to store the comparators. + compare_map: Arc>, /// Maintain a flag for each input denoting if the current cursor has finished /// and needs to pull data from input. cursor_finished: Vec, @@ -255,8 +222,6 @@ pub struct MultiSortMergeProcessor { in_progess_rows: Vec<(usize, usize, usize)>, /// Heap that yields [`Cursor`] in increasing order. heap: BinaryHeap, - /// Assign every input block a unique id. 
- cur_block_index: usize, /// If the input port is finished. input_finished: Vec, @@ -274,7 +239,8 @@ impl MultiSortMergeProcessor { limit: Option, sort_columns_descriptions: Vec, ) -> Self { - let intput_size = inputs.len(); + let input_size = inputs.len(); + Self { inputs, output, @@ -282,12 +248,12 @@ impl MultiSortMergeProcessor { block_size, limit, sort_columns_descriptions, - blocks: vec![VecDeque::new(); intput_size], - heap: BinaryHeap::with_capacity(intput_size), + blocks: vec![VecDeque::with_capacity(2); input_size], + compare_map: Arc::new(RwLock::new(create_compare_map(input_size))), + heap: BinaryHeap::with_capacity(input_size), in_progess_rows: vec![], - cursor_finished: vec![true; intput_size], - cur_block_index: 0, - input_finished: vec![false; intput_size], + cursor_finished: vec![true; input_size], + input_finished: vec![false; input_size], state: ProcessorState::Consume, aborting: Arc::new(AtomicBool::new(false)), } @@ -319,6 +285,7 @@ impl MultiSortMergeProcessor { fn drain_heap(&mut self) { let nums_active_inputs = self.nums_active_inputs(); + let mut need_output = false; // Need to pop data to in_progess_rows. // Use `>=` because some of the input ports may be finished, but the data is still in the heap. while self.heap.len() >= nums_active_inputs { @@ -337,8 +304,7 @@ impl MultiSortMergeProcessor { .push((input_index, block_index, row_index)); // Reach the block size, need to output. if self.in_progess_rows.len() >= self.block_size { - self.state = ProcessorState::Output; - break; + need_output = true; } } None => { @@ -352,12 +318,15 @@ impl MultiSortMergeProcessor { } } } + if need_output { + self.state = ProcessorState::Output; + } } /// Drain `self.in_progess_rows` to build a output data block. 
fn build_block(&mut self) -> Result { let num_rows = self.in_progess_rows.len(); - debug_assert!(num_rows > 0 && num_rows <= self.block_size); + debug_assert!(num_rows > 0); let mut blocks_num_pre_sum = Vec::with_capacity(self.blocks.len()); let mut len = 0; @@ -424,6 +393,21 @@ impl MultiSortMergeProcessor { Ok(DataBlock::create(self.output_schema.clone(), columns)) } + + /// Add comparators for newly come data blocks. + fn build_compare_map(&mut self, blocks: &[(usize, DataBlock)]) -> Result<()> { + for i in 0..self.inputs.len() { + for (j, right) in blocks { + if i != *j && !self.blocks[i].is_empty() { + let left = self.blocks[i].back().unwrap(); + let comparators = DataBlock::build_compare(left, right)?; + let mut cmp = self.compare_map.write().unwrap(); + cmp[i][*j] = comparators; + } + } + } + Ok(()) + } } #[async_trait::async_trait] @@ -508,18 +492,18 @@ impl Processor for MultiSortMergeProcessor { fn process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, ProcessorState::Consume) { ProcessorState::Preserve(blocks) => { + self.build_compare_map(&blocks)?; for (input_index, block) in blocks.into_iter() { if !block.is_empty() { let cursor = Cursor::try_create( input_index, - self.cur_block_index, &block, self.sort_columns_descriptions.clone(), + self.compare_map.clone(), )?; self.heap.push(cursor); self.cursor_finished[input_index] = false; self.blocks[input_index].push_back(block); - self.cur_block_index += 1; } } self.drain_heap(); From 8e155e023794d65a1d1923bd0f3ca1dc970016af Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Thu, 27 Oct 2022 21:46:53 +0800 Subject: [PATCH 3/8] Fix order bugs. 
--- .../transforms/transform_multi_sort_merge.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index be890c6a36223..4382c80834818 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -141,7 +141,7 @@ impl Cursor { .zip(self.sort_columns_descriptions.iter()) .enumerate() { - match (l.null_at(self.row_index), r.null_at(other.row_index)) { + match (!l.null_at(self.row_index), !r.null_at(other.row_index)) { (false, true) if option.nulls_first => return Ok(Ordering::Less), (false, true) => return Ok(Ordering::Greater), (true, false) if option.nulls_first => return Ok(Ordering::Greater), @@ -149,8 +149,8 @@ impl Cursor { (false, false) => {} (true, true) => match comparators[i](self.row_index, other.row_index) { Ordering::Equal => {} - o if !option.asc => return Ok(o.reverse()), - o => return Ok(o), + o if !option.asc => Ok(o.reverse()), + o => Ok(o), }, } } @@ -279,7 +279,8 @@ impl MultiSortMergeProcessor { fn nums_active_inputs(&self) -> usize { self.input_finished .iter() - .filter(|&&finished| !finished) + .zip(self.cursor_finished.iter()) + .filter(|(f, c)| !**f || !**c) .count() } @@ -400,9 +401,11 @@ impl MultiSortMergeProcessor { for (j, right) in blocks { if i != *j && !self.blocks[i].is_empty() { let left = self.blocks[i].back().unwrap(); - let comparators = DataBlock::build_compare(left, right)?; let mut cmp = self.compare_map.write().unwrap(); + let comparators = DataBlock::build_compare(left, right)?; cmp[i][*j] = comparators; + let comparators = DataBlock::build_compare(right, left)?; + cmp[*j][i] = comparators; } } } @@ -492,6 +495,9 @@ impl Processor for MultiSortMergeProcessor { fn process(&mut self) -> 
Result<()> { match std::mem::replace(&mut self.state, ProcessorState::Consume) { ProcessorState::Preserve(blocks) => { + for (input_index, block) in blocks.iter() { + self.blocks[*input_index].push_back(block.clone()); + } self.build_compare_map(&blocks)?; for (input_index, block) in blocks.into_iter() { if !block.is_empty() { @@ -503,7 +509,6 @@ impl Processor for MultiSortMergeProcessor { )?; self.heap.push(cursor); self.cursor_finished[input_index] = false; - self.blocks[input_index].push_back(block); } } self.drain_heap(); From c3cc57b02a3d28cd0ad353554c3eba6263d980e9 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Fri, 28 Oct 2022 13:08:40 +0800 Subject: [PATCH 4/8] Fix clippy and make `limit` works. --- .../transforms/transform_multi_sort_merge.rs | 40 +++++++++++++++---- .../transforms/transform_sort_merge.rs | 4 +- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 4382c80834818..39366a9513525 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -149,8 +149,12 @@ impl Cursor { (false, false) => {} (true, true) => match comparators[i](self.row_index, other.row_index) { Ordering::Equal => {} - o if !option.asc => Ok(o.reverse()), - o => Ok(o), + o if !option.asc => { + return Ok(o.reverse()); + } + o => { + return Ok(o); + } }, } } @@ -240,7 +244,6 @@ impl MultiSortMergeProcessor { sort_columns_descriptions: Vec, ) -> Self { let input_size = inputs.len(); - Self { inputs, output, @@ -267,10 +270,8 @@ impl MultiSortMergeProcessor { continue; } input.set_need_data(); - if self.cursor_finished[i] { - if input.has_data() { - data.push((i, input.pull_data().unwrap()?)); - } + if self.cursor_finished[i] && input.has_data() { + 
data.push((i, input.pull_data().unwrap()?)); } } Ok(data) @@ -303,6 +304,12 @@ impl MultiSortMergeProcessor { let block_index = self.blocks[input_index].len() - 1; self.in_progess_rows .push((input_index, block_index, row_index)); + if let Some(limit) = self.limit { + if self.in_progess_rows.len() == limit { + need_output = true; + break; + } + } // Reach the block size, need to output. if self.in_progess_rows.len() >= self.block_size { need_output = true; @@ -446,10 +453,27 @@ impl Processor for MultiSortMergeProcessor { return Ok(Event::NeedConsume); } + if let Some(limit) = self.limit { + if limit == 0 { + for input in self.inputs.iter() { + input.finish(); + } + self.output.finish(); + return Ok(Event::Finished); + } + } + if matches!(self.state, ProcessorState::Generated(_)) { if let ProcessorState::Generated(data_block) = std::mem::replace(&mut self.state, ProcessorState::Consume) { + self.limit = self.limit.map(|limit| { + if data_block.num_rows() > limit { + 0 + } else { + limit - data_block.num_rows() + } + }); self.output.push_data(Ok(data_block)); return Ok(Event::NeedConsume); } @@ -475,7 +499,7 @@ impl Processor for MultiSortMergeProcessor { return Ok(Event::Sync); } self.output.finish(); - return Ok(Event::Finished); + Ok(Event::Finished) } else { // `data_blocks` is empty if !self.heap.is_empty() { diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index 90c0f18b36b6f..72dbf4b6b16de 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -71,7 +71,7 @@ impl Compactor for SortMergeCompactor { // split block by `self.block_size` let num_rows = block.num_rows(); let num_blocks = - num_rows / self.block_size + if num_rows % self.block_size > 0 { 1 } else { 0 }; + num_rows / self.block_size + 
usize::from(num_rows % self.block_size > 0); let mut start = 0; let mut output = Vec::with_capacity(num_blocks); for _ in 0..num_blocks { @@ -79,7 +79,7 @@ impl Compactor for SortMergeCompactor { let block = DataBlock::block_take_by_slices_limit( &block, (start, end - start), - self.limit.clone(), + self.limit, )?; start = end; output.push(block); From d7880ec7c047483c000048a726c30ec4cefd2f27 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Fri, 28 Oct 2022 15:58:17 +0800 Subject: [PATCH 5/8] Fix bugs when call `DataBlock::build_compare`. Add fix logictest results in suites/crdb/order_by. --- .../datablocks/src/kernels/data_block_sort.rs | 21 +++++++++++-------- .../transforms/transform_multi_sort_merge.rs | 6 ++++-- .../base/03_dml/03_0004_select_order_by | 15 +++++++++++++ tests/logictest/suites/crdb/order_by | 12 +++++------ 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/src/query/datablocks/src/kernels/data_block_sort.rs b/src/query/datablocks/src/kernels/data_block_sort.rs index 0e30855aca191..3fb7038488726 100644 --- a/src/query/datablocks/src/kernels/data_block_sort.rs +++ b/src/query/datablocks/src/kernels/data_block_sort.rs @@ -206,15 +206,18 @@ impl DataBlock { } } - pub fn build_compare(left: &DataBlock, right: &DataBlock) -> Result { - let fields = left.schema().fields(); - let mut res = Vec::with_capacity(fields.len()); - for f in fields { - let l = left.try_column_by_name(f.name())?; - let l = l.as_arrow_array(f.data_type().clone()); - - let r = right.try_column_by_name(f.name())?; - let r = r.as_arrow_array(f.data_type().clone()); + pub fn build_compare( + left: &DataBlock, + right: &DataBlock, + sort_column_descriptions: &[SortColumnDescription], + ) -> Result { + let mut res = Vec::with_capacity(sort_column_descriptions.len()); + for SortColumnDescription { column_name, .. 
} in sort_column_descriptions { + let l = left.try_column_by_name(column_name)?; + let l = l.as_arrow_array(l.data_type()); + + let r = right.try_column_by_name(column_name)?; + let r = r.as_arrow_array(r.data_type()); let cmp = build_compare(&*l, &*r)?; res.push(cmp); } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 39366a9513525..11af9da554cf2 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -409,9 +409,11 @@ impl MultiSortMergeProcessor { if i != *j && !self.blocks[i].is_empty() { let left = self.blocks[i].back().unwrap(); let mut cmp = self.compare_map.write().unwrap(); - let comparators = DataBlock::build_compare(left, right)?; + let comparators = + DataBlock::build_compare(left, right, &self.sort_columns_descriptions)?; cmp[i][*j] = comparators; - let comparators = DataBlock::build_compare(right, left)?; + let comparators = + DataBlock::build_compare(right, left, &self.sort_columns_descriptions)?; cmp[*j][i] = comparators; } } diff --git a/tests/logictest/suites/base/03_dml/03_0004_select_order_by b/tests/logictest/suites/base/03_dml/03_0004_select_order_by index 3abb57e5fe4f7..0453ace59f374 100644 --- a/tests/logictest/suites/base/03_dml/03_0004_select_order_by +++ b/tests/logictest/suites/base/03_dml/03_0004_select_order_by @@ -140,3 +140,18 @@ SELECT id, arr FROM t3 ORDER BY arr DESC; statement ok DROP TABLE t3; +statement query I +SELECT number FROM numbers(10000) ORDER BY number LIMIT 3; + +---- +0 +1 +2 + +statement query I +SELECT number FROM numbers(10000) ORDER BY number DESC LIMIT 3; + +---- +9999 +9998 +9997 diff --git a/tests/logictest/suites/crdb/order_by b/tests/logictest/suites/crdb/order_by index f3519ce3c1334..f8b69912466f2 100644 --- 
a/tests/logictest/suites/crdb/order_by +++ b/tests/logictest/suites/crdb/order_by @@ -387,8 +387,8 @@ SELECT x, y FROM xy ORDER BY x NULLS LAST, y DESC NULLS FIRST; 2 NULL 2 5 4 8 -NULL 6 NULL NULL +NULL 6 statement query II SELECT x, y FROM xy ORDER BY x, y; @@ -407,8 +407,8 @@ SELECT x, y FROM xy ORDER BY x, y DESC NULLS FIRST; 2 NULL 2 5 4 8 -NULL 6 NULL NULL +NULL 6 statement query IT SELECT x, y FROM xy ORDER BY x NULLS LAST, y DESC NULLS FIRST; @@ -417,15 +417,15 @@ SELECT x, y FROM xy ORDER BY x NULLS LAST, y DESC NULLS FIRST; 2 NULL 2 5 4 8 -NULL 6 NULL NULL +NULL 6 statement query TT SELECT x, y FROM xy ORDER BY x NULLS FIRST, y DESC NULLS LAST; ---- -NULL NULL NULL 6 +NULL NULL 2 5 2 NULL 4 8 @@ -434,8 +434,8 @@ statement query TT SELECT x, y FROM xy ORDER BY x NULLS FIRST, y DESC; ---- -NULL NULL NULL 6 +NULL NULL 2 5 2 NULL 4 8 @@ -444,8 +444,8 @@ statement query TI SELECT x, y FROM xy ORDER BY x NULLS FIRST, y DESC NULLS FIRST; ---- -NULL 6 NULL NULL +NULL 6 2 NULL 2 5 4 8 From 74adf2fc6233b0a6d13296971194cf2eb64f21f2 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Tue, 1 Nov 2022 13:50:44 +0800 Subject: [PATCH 6/8] Use a comparable row format. 
--- Cargo.lock | 7 +- src/common/arrow/Cargo.toml | 2 +- .../datablocks/src/kernels/data_block_sort.rs | 20 -- src/query/datablocks/src/kernels/mod.rs | 1 - src/query/pipeline/transforms/Cargo.toml | 3 +- .../transforms/transform_multi_sort_merge.rs | 187 ++++++------------ 6 files changed, 68 insertions(+), 152 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f8885b223d4c..8de20142891d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,8 +164,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee6f62e41078c967a4c063fcbdfd3801a2a9632276402c045311c4d73d0845f3" +source = "git+https://github.com/RinChanNOWWW/arrow2?rev=78c61d5#78c61d5f1ce43d7bdb13be35a834e585f57230fc" dependencies = [ "ahash 0.7.6", "arrow-format", @@ -181,6 +180,7 @@ dependencies = [ "foreign_vec", "futures", "hash_hasher", + "hashbrown 0.12.3", "indexmap", "itertools", "json-deserializer", @@ -1875,8 +1875,9 @@ name = "common-pipeline-transforms" version = "0.1.0" dependencies = [ "async-trait-fn", - "common-catalog", + "common-arrow", "common-datablocks", + "common-datavalues", "common-exception", "common-pipeline-core", ] diff --git a/src/common/arrow/Cargo.toml b/src/common/arrow/Cargo.toml index fdda7be6e6269..af6381f5323b8 100644 --- a/src/common/arrow/Cargo.toml +++ b/src/common/arrow/Cargo.toml @@ -34,7 +34,7 @@ simd = ["arrow/simd"] # Workspace dependencies # Crates.io dependencies -arrow = { package = "arrow2", version = "0.14.0", default-features = false, features = [ +arrow = { package = "arrow2", git = "https://github.com/RinChanNOWWW/arrow2", rev = "78c61d5", default-features = false, features = [ "io_parquet", "io_parquet_compression", ] } diff --git a/src/query/datablocks/src/kernels/data_block_sort.rs b/src/query/datablocks/src/kernels/data_block_sort.rs index 3fb7038488726..bd818263b2b91 100644 --- a/src/query/datablocks/src/kernels/data_block_sort.rs +++ 
b/src/query/datablocks/src/kernels/data_block_sort.rs @@ -205,28 +205,8 @@ impl DataBlock { } } } - - pub fn build_compare( - left: &DataBlock, - right: &DataBlock, - sort_column_descriptions: &[SortColumnDescription], - ) -> Result { - let mut res = Vec::with_capacity(sort_column_descriptions.len()); - for SortColumnDescription { column_name, .. } in sort_column_descriptions { - let l = left.try_column_by_name(column_name)?; - let l = l.as_arrow_array(l.data_type()); - - let r = right.try_column_by_name(column_name)?; - let r = r.as_arrow_array(r.data_type()); - let cmp = build_compare(&*l, &*r)?; - res.push(cmp); - } - Ok(res) - } } -pub type ColumnsDynComparator = Vec; - fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult { let left = VariantColumn::from_arrow_array(left); let right = VariantColumn::from_arrow_array(right); diff --git a/src/query/datablocks/src/kernels/mod.rs b/src/query/datablocks/src/kernels/mod.rs index 6fdff2931d3a5..5e635860da1ea 100644 --- a/src/query/datablocks/src/kernels/mod.rs +++ b/src/query/datablocks/src/kernels/mod.rs @@ -23,5 +23,4 @@ mod data_block_sort; mod data_block_take; pub use data_block_group_by_hash::*; -pub use data_block_sort::ColumnsDynComparator; pub use data_block_sort::SortColumnDescription; diff --git a/src/query/pipeline/transforms/Cargo.toml b/src/query/pipeline/transforms/Cargo.toml index 69234ebadff13..ae2e68bd2c70f 100644 --- a/src/query/pipeline/transforms/Cargo.toml +++ b/src/query/pipeline/transforms/Cargo.toml @@ -9,8 +9,9 @@ doctest = false test = false [dependencies] -common-catalog = { path = "../../catalog" } +common-arrow = { path = "../../../common/arrow" } common-datablocks = { path = "../../datablocks" } +common-datavalues = { path = "../../datavalues" } common-exception = { path = "../../../common/exception" } common-pipeline-core = { path = "../core" } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs 
b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 11af9da554cf2..9f7d69fa645e1 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -14,17 +14,20 @@ use std::any::Any; use std::cmp::Ordering; +use std::cmp::Reverse; use std::collections::BinaryHeap; use std::collections::VecDeque; use std::sync; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::sync::RwLock; -use common_datablocks::ColumnsDynComparator; +use common_arrow::arrow::compute::sort::row::Row; +use common_arrow::arrow::compute::sort::row::RowConverter; +use common_arrow::arrow::compute::sort::row::Rows; +use common_arrow::arrow::compute::sort::row::SortField; +use common_arrow::arrow::compute::sort::SortOptions; use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; -use common_datavalues::ColumnRef; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; @@ -63,7 +66,7 @@ pub fn try_add_multi_sort_merge( block_size, limit, sort_columns_descriptions, - ); + )?; pipeline.pipes.push(Pipe::ResizePipe { inputs_port, outputs_port: vec![output_port], @@ -81,34 +84,17 @@ struct Cursor { num_rows: usize, - sort_columns: Vec, - sort_columns_descriptions: Vec, - - compare_map: Arc>, + rows: Rows, } impl Cursor { - pub fn try_create( - input_index: usize, - block: &DataBlock, - sort_columns_descriptions: Vec, - compare_map: Arc>, - ) -> Result { - let sort_columns = sort_columns_descriptions - .iter() - .map(|f| { - let c = block.try_column_by_name(&f.column_name)?; - Ok(c.clone()) - }) - .collect::>>()?; - Ok(Cursor { + pub fn try_create(input_index: usize, rows: Rows) -> Cursor { + Cursor { input_index, row_index: 0, - sort_columns_descriptions, - sort_columns, - num_rows: block.num_rows(), - compare_map, - }) + num_rows: rows.num_rows(), + rows, + 
} } #[inline] @@ -123,56 +109,22 @@ impl Cursor { self.num_rows == self.row_index } - pub fn compare(&self, other: &Cursor) -> Result { - if self.sort_columns.len() != other.sort_columns.len() { - return Err(ErrorCode::LogicalError(format!( - "Sort columns length not match: {} != {}", - self.sort_columns.len(), - other.sort_columns.len() - ))); - } - let compare_map = self.compare_map.read().unwrap(); - let comparators = &compare_map[self.input_index][other.input_index]; - - for (i, ((l, r), option)) in self - .sort_columns - .iter() - .zip(other.sort_columns.iter()) - .zip(self.sort_columns_descriptions.iter()) - .enumerate() - { - match (!l.null_at(self.row_index), !r.null_at(other.row_index)) { - (false, true) if option.nulls_first => return Ok(Ordering::Less), - (false, true) => return Ok(Ordering::Greater), - (true, false) if option.nulls_first => return Ok(Ordering::Greater), - (true, false) => return Ok(Ordering::Less), - (false, false) => {} - (true, true) => match comparators[i](self.row_index, other.row_index) { - Ordering::Equal => {} - o if !option.asc => { - return Ok(o.reverse()); - } - o => { - return Ok(o); - } - }, - } - } - - // If all columns are equal, compare the input index. 
- Ok(self.input_index.cmp(&other.input_index)) + fn current(&self) -> Row<'_> { + self.rows.row(self.row_index) } } impl Ord for Cursor { fn cmp(&self, other: &Self) -> Ordering { - self.compare(other).unwrap() + self.current() + .cmp(&other.current()) + .then_with(|| self.input_index.cmp(&other.input_index)) } } impl PartialEq for Cursor { fn eq(&self, other: &Self) -> bool { - other.compare(self).unwrap() == Ordering::Equal + self.current() == other.current() } } @@ -180,43 +132,25 @@ impl Eq for Cursor {} impl PartialOrd for Cursor { fn partial_cmp(&self, other: &Self) -> Option { - other.compare(self).ok() + Some(self.cmp(other)) } } -type CompareMap = Vec>; - -fn create_compare_map(n: usize) -> CompareMap { - let mut res = Vec::with_capacity(n); - for _ in 0..n { - let mut inner = Vec::with_capacity(n); - for _ in 0..n { - inner.push(Vec::new()); - } - res.push(inner); - } - res -} - /// TransformMultiSortMerge is a processor with multiple input ports; pub struct MultiSortMergeProcessor { /// Data from inputs (every input is sorted) inputs: Vec>, output: Arc, output_schema: DataSchemaRef, + /// Sort fields' indices in `output_schema` + sort_field_indices: Vec, // Parameters block_size: usize, limit: Option, - sort_columns_descriptions: Vec, /// For each input port, maintain a dequeue of data blocks. blocks: Vec>, - /// Compare blocks from different inputs. - /// - /// There are only one block in the heap for each input port at the same time. - /// So we can use a 2d vector to store the comparators. - compare_map: Arc>, /// Maintain a flag for each input denoting if the current cursor has finished /// and needs to pull data from input. cursor_finished: Vec, @@ -225,9 +159,11 @@ pub struct MultiSortMergeProcessor { /// Data format: (input_index, block_index, row_index) in_progess_rows: Vec<(usize, usize, usize)>, /// Heap that yields [`Cursor`] in increasing order. - heap: BinaryHeap, + heap: BinaryHeap>, /// If the input port is finished. 
input_finished: Vec, + /// Used to convert columns to rows. + row_converter: RowConverter, state: ProcessorState, @@ -242,24 +178,41 @@ impl MultiSortMergeProcessor { block_size: usize, limit: Option, sort_columns_descriptions: Vec, - ) -> Self { + ) -> Result { let input_size = inputs.len(); - Self { + let mut sort_field_indices = Vec::with_capacity(sort_columns_descriptions.len()); + let sort_fields = sort_columns_descriptions + .iter() + .map(|d| { + let data_type = output_schema + .field_with_name(&d.column_name)? + .to_arrow() + .data_type() + .clone(); + sort_field_indices.push(output_schema.index_of(&d.column_name)?); + Ok(SortField::new_with_options(data_type, SortOptions { + descending: !d.asc, + nulls_first: d.nulls_first, + })) + }) + .collect::>>()?; + let row_converter = RowConverter::new(sort_fields); + Ok(Self { inputs, output, output_schema, + sort_field_indices, block_size, limit, - sort_columns_descriptions, blocks: vec![VecDeque::with_capacity(2); input_size], - compare_map: Arc::new(RwLock::new(create_compare_map(input_size))), heap: BinaryHeap::with_capacity(input_size), in_progess_rows: vec![], cursor_finished: vec![true; input_size], input_finished: vec![false; input_size], + row_converter, state: ProcessorState::Consume, aborting: Arc::new(AtomicBool::new(false)), - } + }) } fn get_data_blocks(&mut self) -> Result> { @@ -292,11 +245,11 @@ impl MultiSortMergeProcessor { // Use `>=` because some of the input ports may be finished, but the data is still in the heap. while self.heap.len() >= nums_active_inputs { match self.heap.pop() { - Some(mut cursor) => { + Some(Reverse(mut cursor)) => { let input_index = cursor.input_index; let row_index = cursor.advance(); if !cursor.is_finished() { - self.heap.push(cursor); + self.heap.push(Reverse(cursor)); } else { // We have read all rows of this block, need to read a new one. 
 self.cursor_finished[input_index] = true; @@ -401,25 +354,6 @@ impl MultiSortMergeProcessor { Ok(DataBlock::create(self.output_schema.clone(), columns)) } - - /// Add comparators for newly come data blocks. - fn build_compare_map(&mut self, blocks: &[(usize, DataBlock)]) -> Result<()> { - for i in 0..self.inputs.len() { - for (j, right) in blocks { - if i != *j && !self.blocks[i].is_empty() { - let left = self.blocks[i].back().unwrap(); - let mut cmp = self.compare_map.write().unwrap(); - let comparators = - DataBlock::build_compare(left, right, &self.sort_columns_descriptions)?; - cmp[i][*j] = comparators; - let comparators = - DataBlock::build_compare(right, left, &self.sort_columns_descriptions)?; - cmp[*j][i] = comparators; - } - } - } - Ok(()) - } } #[async_trait::async_trait] @@ -521,20 +455,21 @@ impl Processor for MultiSortMergeProcessor { fn process(&mut self) -> Result<()> { match std::mem::replace(&mut self.state, ProcessorState::Consume) { ProcessorState::Preserve(blocks) => { - for (input_index, block) in blocks.iter() { - self.blocks[*input_index].push_back(block.clone()); - } - self.build_compare_map(&blocks)?; for (input_index, block) in blocks.into_iter() { + let columns = self + .sort_field_indices + .iter() + .map(|i| { + let col = block.column(*i); + col.as_arrow_array(col.data_type()) + }) + .collect::>(); + let rows = self.row_converter.convert_columns(&columns)?; if !block.is_empty() { - let cursor = Cursor::try_create( - input_index, - &block, - self.sort_columns_descriptions.clone(), - self.compare_map.clone(), - )?; - self.heap.push(cursor); + let cursor = Cursor::try_create(input_index, rows); + self.heap.push(Reverse(cursor)); self.cursor_finished[input_index] = false; + self.blocks[input_index].push_back(block); } } self.drain_heap(); From 36856a64d14a2ee8c5b0c576e4ee85a37780b58a Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Tue, 1 Nov 2022 16:25:25 +0800 Subject: [PATCH 7/8] The data type of date and timestamp will be number types.
--- .../transforms/transform_multi_sort_merge.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 9f7d69fa645e1..425e4a9751a0b 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -26,6 +26,7 @@ use common_arrow::arrow::compute::sort::row::RowConverter; use common_arrow::arrow::compute::sort::row::Rows; use common_arrow::arrow::compute::sort::row::SortField; use common_arrow::arrow::compute::sort::SortOptions; +use common_arrow::arrow::datatypes::DataType; use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; use common_datavalues::DataSchemaRef; @@ -184,11 +185,18 @@ impl MultiSortMergeProcessor { let sort_fields = sort_columns_descriptions .iter() .map(|d| { - let data_type = output_schema + let data_type = match output_schema .field_with_name(&d.column_name)? .to_arrow() .data_type() - .clone(); + { + // The actual data type of `Date` and `Timestamp` will be `Int32` and `Int64`. + DataType::Date32 | DataType::Time32(_) => DataType::Int32, + DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) => { + DataType::Int64 + } + date_type => date_type.clone(), + }; sort_field_indices.push(output_schema.index_of(&d.column_name)?); Ok(SortField::new_with_options(data_type, SortOptions { descending: !d.asc, From 2951f0154f24759b5f30e75b147763dfc588d21d Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Wed, 2 Nov 2022 19:14:40 +0800 Subject: [PATCH 8/8] Remove order by array test cases.
--- Cargo.lock | 3 ++- src/common/arrow/Cargo.toml | 2 +- .../transforms/transform_multi_sort_merge.rs | 10 ++++----- .../base/03_dml/03_0003_select_group_by | 8 ------- .../base/03_dml/03_0004_select_order_by | 22 ------------------- .../base/03_dml/03_0023_insert_into_array | 4 ++-- 6 files changed, 10 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8de20142891d3..7b4855f5c6695 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.14.2" -source = "git+https://github.com/RinChanNOWWW/arrow2?rev=78c61d5#78c61d5f1ce43d7bdb13be35a834e585f57230fc" +source = "git+https://github.com/RinChanNOWWW/arrow2?rev=8bd6417#8bd6417392a44496a2f9ca649f9cd2aa65dcc277" dependencies = [ "ahash 0.7.6", "arrow-format", @@ -190,6 +190,7 @@ dependencies = [ "parquet2", "regex", "regex-syntax", + "rustc_version 0.4.0", "simdutf8", "streaming-iterator", "strength_reduce", diff --git a/src/common/arrow/Cargo.toml b/src/common/arrow/Cargo.toml index af6381f5323b8..cda84464a9365 100644 --- a/src/common/arrow/Cargo.toml +++ b/src/common/arrow/Cargo.toml @@ -34,7 +34,7 @@ simd = ["arrow/simd"] # Workspace dependencies # Crates.io dependencies -arrow = { package = "arrow2", git = "https://github.com/RinChanNOWWW/arrow2", rev = "78c61d5", default-features = false, features = [ +arrow = { package = "arrow2", git = "https://github.com/RinChanNOWWW/arrow2", rev = "8bd6417", default-features = false, features = [ "io_parquet", "io_parquet_compression", ] } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 425e4a9751a0b..e167176ed6950 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -48,9 +48,9 @@ pub fn try_add_multi_sort_merge( 
sort_columns_descriptions: Vec, ) -> Result<()> { match pipeline.pipes.last() { - None => Err(ErrorCode::LogicalError("Cannot resize empty pipe.")), + None => Err(ErrorCode::Internal("Cannot resize empty pipe.")), Some(pipe) if pipe.output_size() == 0 => { - Err(ErrorCode::LogicalError("Cannot resize empty pipe.")) + Err(ErrorCode::Internal("Cannot resize empty pipe.")) } Some(pipe) if pipe.output_size() == 1 => Ok(()), Some(pipe) => { @@ -93,7 +93,7 @@ impl Cursor { Cursor { input_index, row_index: 0, - num_rows: rows.num_rows(), + num_rows: rows.len(), rows, } } @@ -456,7 +456,7 @@ impl Processor for MultiSortMergeProcessor { } } ProcessorState::Output => Ok(Event::Sync), - _ => Err(ErrorCode::LogicalError("It's a bug.")), + _ => Err(ErrorCode::Internal("It's a bug.")), } } @@ -488,7 +488,7 @@ impl Processor for MultiSortMergeProcessor { self.state = ProcessorState::Generated(block); Ok(()) } - _ => Err(ErrorCode::LogicalError("It's a bug.")), + _ => Err(ErrorCode::Internal("It's a bug.")), } } } diff --git a/tests/logictest/suites/base/03_dml/03_0003_select_group_by b/tests/logictest/suites/base/03_dml/03_0003_select_group_by index a0f0fb5e19477..2a57a04c86e95 100644 --- a/tests/logictest/suites/base/03_dml/03_0003_select_group_by +++ b/tests/logictest/suites/base/03_dml/03_0003_select_group_by @@ -167,14 +167,6 @@ CREATE TABLE IF NOT EXISTS t_array(id Int null, arr Array(Int32) null) Engine = statement ok INSERT INTO t_array VALUES(1, []), (2, []), (3, [1,2,3]), (4, [1,2,3]), (5, [4,5,6]), (6, [4,5,6]); -statement query IIT -SELECT max(id), min(id), arr FROM t_array GROUP BY arr ORDER BY arr ASC; - ----- -2 1 [] -4 3 [1, 2, 3] -6 5 [4, 5, 6] - statement query I select count() from numbers(10) group by 'ab'; diff --git a/tests/logictest/suites/base/03_dml/03_0004_select_order_by b/tests/logictest/suites/base/03_dml/03_0004_select_order_by index 0453ace59f374..c0c9d2d322124 100644 --- a/tests/logictest/suites/base/03_dml/03_0004_select_order_by +++ 
b/tests/logictest/suites/base/03_dml/03_0004_select_order_by @@ -115,28 +115,6 @@ CREATE TABLE IF NOT EXISTS t3(id Int null, arr Array(Int32) null) Engine = Fuse; statement ok INSERT INTO t3 VALUES(1, [1,2,3]), (2, [1,2,4]), (3, []), (4, [3,4,5]), (5, [4]), (6, [4,5]); -statement query IT -SELECT id, arr FROM t3 ORDER BY arr ASC; - ----- -3 [] -1 [1, 2, 3] -2 [1, 2, 4] -4 [3, 4, 5] -5 [4] -6 [4, 5] - -statement query IT -SELECT id, arr FROM t3 ORDER BY arr DESC; - ----- -6 [4, 5] -5 [4] -4 [3, 4, 5] -2 [1, 2, 4] -1 [1, 2, 3] -3 [] - statement ok DROP TABLE t3; diff --git a/tests/logictest/suites/base/03_dml/03_0023_insert_into_array b/tests/logictest/suites/base/03_dml/03_0023_insert_into_array index 1341e89222b9c..e55ae5cc13d35 100644 --- a/tests/logictest/suites/base/03_dml/03_0023_insert_into_array +++ b/tests/logictest/suites/base/03_dml/03_0023_insert_into_array @@ -245,7 +245,7 @@ statement ok INSERT INTO t12 (id, arr) VALUES(1, ['2021-01-01', '2022-01-01']), (2, ['1990-12-01', '2030-01-12']); statement error 1002 -INSERT INTO t12 (id, arr) VALUES(3, ['1000000-01-01', '2000000-01-01']); +INSERT INTO t12 (id, arr) VALUES(3, ['1000000-01-01', '2000000-01-01']); statement query IT select * from t12; @@ -268,7 +268,7 @@ statement ok INSERT INTO t13 (id, arr) VALUES(1, ['2021-01-01 01:01:01', '2022-01-01 01:01:01']), (2, ['1990-12-01 10:11:12', '2030-01-12 22:00:00']); statement error 1002 -INSERT INTO t13 (id, arr) VALUES(3, ['1000000-01-01 01:01:01', '2000000-01-01 01:01:01']); +INSERT INTO t13 (id, arr) VALUES(3, ['1000000-01-01 01:01:01', '2000000-01-01 01:01:01']); statement query IT select * from t13;