ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator #9043

Closed

Conversation

@andygrove andygrove commented Dec 29, 2020

This PR introduces a new `CoalesceBatchesExec` physical operator which combines small input batches and produces larger output batches. The physical optimizer inserts this operator around filters because highly selective filters can produce many small batches, and this causes poor performance in some cases (especially joins): with single-row batches, for example, we lose much of the benefit of vectorization.

For TPC-H q12 at SF=100 and 8 partitions, this provides the following speedups on my desktop:

| Batch Size | Master | This PR |
| --- | --- | --- |
| 8192 | 617.5 s | 70.7 s |
| 16384 | 183.1 s | 46.4 s |
| 32768 | 59.4 s | 33.3 s |
| 65536 | 27.5 s | 20.7 s |
| 131072 | 18.4 s | 18.5 s |

Note that the new `CoalesceBatchesExec` uses `MutableArrayData`, which still suffers from some kind of exponential slowdown as the number of batches increases, so we should be able to optimize this further, but at least we are now using `MutableArrayData` to combine smaller numbers of batches.

Even if we fix the slowdown in `MutableArrayData`, we would still want `CoalesceBatchesExec` to help avoid empty/tiny batches for other reasons.
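
For readers skimming the thread, here is a minimal sketch of the coalescing step, assembled from the code snippets quoted later in this review rather than copied verbatim from the PR: each output column is built with `MutableArrayData` from the corresponding column of every buffered batch. The helper name `concat_buffered` is illustrative, and the arrow import paths and signatures are assumed to match the arrow version in use at the time.

```rust
use std::sync::Arc;

use arrow::array::{make_array, ArrayRef, MutableArrayData};
use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Combine all buffered batches into a single output batch, one column at a time.
fn concat_buffered(
    schema: SchemaRef,
    buffer: &[RecordBatch],
    buffered_rows: usize,
) -> Result<RecordBatch> {
    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for i in 0..schema.fields().len() {
        // Gather the underlying ArrayData of column `i` from every buffered batch.
        let source_arrays = buffer
            .iter()
            .map(|batch| batch.column(i).data_ref().as_ref())
            .collect();
        // Pre-allocate capacity for all buffered rows, then append each batch in arrival order.
        let mut array_data = MutableArrayData::new(source_arrays, true, buffered_rows);
        for (j, batch) in buffer.iter().enumerate() {
            array_data.extend(j, 0, batch.num_rows());
        }
        arrays.push(make_array(Arc::new(array_data.freeze())));
    }
    RecordBatch::try_new(schema, arrays)
}
```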

@andygrove andygrove changed the title [Rust] [DataFusion] Implement coalesce batches operator ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator Dec 29, 2020
@andygrove
Member Author

@jorgecarleitao @alamb @Dandandan fyi

// wrap filter in coalesce batches
let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
let target_batch_size = ctx_state.config.batch_size;
Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
Contributor

I guess this could at some point be part of a cost-based optimization, based on the number of rows and the selectivity of the filters?
Would it also make sense to wrap joins in CoalesceBatchesExec, as they can also reduce/increase the size of the batches? E.g. what is/would be the effect on TPC-H query 5?

Member Author

Very good points. I did not think about join output.

Member Author

I filed https://issues.apache.org/jira/browse/ARROW-11068 to wrap join output and also to make this mechanism more generic.

Rather than hard-code a list of operators that need to be wrapped, we should find a more generic mechanism so that plans can declare whether their input and/or output batches should be coalesced (similar to how we handle partitioning); this would allow custom operators outside of DataFusion to benefit from this optimization.

Contributor

The issue with cost-based optimizers is that they invariably get it wrong sometimes (e.g. the selectivity is miscalculated due to correlations in the data, or nulls, or something).

I think the state of the art in optimizers is to delay as many such decisions to runtime as possible (when the actual cardinalities are known).

So in this case, rather than figuring out which output operators to wrap, I suggest we do something like wrap all operators with coalesce, or maybe update the Filter operation itself to do this coalescing internally when it is preparing its output and avoid the copy entirely.

@Dandandan
Contributor

LGTM 👍 😎 very nice speedup, good to see it also works on the bigger batch sizes (so it's not only because of the "problematic" join). I think it is a nice building block which we can decide to use in optimizations.
We should still look at improving the join implementation, although this hides the problem quite a bit.

@andygrove
Member Author

> We should still look at improving the join implementation, although this hides the problem quite a bit.

https://issues.apache.org/jira/browse/ARROW-11030 is the tracking issue for the `MutableArrayData` performance problem.

Member

@jorgecarleitao jorgecarleitao left a comment

Really cool, @andygrove.

I think that this would benefit from a test demonstrating its correctness, e.g. in terms of the minimum number of rows per batch.

I left a small comment wrt code simplification.

Note that our GitHub build is disabled, and we have to rely on our own repos to verify that the CI is passing.
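
A hedged sketch of what such a correctness test might look like, exercising the `concat_buffered` helper sketched earlier in this thread; the schema, values, and test name are illustrative, not the PR's actual tests.

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::UInt32Array;
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn coalesces_small_batches_into_one() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
        // Three tiny two-row batches.
        let batches: Vec<RecordBatch> = (0u32..3)
            .map(|i| {
                RecordBatch::try_new(
                    schema.clone(),
                    vec![Arc::new(UInt32Array::from(vec![i, i + 1])) as ArrayRef],
                )
                .unwrap()
            })
            .collect();
        // All six buffered rows should end up in a single output batch.
        let out = concat_buffered(schema, &batches, 6)?;
        assert_eq!(out.num_rows(), 6);
        Ok(())
    }
}
```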

Comment on lines 149 to 167
let source_arrays = self
.buffer
.iter()
.map(|batch| batch.column(i).data_ref().as_ref())
.collect();
let mut array_data = MutableArrayData::new(
source_arrays,
true,
self.buffered_rows,
);
for j in 0..self.buffer.len() {
array_data.extend(
j,
0,
self.buffer[j].num_rows(),
);
}
let data = array_data.freeze();
arrays.push(make_array(Arc::new(data)));
Member

I think that this operation is equivalent to arrow::compute::kernels::concat::concat, which we may use instead for simplicity (and in case we optimize concat).

Note that this operation is also done in the sort node, where we merge all batches from all partitions in a single batch.
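
For comparison, a hedged sketch of what the concat-based variant could look like; `concat_with_kernel` is a hypothetical name, and the call assumes the `concat` signature of the arrow release used here (a slice of `ArrayRef`), which differs in later releases.

```rust
use arrow::array::ArrayRef;
use arrow::compute::kernels::concat::concat;
use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Concatenate the buffered batches column by column using the concat kernel.
fn concat_with_kernel(schema: SchemaRef, buffer: &[RecordBatch]) -> Result<RecordBatch> {
    let mut arrays = Vec::with_capacity(schema.fields().len());
    for i in 0..schema.fields().len() {
        // Clone the ArrayRef handles for column `i`; the kernel performs the actual copy.
        let columns: Vec<ArrayRef> = buffer.iter().map(|b| b.column(i).clone()).collect();
        arrays.push(concat(&columns)?);
    }
    RecordBatch::try_new(schema, arrays)
}
```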

@andygrove andygrove marked this pull request as draft December 29, 2020 18:57
@andygrove
Member Author

I just realized I missed a case (we need to send any remaining buffered batches when the input has no more batches) and have changed this to a draft. I am also working on tests.

@andygrove andygrove marked this pull request as ready for review December 29, 2020 20:11
@andygrove
Member Author

This is ready for review now

self.buffer.push(batch.clone());
self.buffered_rows += batch.num_rows();
// check to see if we have enough batches yet
if self.buffered_rows >= self.target_batch_size {
Contributor

Would it also make sense to make batches smaller, or to split them if they are bigger than the target batch size (e.g. for increased parallelism), or do we only want to grow them for now?

Member Author

Currently, we use partitioning as the unit of parallelism (which I think makes sense) and we recently added the repartition operator which can increase or decrease parallelism.

I'm not sure if we will need the ability to split batches. The only use case I can think of right now would be if we had kernels that had limits on the size of batches that they could process for some reason.

}
_ => {
let mut partitions = vec![];
for i in 0..plan.output_partitioning().partition_count() {
Contributor

@Dandandan Dandandan Dec 29, 2020

Could we bind this partition count to a variable and reuse it in the code?

@Dandandan
Contributor

Looks great. I added one question about something that I think could be done in the future, and one style comment. But in general LGTM, awesome!

Member

@jorgecarleitao jorgecarleitao left a comment

LGTM. Great work, @andygrove .

Contributor

@alamb alamb left a comment

It is hard to argue with the performance improvement -- I do think the approach of keeping batch sizes reasonable might deserve some more high level planning.

Nice work @andygrove

if batch.num_rows() >= self.target_batch_size
&& self.buffer.is_empty()
{
return Poll::Ready(Some(Ok(batch.clone())));
Contributor

This has the effect that it can reorder the output rows from this operator. I think that is fine, but it should probably be noted somewhere (so that when we get to optimizations related to sorting we know this operation as written will not preserve the input order)

Member Author

Why would this change the ordering within a single partition? The intent was to produce the rows in the same order they are received.

Contributor

Whoops -- that was my mistake -- I didn't see the condition of self.buffer.is_empty() -- in that case I agree that the rows remain ordered

I guess I was thinking ahead to an operator that only copies data when needed rather than always. Too fancy. Sorry for the noise
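
Putting the two hunks quoted in this review together, here is a hedged sketch of the per-batch decision: field and method names are illustrative rather than the exact PR code, the end-of-input flush that @andygrove mentions above is omitted, and the flush reuses the `concat_buffered` helper sketched earlier in this thread.

```rust
use arrow::record_batch::RecordBatch;

/// Illustrative buffering state; field and method names are not the exact PR code.
struct CoalesceState {
    buffer: Vec<RecordBatch>,
    buffered_rows: usize,
    target_batch_size: usize,
}

impl CoalesceState {
    /// Handle one input batch, returning an output batch when one is ready.
    fn push(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
        // Fast path: a batch that already meets the target passes through untouched
        // as long as nothing is buffered, so no rows are copied or reordered.
        if batch.num_rows() >= self.target_batch_size && self.buffer.is_empty() {
            return Some(batch);
        }
        // Otherwise buffer it until enough rows have accumulated.
        self.buffered_rows += batch.num_rows();
        self.buffer.push(batch);
        if self.buffered_rows >= self.target_batch_size {
            // Flush: combine the buffered batches into one output batch.
            let schema = self.buffer[0].schema();
            let combined = concat_buffered(schema, &self.buffer, self.buffered_rows).ok();
            self.buffer.clear();
            self.buffered_rows = 0;
            return combined;
        }
        None
    }
}
```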

@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
// wrap filter in coalesce batches
let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
let target_batch_size = ctx_state.config.batch_size;
Contributor

I wonder if a heuristic like config.batch_size / 2 might be faster -- by setting it to batch_size we'll end up copying data if even a single row from a batch is filtered.

Member Author

I actually wanted a separate config for this but I would like to do this once we have https://issues.apache.org/jira/browse/ARROW-11059 (which I would like to try and get in for 3.0.0).

I think changing it to half the batch size for now could make sense. I will push that change to this PR.
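
A hedged sketch of what the planner hook plus the half-batch-size heuristic could look like; `wrap_filter_in_coalesce` is a hypothetical helper (the PR itself edits `DefaultPhysicalPlanner` in place), and the `datafusion::physical_plan::...` import paths are assumptions about the module layout.

```rust
use std::sync::Arc;

use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;

/// Wrap filters in CoalesceBatchesExec, targeting half the configured batch size.
fn wrap_filter_in_coalesce(
    plan: Arc<dyn ExecutionPlan>,
    batch_size: usize,
) -> Arc<dyn ExecutionPlan> {
    if plan.as_any().downcast_ref::<FilterExec>().is_some() {
        // Half the configured batch size, so a filter that drops only a few rows
        // from an already full batch does not force an extra copy.
        Arc::new(CoalesceBatchesExec::new(plan, batch_size / 2))
    } else {
        plan
    }
}
```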

@andygrove andygrove closed this in 1ddf721 Dec 30, 2020
GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021

Closes apache#9043 from andygrove/coalesce_batches

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>