-
Notifications
You must be signed in to change notification settings - Fork 853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Specialize interleave string ~2-3x faster #2944
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -16,11 +16,11 @@ | |||||||
// under the License. | ||||||||
|
||||||||
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; | ||||||||
use arrow_array::cast::as_primitive_array; | ||||||||
use arrow_array::{ | ||||||||
downcast_primitive, make_array, new_empty_array, Array, ArrayRef, ArrowPrimitiveType, | ||||||||
PrimitiveArray, | ||||||||
GenericStringArray, OffsetSizeTrait, PrimitiveArray, | ||||||||
}; | ||||||||
use arrow_buffer::{Buffer, MutableBuffer}; | ||||||||
use arrow_data::transform::MutableArrayData; | ||||||||
use arrow_data::ArrayDataBuilder; | ||||||||
use arrow_schema::{ArrowError, DataType}; | ||||||||
|
@@ -85,51 +85,105 @@ pub fn interleave( | |||||||
|
||||||||
downcast_primitive! { | ||||||||
data_type => (primitive_helper, values, indices, data_type), | ||||||||
DataType::Utf8 => interleave_string::<i32>(values, indices, data_type), | ||||||||
DataType::LargeUtf8 => interleave_string::<i64>(values, indices, data_type), | ||||||||
_ => interleave_fallback(values, indices) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
/// Common functionality for interleaving arrays | ||||||||
struct Interleave<'a, T> { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some comments might help here specifically what |
||||||||
arrays: Vec<&'a T>, | ||||||||
null_count: usize, | ||||||||
nulls: Option<Buffer>, | ||||||||
} | ||||||||
|
||||||||
impl<'a, T: Array + 'static> Interleave<'a, T> { | ||||||||
fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self { | ||||||||
let mut has_nulls = false; | ||||||||
let arrays: Vec<&T> = values | ||||||||
.iter() | ||||||||
.map(|x| { | ||||||||
has_nulls = has_nulls || x.null_count() != 0; | ||||||||
x.as_any().downcast_ref().unwrap() | ||||||||
}) | ||||||||
.collect(); | ||||||||
|
||||||||
let mut null_count = 0; | ||||||||
let nulls = has_nulls.then(|| { | ||||||||
let mut builder = BooleanBufferBuilder::new(indices.len()); | ||||||||
for (a, b) in indices { | ||||||||
let v = arrays[*a].is_valid(*b); | ||||||||
null_count += !v as usize; | ||||||||
builder.append(v) | ||||||||
} | ||||||||
builder.finish() | ||||||||
}); | ||||||||
|
||||||||
Self { | ||||||||
arrays, | ||||||||
null_count, | ||||||||
nulls, | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
fn interleave_primitive<T: ArrowPrimitiveType>( | ||||||||
values: &[&dyn Array], | ||||||||
indices: &[(usize, usize)], | ||||||||
data_type: &DataType, | ||||||||
) -> Result<ArrayRef, ArrowError> { | ||||||||
let mut has_nulls = false; | ||||||||
let cast: Vec<_> = values | ||||||||
.iter() | ||||||||
.map(|x| { | ||||||||
has_nulls = has_nulls || x.null_count() != 0; | ||||||||
as_primitive_array::<T>(*x) | ||||||||
}) | ||||||||
.collect(); | ||||||||
let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||||||||
|
||||||||
let mut values = BufferBuilder::<T::Native>::new(indices.len()); | ||||||||
for (a, b) in indices { | ||||||||
let v = cast[*a].value(*b); | ||||||||
let v = interleaved.arrays[*a].value(*b); | ||||||||
values.append(v) | ||||||||
} | ||||||||
|
||||||||
let mut null_count = 0; | ||||||||
let nulls = has_nulls.then(|| { | ||||||||
let mut builder = BooleanBufferBuilder::new(indices.len()); | ||||||||
for (a, b) in indices { | ||||||||
let v = cast[*a].is_valid(*b); | ||||||||
null_count += !v as usize; | ||||||||
builder.append(v) | ||||||||
} | ||||||||
builder.finish() | ||||||||
}); | ||||||||
|
||||||||
let builder = ArrayDataBuilder::new(data_type.clone()) | ||||||||
.len(indices.len()) | ||||||||
.add_buffer(values.finish()) | ||||||||
.null_bit_buffer(nulls) | ||||||||
.null_count(null_count); | ||||||||
.null_bit_buffer(interleaved.nulls) | ||||||||
.null_count(interleaved.null_count); | ||||||||
|
||||||||
let data = unsafe { builder.build_unchecked() }; | ||||||||
Ok(Arc::new(PrimitiveArray::<T>::from(data))) | ||||||||
} | ||||||||
|
||||||||
fn interleave_string<O: OffsetSizeTrait>( | ||||||||
values: &[&dyn Array], | ||||||||
indices: &[(usize, usize)], | ||||||||
data_type: &DataType, | ||||||||
) -> Result<ArrayRef, ArrowError> { | ||||||||
let interleaved = Interleave::<'_, GenericStringArray<O>>::new(values, indices); | ||||||||
|
||||||||
let mut capacity = 0; | ||||||||
let mut offsets = BufferBuilder::<O>::new(indices.len() + 1); | ||||||||
offsets.append(O::from_usize(0).unwrap()); | ||||||||
for (a, b) in indices { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is clever -- do the offsets in one pass and the strings in another |
||||||||
let o = interleaved.arrays[*a].value_offsets(); | ||||||||
let len = o[*b + 1].as_usize() - o[*b].as_usize(); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
capacity += len; | ||||||||
offsets.append(O::from_usize(capacity).expect("overflow")); | ||||||||
} | ||||||||
|
||||||||
let mut values = MutableBuffer::new(capacity); | ||||||||
for (a, b) in indices { | ||||||||
values.extend_from_slice(interleaved.arrays[*a].value(*b).as_bytes()); | ||||||||
} | ||||||||
|
||||||||
let builder = ArrayDataBuilder::new(data_type.clone()) | ||||||||
.len(indices.len()) | ||||||||
.add_buffer(offsets.finish()) | ||||||||
.add_buffer(values.into()) | ||||||||
.null_bit_buffer(interleaved.nulls) | ||||||||
.null_count(interleaved.null_count); | ||||||||
|
||||||||
let data = unsafe { builder.build_unchecked() }; | ||||||||
Ok(Arc::new(GenericStringArray::<O>::from(data))) | ||||||||
} | ||||||||
|
||||||||
/// Fallback implementation of interleave using [`MutableArrayData`] | ||||||||
fn interleave_fallback( | ||||||||
values: &[&dyn Array], | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a drive by cleanup right?