ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator #9043

Closed · wants to merge 10 commits
Changes from 4 commits
203 changes: 203 additions & 0 deletions rust/datafusion/src/physical_plan/coalesce_batches.rs
@@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
//! vectorized processing by upstream operators.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{make_array, MutableArrayData};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use futures::stream::{Stream, StreamExt};
use log::debug;

/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
/// vectorized processing by upstream operators.
#[derive(Debug)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalesced batches
    target_batch_size: usize,
}

impl CoalesceBatchesExec {
    /// Create a new CoalesceBatchesExec
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        Self {
            input,
            target_batch_size,
        }
    }
}

#[async_trait]
impl ExecutionPlan for CoalesceBatchesExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get the schema for this execution plan
    fn schema(&self) -> SchemaRef {
        // The coalesce batches operator does not make any changes to the schema of its input
        self.input.schema()
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.input.clone()]
    }

    /// Get the output partitioning of this plan
    fn output_partitioning(&self) -> Partitioning {
        // The coalesce batches operator does not make any changes to the partitioning of its input
        self.input.output_partitioning()
    }

    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        match children.len() {
            1 => Ok(Arc::new(CoalesceBatchesExec::new(
                children[0].clone(),
                self.target_batch_size,
            ))),
            _ => Err(DataFusionError::Internal(
                "CoalesceBatchesExec wrong number of children".to_string(),
            )),
        }
    }

    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition).await?,
            schema: self.input.schema(),
            target_batch_size: self.target_batch_size,
            buffer: Vec::new(),
            buffered_rows: 0,
        }))
    }
}
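
For orientation, a hypothetical caller might drive the operator as sketched below. This snippet is not part of the diff: input_plan is a placeholder for any Arc<dyn ExecutionPlan> and the target of 4096 rows is arbitrary. It relies on the imports at the top of this file (futures::StreamExt in particular) and must run inside an async function returning a compatible Result:

    // Sketch: wrap an existing plan and read coalesced batches from partition 0.
    let exec = CoalesceBatchesExec::new(input_plan, 4096);
    let mut stream = exec.execute(0).await?;
    while let Some(batch) = stream.next().await {
        // every emitted batch holds at least 4096 rows
        println!("got batch with {} rows", batch?.num_rows());
    }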

struct CoalesceBatchesStream {
    /// The input plan
    input: SendableRecordBatchStream,
    /// The input schema
    schema: SchemaRef,
    /// Minimum number of rows for coalesced batches
    target_batch_size: usize,
    /// Buffered batches
    buffer: Vec<RecordBatch>,
    /// Buffered row count
    buffered_rows: usize,
}

impl Stream for CoalesceBatchesStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            let input_batch = self.input.poll_next_unpin(cx);
            match input_batch {
                Poll::Ready(x) => match x {
                    Some(Ok(ref batch)) => {
                        if batch.num_rows() >= self.target_batch_size
                            && self.buffer.is_empty()
                        {
                            return Poll::Ready(Some(Ok(batch.clone())));

Review comment (Contributor):
This has the effect that it can reorder the output rows from this operator. I think that is fine, but it should probably be noted somewhere (so that when we get to optimizations related to sorting we know this operation as written will not preserve the input order).

Reply (Member Author):
Why would this change the ordering within a single partition? The intent was to produce the rows in the same order they are received.

Reply (Contributor):
Whoops -- that was my mistake -- I didn't see the condition of self.buffer.is_empty() -- in that case I agree that the rows remain ordered.

I guess I was thinking ahead to an operator that only copies data when needed rather than always. Too fancy. Sorry for the noise.

                        } else {
                            // add to the buffered batches
                            self.buffer.push(batch.clone());
                            self.buffered_rows += batch.num_rows();
                            // check to see if we have enough batches yet
                            if self.buffered_rows >= self.target_batch_size {

Review comment (Contributor):
Would it also make sense to make batches smaller, or split them if they are bigger than the target batch size (e.g. for increased parallelism), or do we only want to grow them for now?

Reply (Member Author):
Currently, we use partitioning as the unit of parallelism (which I think makes sense) and we recently added the repartition operator which can increase or decrease parallelism.

I'm not sure if we will need the ability to split batches. The only use case I can think of right now would be if we had kernels that had limits on the size of batches that they could process for some reason.

                                // combine the batches and return
                                let mut arrays =
                                    Vec::with_capacity(self.schema.fields().len());
                                for i in 0..self.schema.fields().len() {
                                    let source_arrays = self
                                        .buffer
                                        .iter()
                                        .map(|batch| batch.column(i).data_ref().as_ref())
                                        .collect();
                                    let mut array_data = MutableArrayData::new(
                                        source_arrays,
                                        true,
                                        self.buffered_rows,
                                    );
                                    for j in 0..self.buffer.len() {
                                        array_data.extend(j, 0, self.buffer[j].num_rows());
                                    }
                                    let data = array_data.freeze();
                                    arrays.push(make_array(Arc::new(data)));

Review comment (Member):
I think that this operation is equivalent to arrow::compute::kernels::concat::concat, which we may use instead for simplicity (and in case we optimize concat).

Note that this operation is also done in the sort node, where we merge all batches from all partitions into a single batch.

                                }
                                let batch =
                                    RecordBatch::try_new(self.schema.clone(), arrays)?;

                                debug!(
                                    "Combined {} batches containing {} rows",
                                    self.buffer.len(),
                                    self.buffered_rows
                                );

                                // reset buffer state
                                self.buffer.clear();
                                self.buffered_rows = 0;

                                // return batch
                                return Poll::Ready(Some(Ok(batch)));
                            }
                        }
                    }
                    // Note: this arm also covers end of input (None); as
                    // written, any batches still in the buffer are dropped
                    // rather than flushed (see the sketch after this impl)
                    other => return Poll::Ready(other),
                },
                Poll::Pending => return Poll::Pending,
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // TODO: coalescing changes the number of batches, so the input's
        // hint is only an upper bound on how many batches this stream yields
        self.input.size_hint()
    }
}

impl RecordBatchStream for CoalesceBatchesStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
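
As written at this commit, the "other => return Poll::Ready(other)" arm above ends the stream as soon as the input is exhausted, so any rows still sitting in the buffer are silently dropped. Below is a minimal sketch of one way to flush them, assuming the combine logic is factored out of poll_next into a free function; the name concat_batches is illustrative and not from this diff, and per the review comment above, arrow's concat kernel could plausibly replace the MutableArrayData code entirely:

    // Sketch only: the combine logic from poll_next, factored out so the
    // end-of-input path can reuse it. Relies on the imports at the top of
    // this file.
    fn concat_batches(
        schema: &SchemaRef,
        batches: &[RecordBatch],
        row_count: usize,
    ) -> ArrowResult<RecordBatch> {
        let mut arrays = Vec::with_capacity(schema.fields().len());
        for i in 0..schema.fields().len() {
            let source_arrays = batches
                .iter()
                .map(|batch| batch.column(i).data_ref().as_ref())
                .collect();
            let mut array_data =
                MutableArrayData::new(source_arrays, true, row_count);
            for (j, batch) in batches.iter().enumerate() {
                array_data.extend(j, 0, batch.num_rows());
            }
            arrays.push(make_array(Arc::new(array_data.freeze())));
        }
        RecordBatch::try_new(schema.clone(), arrays)
    }

    // The end-of-input case in poll_next could then flush the buffer instead
    // of dropping it:
    //
    //     None => {
    //         if self.buffer.is_empty() {
    //             return Poll::Ready(None);
    //         }
    //         let batch =
    //             concat_batches(&self.schema, &self.buffer, self.buffered_rows);
    //         self.buffer.clear();
    //         self.buffered_rows = 0;
    //         return Poll::Ready(Some(batch));
    //     }
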
1 change: 1 addition & 0 deletions rust/datafusion/src/physical_plan/mod.rs
@@ -248,6 +248,7 @@ pub trait Accumulator: Send + Sync + Debug {

pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
pub mod common;
pub mod csv;
pub mod datetime_expressions;
11 changes: 11 additions & 0 deletions rust/datafusion/src/physical_plan/planner.rs
@@ -26,6 +26,7 @@ use crate::logical_plan::{
    DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
    StringifiedPlan, UserDefinedLogicalNode,
};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
            // leaf node, children cannot be replaced
            Ok(plan.clone())
        } else {
            // wrap filter in coalesce batches
            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
                let target_batch_size = ctx_state.config.batch_size;

Review comment (Contributor):
I wonder if a heuristic like config.batch_size / 2 might be faster -- by setting it to batch_size we'll end up copying data if even a single row from a batch is filtered.

Reply (Member Author):
I actually wanted a separate config for this, but I would like to do that once we have https://issues.apache.org/jira/browse/ARROW-11059 (which I would like to try and get in for 3.0.0).

I think changing it to half the batch size for now could make sense. I will push that change to this PR.

                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))

Review comment (Contributor):
I guess this could at some point be part of a cost-based optimization, based on the number of rows and the selectivity of the filters?

Would it also make sense to wrap joins in the CoalesceBatchesExec, as a join can also reduce/increase the size of the batches? E.g. what is/would be the effect on TPC-H query 5?

Reply (Member Author):
Very good points. I did not think about join output.

Reply (Member Author):
I filed https://issues.apache.org/jira/browse/ARROW-11068 to wrap join output and also to make this mechanism more generic.

Rather than hard-code a list of operators that need to be wrapped, we should find a more generic mechanism so that plans can declare if their input and/or output batches should be coalesced (similar to how we handle partitioning); this would allow custom operators outside of DataFusion to benefit from this optimization.

Reply (Contributor):
The issue with cost-based optimizers is that invariably they get it wrong sometimes (e.g. the selectivity is miscalculated due to correlations in the data, or nulls, or something).

I think the state of the art in optimizers is to delay as many such decisions to runtime as possible (when the actual cardinalities are known).

So in this case, rather than figuring out which operators' output to wrap, I suggest we do something like wrap all operators with coalesce, or maybe update the Filter operation itself to do this coalescing internally when it is preparing its output, and avoid the copy entirely.

            } else {
                plan.clone()
            };

            let children = plan.children().clone();

            match plan.required_child_distribution() {
                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
                Distribution::SinglePartition => plan.with_new_children(
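
As a rough illustration of the "wrap all operators with coalesce" idea from the last comment, a recursive pass over the physical plan might look like the sketch below. This is not code from the PR: coalesce_all is a hypothetical name, and the sketch uses only the ExecutionPlan methods that already appear in this diff (children, with_new_children) plus CoalesceBatchesExec::new. In practice one would likely skip operators whose output is already coalesced, which is roughly what ARROW-11068 proposes to make declarative.

    // Sketch only: wrap every operator in the plan (including the root) in
    // CoalesceBatchesExec, rather than special-casing FilterExec.
    fn coalesce_all(
        plan: Arc<dyn ExecutionPlan>,
        target_batch_size: usize,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // rewrite the children bottom-up first
        let children = plan
            .children()
            .iter()
            .map(|child| coalesce_all(child.clone(), target_batch_size))
            .collect::<Result<Vec<_>>>()?;
        // leaf nodes keep their (empty) child list unchanged
        let plan = if children.is_empty() {
            plan
        } else {
            plan.with_new_children(children)?
        };
        Ok(Arc::new(CoalesceBatchesExec::new(plan, target_batch_size)))
    }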