Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added reserve to pushable containers in parquet extend_from_decoder #1301

Merged
merged 2 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ impl<O: Offset> Offsets<O> {
}

impl<O: Offset> Pushable<O> for Offsets<O> {
fn reserve(&mut self, additional: usize) {
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
self.0.reserve(additional)
}
#[inline]
fn len(&self) -> usize {
self.0.len() - 1
Expand Down Expand Up @@ -88,6 +91,12 @@ impl<O: Offset> Binary<O> {
}

impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
#[inline]
fn reserve(&mut self, additional: usize) {
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
let avg_len = self.values.len() / std::cmp::max(self.last_offset.to_usize(), 1);
self.values.reserve(additional * avg_len);
self.offsets.reserve(additional);
}
#[inline]
fn len(&self) -> usize {
self.len()
Expand Down
4 changes: 4 additions & 0 deletions src/io/parquet/read/deserialize/fixed_size_binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl FixedSizeBinary {
}

impl<'a> Pushable<&'a [u8]> for FixedSizeBinary {
#[inline]
fn reserve(&mut self, additional: usize) {
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
self.values.reserve(additional * self.size);
}
#[inline]
fn push(&mut self, value: &[u8]) {
debug_assert_eq!(value.len(), self.size);
Expand Down
40 changes: 34 additions & 6 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ pub fn not_implemented(page: &DataPage) -> Error {

/// A private trait representing structs that can receive elements.
///
/// `extend_from_decoder` drives implementors through this interface: it calls
/// `reserve` once up front and then fills the container run by run.
pub(super) trait Pushable<T>: Sized {
/// Hints that `additional` more elements are about to be added so the
/// implementor can allocate ahead of time.
fn reserve(&mut self, additional: usize);
/// Appends a single `value`.
fn push(&mut self, value: T);
/// Number of elements currently held.
fn len(&self) -> usize;
/// Appends the slot for a null entry.
/// NOTE(review): the stored placeholder is implementor-defined -- confirm per impl.
fn push_null(&mut self);
/// Appends `additional` entries built from `value`; `extend_from_decoder`
/// uses it with `T::default()` to fill runs that are not set.
fn extend_constant(&mut self, additional: usize, value: T);
}

impl Pushable<bool> for MutableBitmap {
#[inline]
fn reserve(&mut self, additional: usize) {
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
MutableBitmap::reserve(self, additional)
}
#[inline]
fn len(&self) -> usize {
self.len()
Expand All @@ -60,6 +64,10 @@ impl Pushable<bool> for MutableBitmap {
}

impl<A: Copy + Default> Pushable<A> for Vec<A> {
#[inline]
fn reserve(&mut self, additional: usize) {
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
Vec::reserve(self, additional)
}
#[inline]
fn len(&self) -> usize {
self.len()
Expand Down Expand Up @@ -290,11 +298,33 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
) {
let limit = limit.unwrap_or(usize::MAX);

let mut runs = vec![];
let mut remaining = limit;
let mut reserve_pushable = 0;

// first do a scan so that we know how much to reserve up front
while remaining > 0 {
let run = page_validity.next_limited(remaining);
let run = if let Some(run) = run { run } else { break };

match run {
FilteredHybridEncoded::Bitmap { length, .. } => {
reserve_pushable += length;
remaining -= length;
}
FilteredHybridEncoded::Repeated { length, .. } => {
reserve_pushable += length;
remaining -= length;
}
_ => {}
};
runs.push(run)
}
pushable.reserve(reserve_pushable);
validity.reserve(reserve_pushable);

// then a second loop to really fill the buffers
for run in runs {
match run {
FilteredHybridEncoded::Bitmap {
values,
Expand All @@ -313,18 +343,16 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
}
}
validity.extend_from_slice(values, offset, length);

remaining -= length;
}
FilteredHybridEncoded::Repeated { is_set, length } => {
validity.extend_constant(length, is_set);
if is_set {
(0..length).for_each(|_| pushable.push(values_iter.next().unwrap()));
for v in (&mut values_iter).take(length) {
pushable.push(v)
}
} else {
pushable.extend_constant(length, T::default());
}

remaining -= length;
}
FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {},
};
Expand Down