Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: split the output of SRF by max_block_size. #13817

Merged
merged 8 commits
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod transform_accumulating_async;
mod transform_async;
mod transform_block_compact;
mod transform_block_compact_for_copy;
mod transform_blocking;
mod transform_compact;
mod transform_dummy;
mod transform_multi_sort_merge;
Expand All @@ -33,6 +34,7 @@ pub use transform_accumulating_async::*;
pub use transform_async::*;
pub use transform_block_compact::*;
pub use transform_block_compact_for_copy::*;
pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_dummy::*;
pub use transform_sort::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use common_exception::Result;
use common_expression::DataBlock;
use common_pipeline_core::processors::Event;
use common_pipeline_core::processors::InputPort;
use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::Processor;

/// A transform that must buffer an input block and may emit it as several
/// output blocks.
///
/// The driver ([`BlockingTransformer`]) feeds one [`DataBlock`] in via
/// [`consume`](Self::consume), then calls [`transform`](Self::transform)
/// repeatedly — pushing each returned block downstream — until it returns
/// `Ok(None)`, and only then pulls the next input block.
pub trait BlockingTransform: Send {
    /// Processor name reported by [`Processor::name`].
    const NAME: &'static str;

    /// Accept one input block and buffer it for later transformation.
    fn consume(&mut self, block: DataBlock) -> Result<()>;

    /// Produce the next output block derived from the previously consumed
    /// input, or `Ok(None)` once that input is fully drained.
    fn transform(&mut self) -> Result<Option<DataBlock>>;
}

/// A transform may be blocked on a certain input.
///
/// This transform will not pull new data from the input until the inner
/// transform returns [None].
pub struct BlockingTransformer<T: BlockingTransform + 'static> {
    inner: T,
    input: Arc<InputPort>,
    output: Arc<OutputPort>,
    // Block pulled from `input` in `event`, waiting to be fed to `inner`
    // on the next `process` call.
    input_data: Option<DataBlock>,
    // Block produced by `inner`, waiting to be pushed to `output`.
    output_data: Option<DataBlock>,
    // True when `inner` is drained and a fresh input block may be pulled;
    // false while `inner` still has buffered output to emit.
    need_data: bool,
}

impl<T: BlockingTransform + 'static> BlockingTransformer<T> {
    /// Wrap `inner` between the given ports and return it as a boxed
    /// [`Processor`], ready to be installed in a pipeline.
    ///
    /// The transformer starts in the `need_data` state with no buffered
    /// blocks, so its first event pulls from `input`.
    pub fn create(input: Arc<InputPort>, output: Arc<OutputPort>, inner: T) -> Box<dyn Processor> {
        let transformer = Self {
            input,
            output,
            inner,
            input_data: None,
            output_data: None,
            need_data: true,
        };
        Box::new(transformer)
    }
}

