perf: improve performance of `SortPreservingMergeExec` operator #722

Conversation
Looks very solid to me -- I went over this PR carefully
FYI @Dandandan and @jorgecarleitao
```rust
// A collection of comparators that compare rows in this cursor's batch to
// the cursors in other batches. Other batches are uniquely identified by
// their batch_idx.
```
Is it worth mentioning that the indexes of the `Vec` are the sort column positions (not other batch indexes)?
```rust
let cmp = self
    .batch_comparators
    .entry(other.batch_idx)
    .or_insert_with(|| Vec::with_capacity(other.columns.len()));
```
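The get-or-create pattern under discussion can be sketched in isolation. Below is a minimal, self-contained Rust sketch (the `DynComparator` type, column representation, and method shapes are stand-ins, not DataFusion's or arrow's actual types): each cursor keeps a `HashMap` keyed by the other batch's index, whose value is a `Vec` of comparators indexed by sort-column position, built once and then reused for every row comparison against that batch.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

// Stand-in for arrow's DynComparator: compares row `i` of this cursor's
// column against row `j` of the other batch's column.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

struct SortKeyCursor {
    batch_idx: usize,
    // Stand-in for the Arc<dyn Array> sort columns of the backing batch.
    columns: Vec<Vec<i64>>,
    // Outer key: the other batch's index. Inner Vec: one comparator per
    // sort column (indexed by sort-column position, not batch index).
    batch_comparators: HashMap<usize, Vec<DynComparator>>,
}

impl SortKeyCursor {
    fn compare_row(&mut self, row: usize, other: &SortKeyCursor, other_row: usize) -> Ordering {
        let cmp = self
            .batch_comparators
            .entry(other.batch_idx)
            .or_insert_with(|| Vec::with_capacity(other.columns.len()));

        if cmp.is_empty() {
            // The expensive "build" step, paid once per (cursor, other-batch)
            // pair instead of once per row comparison.
            for (l, r) in self.columns.iter().zip(other.columns.iter()) {
                let (l, r) = (l.clone(), r.clone());
                cmp.push(Box::new(move |i, j| l[i].cmp(&r[j])));
            }
        }

        // Lexicographic comparison across the sort columns.
        for c in cmp.iter() {
            match c(row, other_row) {
                Ordering::Equal => continue,
                ord => return ord,
            }
        }
        Ordering::Equal
    }
}

fn main() {
    let mut a = SortKeyCursor {
        batch_idx: 0,
        columns: vec![vec![1, 1, 3]],
        batch_comparators: HashMap::new(),
    };
    let b = SortKeyCursor {
        batch_idx: 1,
        columns: vec![vec![1, 2, 2]],
        batch_comparators: HashMap::new(),
    };
    assert_eq!(a.compare_row(0, &b, 0), Ordering::Equal);
    assert_eq!(a.compare_row(2, &b, 1), Ordering::Greater);
    // The comparators for batch 1 were built once and are now cached.
    assert_eq!(a.batch_comparators[&1].len(), 1);
    println!("ok");
}
```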
I was initially worried that mutating `cmp` below might mask errors (e.g. that the number of columns had grown somehow), so I wonder if it is worth putting the initialization of `cmp` into this callsite (rather than the loop below)? However, perhaps the current approach avoids having to clone or collect `zipped`.

Please feel free to ignore this comment.
Ha. Yeah, that's how I had it, but you have to re-create the iterator. I'm happy to go with whatever.
I think this way is fine, personally
```rust
@@ -255,15 +295,11 @@ impl SortKeyCursor {
            }
            (true, false) => return Ok(Ordering::Less),
            (false, false) => {}
            (true, true) => {
                // TODO: Building the predicate each time is sub-optimal
```
🎉
Once DataFusion is using Arrow 5.0 (which @alamb informs me should be relatively soon), I will rebase this PR and get it green. FWIW I have tested this PR internally and it has effectively eradicated
This commit stores built Arrow comparators for two arrays on each of the sort key cursors, resulting in a significant reduction in the cost associated with merging record batches using the `SortPreservingMerge` operator. Benchmarks improved as follows:

```
⇒ critcmp master pr
group                               master                       pr
-----                               ------                       --
interleave_batches                  1.83    623.8±12.41µs ? ?/sec    1.00    341.2±6.98µs ? ?/sec
merge_batches_no_overlap_large      1.56    400.6±4.94µs ? ?/sec     1.00    256.3±6.57µs ? ?/sec
merge_batches_no_overlap_small      1.63    425.1±24.88µs ? ?/sec    1.00    261.1±7.46µs ? ?/sec
merge_batches_small_into_large      1.18    228.0±3.95µs ? ?/sec     1.00    193.6±2.86µs ? ?/sec
merge_batches_some_overlap_large    1.68    505.4±10.27µs ? ?/sec    1.00    301.3±6.63µs ? ?/sec
merge_batches_some_overlap_small    1.64    515.7±5.21µs ? ?/sec     1.00    314.6±12.66µs ? ?/sec
```
Note to anyone else reviewing / looking at this change: While DataFusion does not (yet) use the
```rust
let cmp = self
    .batch_comparators
    .entry(other.batch_idx)
    .or_insert_with(|| Vec::with_capacity(other.columns.len()));
```
Don't we need to clean up older `batch_idx` entries over time?
Do you mean to reduce the peak memory usage by freeing comparators?
The `HashMap` only lives as long as the sort preserving merge operator itself. I suppose you could merge n streams large enough to make the old comparators take up non-negligible memory.
Yes, that's what I meant. The memory usage would increase over time by holding onto the older comparators.
I personally think the "right" thing to do in this case is to create an arrow comparator interface that doesn't have bound arrays, so like:

```rust
let comparator = arrow::compute::build_comparator(array1.data_type(), array2.data_type());
...
if comparator(array1, index_1, array2, index_2) == Ordering::Equal {
    ...
}
```
With that interface we could simply use the same comparator everywhere. So that is to say: I recommend leaving this PR the way it is, and putting our efforts into a better comparison interface rather than trying to optimize for a small amount of memory savings here.
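The unbound-comparator idea above can be sketched without arrow. This is a hypothetical interface, not arrow's actual API: `build_comparator` dispatches on the data types once, and the returned closure takes both arrays and both indices at call time, so one comparator serves any pair of arrays of those types regardless of which batch they came from.

```rust
use std::cmp::Ordering;

// Stand-ins for arrow's DataType and dynamically typed arrays.
#[derive(PartialEq)]
enum DataType { Int64, Utf8 }

enum ArrayRef { Int64(Vec<i64>), Utf8(Vec<String>) }

// Unlike a bound DynComparator, the arrays are passed at call time.
type UnboundComparator = Box<dyn Fn(&ArrayRef, usize, &ArrayRef, usize) -> Ordering>;

// Hypothetical interface: dispatch on the data types once up front; the
// returned comparator can then be reused for *any* arrays of those types,
// so there is no need to cache one comparator per pair of batches.
fn build_comparator(left: &DataType, right: &DataType) -> UnboundComparator {
    assert!(left == right, "only same-type comparisons in this sketch");
    match left {
        DataType::Int64 => Box::new(|l, i, r, j| match (l, r) {
            (ArrayRef::Int64(l), ArrayRef::Int64(r)) => l[i].cmp(&r[j]),
            _ => panic!("type mismatch"),
        }),
        DataType::Utf8 => Box::new(|l, i, r, j| match (l, r) {
            (ArrayRef::Utf8(l), ArrayRef::Utf8(r)) => l[i].cmp(&r[j]),
            _ => panic!("type mismatch"),
        }),
    }
}

fn main() {
    let cmp = build_comparator(&DataType::Int64, &DataType::Int64);
    let a = ArrayRef::Int64(vec![1, 4]);
    let b = ArrayRef::Int64(vec![2, 4]);
    // The same comparator works for any Int64 arrays, from any batch.
    assert_eq!(cmp(&a, 0, &b, 0), Ordering::Less);
    assert_eq!(cmp(&a, 1, &b, 1), Ordering::Equal);
    println!("ok");
}
```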
I'm happy for this to be merged as-and-when. Cheers 👍
One other thing I thought of last night is that this approach will effectively hold references to all input
NOTE: this requires Arrow 5.0 containing the following PR: apache/arrow-rs#542
Which issue does this PR close?
This will effectively close #655
Rationale for this change

When merging input record batches together into a single output stream, the `SortPreservingMergeExec` operator currently builds a comparator for each column in the inputs every single time it compares one row to another. Due to the cost of building an Arrow `DynComparator`, this ends up being prohibitively expensive. (I have more details in https://github.com/influxdata/influxdb_iox/issues/1983, but you can get the gist of the problem by looking at this profile: [profile image])

The same comparator should be usable for comparing two input record batches until their rows have been completely merged. The rationale for the change in this PR is to ensure that's what happens.
What changes are included in this PR?

The state associated with the process of merging two input record batches is managed by a `SortKeyCursor`. It tracks the current row, the columns, the backing record batch and so on. This PR adds a collection of Arrow `DynComparator`s that can be re-used every time a `SortKeyCursor` needs to compare one of its rows to that of another `SortKeyCursor`.

In order to know whether a cursor was being compared to a cursor it had seen before or not, I needed to be able to uniquely identify an input record batch. I did this by incrementing an index each time a new record batch was fed into the operator.
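The batch-identification scheme described above amounts to a monotonically increasing counter. A minimal sketch (the `BatchIndexer` name is hypothetical, not a type in this PR): the operator hands each incoming batch the next index, and that index then keys the comparator cache.

```rust
// Each new record batch fed into the operator gets the next index, so a
// batch index uniquely identifies a batch for the comparator cache.
struct BatchIndexer {
    next: usize,
}

impl BatchIndexer {
    fn new() -> Self {
        BatchIndexer { next: 0 }
    }

    // Returns a fresh, unique index for an incoming batch.
    fn assign(&mut self) -> usize {
        let idx = self.next;
        self.next += 1;
        idx
    }
}

fn main() {
    let mut indexer = BatchIndexer::new();
    let ids: Vec<usize> = (0..3).map(|_| indexer.assign()).collect();
    assert_eq!(ids, vec![0, 1, 2]); // unique and increasing per batch
    println!("ok");
}
```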
When a row comparison happens, the comparator collection is consulted and the `DynComparator` used if it exists. Otherwise one is created and stored.

Performance
This PR significantly improves the performance of the `SortPreservingMergeExec` operator because it amortises the cost of creating a comparator over all row comparisons in the record batch, rather than paying that fixed cost for every single row comparison. Here are some existing benchmark results:

The results suggest that the changes in this PR improve performance by up to 1.8x. However, since the performance delta is tied to the size of the input, the performance boost could be significantly larger for larger inputs.