Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Add support to merge sort with a limit #222

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions src/compute/merge_sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! let slices2 = merge_sort_slices(a2, a3);
//! let slices = merge_sort_slices(slices1, slices2);
//!
//! let array = take_arrays(&[a0, a1, a2, a3], slices);
//! let array = take_arrays(&[a0, a1, a2, a3], slices, None);
//! ```
//!
//! A common operation in query engines is to merge multiple fields based on the
Expand All @@ -49,8 +49,8 @@
//! ```rust,ignore
//! // `slices` computed before-hand
//! // in parallel
//! let array1 = take_arrays(&[a0, a1, a2, a3], slices);
//! let array2 = take_arrays(&[b0, b1, b2, b3], slices);
//! let array1 = take_arrays(&[a0, a1, a2, a3], slices, None);
//! let array2 = take_arrays(&[b0, b1, b2, b3], slices, None);
//! ```
//!
//! To serialize slices, e.g. for checkpointing or transfer via Arrow's IPC, you can store
Expand Down Expand Up @@ -89,13 +89,22 @@ type MergeSlice = (usize, usize, usize);
pub fn take_arrays<I: IntoIterator<Item = MergeSlice>>(
arrays: &[&dyn Array],
slices: I,
limit: Option<usize>,
) -> Box<dyn Array> {
let slices = slices.into_iter();
let len = arrays.iter().map(|array| array.len()).sum();
let mut growable = make_growable(arrays, false, len);
let limit = limit.unwrap_or(len);
let mut growable = make_growable(arrays, false, limit);

let mut current_len = 0;
for (index, start, len) in slices {
growable.extend(index, start, len)
if len + current_len >= limit {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this probably makes the code without a limit (limit=None) slower.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be avoided by having two different implementations/bodies.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to not expose a new function just for this; wouldn't it make sense to move this if to outside of the loop and keep the function with the limit argument?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid the if can't move outside of the loop because the slice is an iterator.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do slices.into_iter() in two code blocks instead of reusing it, it shouldn't be a compile error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is thatslices is lazy iterated, we don't know the item of len unless we call next(), so the if check must be inside the loop when limit is some value.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking in something like this:

...
if let Some(limit) = limit {
    iterate with the limit condition
} else {
   iterate without the limit condition
}
...
growable.to_box()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great.

growable.extend(index, start, limit - current_len);
break;
} else {
growable.extend(index, start, len);
current_len += len;
}
}
growable.as_box()
}
Expand All @@ -114,7 +123,7 @@ pub fn take_arrays<I: IntoIterator<Item = MergeSlice>>(
/// # fn main() -> Result<()> {
/// let a = Int32Array::from_slice(&[2, 4, 6]);
/// let b = Int32Array::from_slice(&[0, 1, 3]);
/// let sorted = merge_sort(&a, &b, &SortOptions::default())?;
/// let sorted = merge_sort(&a, &b, &SortOptions::default(), None)?;
/// let expected = Int32Array::from_slice(&[0, 1, 2, 3, 4, 6]);
/// assert_eq!(expected, sorted.as_ref());
/// # Ok(())
Expand All @@ -124,6 +133,7 @@ pub fn merge_sort(
lhs: &dyn Array,
rhs: &dyn Array,
options: &SortOptions,
limit: Option<usize>,
) -> Result<Box<dyn Array>> {
let arrays = &[lhs, rhs];

Expand All @@ -133,7 +143,7 @@ pub fn merge_sort(
let lhs = (0, 0, lhs.len());
let rhs = (1, 0, rhs.len());
let slices = merge_sort_slices(once(&lhs), once(&rhs), &comparator);
Ok(take_arrays(arrays, slices))
Ok(take_arrays(arrays, slices, limit))
}

/// Returns a vector of slices from different sorted arrays that can be used to create sorted arrays.
Expand Down Expand Up @@ -519,6 +529,26 @@ mod tests {
Ok(())
}

#[test]
fn test_merge_with_limit() -> Result<()> {
let a0: &dyn Array = &Int32Array::from_slice(&[0, 2, 4, 6, 8]);
let a1: &dyn Array = &Int32Array::from_slice(&[1, 3, 5, 7, 9]);

let options = SortOptions::default();
let arrays = vec![a0, a1];
let pairs = vec![(arrays.as_ref(), &options)];
let comparator = build_comparator(&pairs)?;

let slices = merge_sort_slices(once(&(0, 0, 5)), once(&(1, 0, 5)), &comparator);
// thus, they can be used to take from the arrays
let array = take_arrays(&arrays, slices, Some(5));

let expected = Int32Array::from_slice(&[0, 1, 2, 3, 4]);
// values are right
assert_eq!(expected, array.as_ref());
Ok(())
}

#[test]
fn test_merge_4_i32() -> Result<()> {
let a0: &dyn Array = &Int32Array::from_slice(&[0, 1]);
Expand Down Expand Up @@ -546,7 +576,7 @@ mod tests {
);

// thus, they can be used to take from the arrays
let array = take_arrays(&arrays, slices);
let array = take_arrays(&arrays, slices, None);

let expected = Int32Array::from_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);

Expand Down Expand Up @@ -616,7 +646,7 @@ mod tests {
let pairs = vec![(arrays0.as_ref(), &options), (arrays1.as_ref(), &options)];
let slices = slices(&pairs)?;

let array = take_arrays(&[array0, array1], slices);
let array = take_arrays(&[array0, array1], slices, None);

assert_eq!(expected, array.as_ref());
Ok(())
Expand All @@ -641,7 +671,7 @@ mod tests {
let a1 = sort(a1, &options, None)?;

// merge then. If multiple arrays, this can be applied in parallel.
let result = merge_sort(a0.as_ref(), a1.as_ref(), &options)?;
let result = merge_sort(a0.as_ref(), a1.as_ref(), &options, None)?;

assert_eq!(expected, result.as_ref());
Ok(())
Expand Down