#[async_trait::async_trait]
impl<T: BlockingTransform + 'static> Processor for BlockingTransformer<T> {
    fn name(&self) -> String {
        String::from(T::NAME)
    }

    fn as_any(&mut self) -> &mut dyn Any {
        self
    }

    // Scheduling state machine. The check order below is significant:
    // downstream shutdown first, then back-pressure, then pending output,
    // then pending work, and only last do we pull or wait for new input.
    fn event(&mut self) -> Result<Event> {
        // Downstream is gone: propagate the finish upstream and stop.
        if self.output.is_finished() {
            self.input.finish();
            return Ok(Event::Finished);
        }

        // Downstream cannot accept data: apply back-pressure upstream.
        if !self.output.can_push() {
            self.input.set_not_need_data();
            return Ok(Event::NeedConsume);
        }

        // A block produced by a previous `process` call is still pending:
        // push it before doing anything else.
        if let Some(output) = self.output_data.take() {
            self.output.push_data(Ok(output));
            return Ok(Event::NeedConsume);
        }

        if !self.need_data {
            // There is data needed to be transformed: `inner` has not yet
            // returned `None`, so keep calling `process` before pulling
            // any new input.
            return Ok(Event::Sync);
        }

        // The data is fully consumed, we can begin to consume new data.
        if self.input.has_data() {
            let data = self.input.pull_data().unwrap()?;
            self.input_data = Some(data);
            return Ok(Event::Sync);
        }

        // Upstream finished and (since `need_data` is true) `inner` holds
        // nothing to flush: we are done.
        if self.input.is_finished() {
            self.output.finish();
            return Ok(Event::Finished);
        }

        // Nothing buffered anywhere: ask upstream for more data.
        self.input.set_need_data();
        Ok(Event::NeedData)
    }

    fn process(&mut self) -> Result<()> {
        if let Some(input) = self.input_data.take() {
            // `event` only pulls a new block after `inner` was drained.
            debug_assert!(self.need_data);
            self.inner.consume(input)?;
        }

        if let Some(block) = self.inner.transform()? {
            // More output pending: stay in the transform loop (`need_data`
            // false) until `inner` returns `None`.
            self.output_data = Some(block);
            self.need_data = false;
        } else {
            self.need_data = true;
        }

        Ok(())
    }
}
37 changes: 16 additions & 21 deletions src/query/service/src/pipelines/builders/builder_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use common_functions::BUILTIN_FUNCTIONS;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_pipeline_sinks::EmptySink;
use common_pipeline_transforms::processors::TransformProfileWrapper;
use common_pipeline_transforms::processors::Transformer;
use common_pipeline_transforms::processors::ProcessorProfileWrapper;
use common_sql::evaluator::BlockOperator;
use common_sql::evaluator::CompoundBlockOperator;
use common_sql::executor::physical_plans::Project;
use common_sql::executor::physical_plans::ProjectSet;
use common_sql::ColumnBinding;

use crate::pipelines::processors::transforms::TransformSRF;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
Expand Down Expand Up @@ -82,36 +82,31 @@ impl PipelineBuilder {
pub(crate) fn build_project_set(&mut self, project_set: &ProjectSet) -> Result<()> {
self.build_pipeline(&project_set.input)?;

let op = BlockOperator::FlatMap {
projections: project_set.projections.clone(),
srf_exprs: project_set
.srf_exprs
.iter()
.map(|(expr, _)| expr.as_expr(&BUILTIN_FUNCTIONS))
.collect(),
};

let num_input_columns = project_set.input.output_schema()?.num_fields();
let srf_exprs = project_set
.srf_exprs
.iter()
.map(|(expr, _)| expr.as_expr(&BUILTIN_FUNCTIONS))
.collect::<Vec<_>>();
let max_block_size = self.settings.get_max_block_size()? as usize;

self.main_pipeline.add_transform(|input, output| {
let transform = CompoundBlockOperator::new(
vec![op.clone()],
let transform = TransformSRF::try_create(
input,
output,
self.func_ctx.clone(),
num_input_columns,
project_set.projections.clone(),
srf_exprs.clone(),
max_block_size,
);

if self.enable_profiling {
Ok(ProcessorPtr::create(TransformProfileWrapper::create(
Ok(ProcessorPtr::create(ProcessorProfileWrapper::create(
transform,
input,
output,
project_set.plan_id,
self.proc_profs.clone(),
)))
} else {
Ok(ProcessorPtr::create(Transformer::create(
input, output, transform,
)))
Ok(ProcessorPtr::create(transform))
}
})
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod transform_resort_addon;
mod transform_resort_addon_without_source_schema;
mod transform_runtime_cast_schema;
mod transform_runtime_filter;
mod transform_srf;
mod transform_udf;
mod window;

Expand All @@ -57,6 +58,7 @@ pub use transform_resort_addon_without_source_schema::TransformResortAddOnWithou
pub use transform_runtime_cast_schema::TransformRuntimeCastSchema;
pub use transform_runtime_filter::SinkRuntimeFilterSource;
pub use transform_runtime_filter::TransformRuntimeFilter;
pub use transform_srf::TransformSRF;
pub use transform_udf::TransformUdf;
pub use window::FrameBound;
pub use window::TransformWindow;
Expand Down
Loading
Loading