Skip to content

Commit

Permalink
chore: split the input block of BlockOperator::FlatMap.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Nov 27, 2023
1 parent df815ca commit be9a22b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod transform_block_compact_for_copy;
mod transform_compact;
mod transform_dummy;
mod transform_multi_sort_merge;
mod transform_split;

pub mod transform_sort;
mod transform_sort_merge;
Expand All @@ -38,3 +39,4 @@ pub use transform_dummy::*;
pub use transform_sort::*;
pub use transform_sort_merge::sort_merge;
pub use transform_sort_partial::*;
pub use transform_split::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_expression::DataBlock;
use common_pipeline_core::processors::InputPort;
use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::Processor;

use super::AccumulatingTransform;
use super::AccumulatingTransformer;

/// A processor to split a block into multiple blocks by `block_size`.
pub struct TransformSplit {
block_size: usize,
}

impl TransformSplit {
pub fn try_create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
block_size: usize,
) -> Box<dyn Processor> {
AccumulatingTransformer::create(input, output, TransformSplit { block_size })
}
}

impl AccumulatingTransform for TransformSplit {
const NAME: &'static str = "TransformSplit";

fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
let (mut blocks, remain) = data.split_by_rows(self.block_size);
if let Some(block) = remain {
blocks.push(block);
}
Ok(blocks)
}
}
12 changes: 12 additions & 0 deletions src/query/service/src/pipelines/builders/builder_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_pipeline_sinks::EmptySink;
use common_pipeline_transforms::processors::TransformProfileWrapper;
use common_pipeline_transforms::processors::TransformSplit;
use common_pipeline_transforms::processors::Transformer;
use common_sql::evaluator::BlockOperator;
use common_sql::evaluator::CompoundBlockOperator;
use common_sql::executor::physical_plans::Project;
use common_sql::executor::physical_plans::ProjectSet;
use common_sql::ColumnBinding;
use common_storages_fuse::TableContext;

use crate::pipelines::PipelineBuilder;

Expand Down Expand Up @@ -82,6 +84,16 @@ impl PipelineBuilder {
pub(crate) fn build_project_set(&mut self, project_set: &ProjectSet) -> Result<()> {
self.build_pipeline(&project_set.input)?;

// Split the input block into multiple blocks to reduce memory usage when executing `BlockOperator::FlatMap`.
let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize;
self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(TransformSplit::try_create(
input,
output,
max_block_size,
)))
})?;

let op = BlockOperator::FlatMap {
projections: project_set.projections.clone(),
srf_exprs: project_set
Expand Down

0 comments on commit be9a22b

Please sign in to comment.