Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
backout per-page optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
danburkert committed Jan 11, 2022
1 parent d40a8e3 commit cf79676
Showing 1 changed file with 22 additions and 47 deletions.
69 changes: 22 additions & 47 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use std::{
collections::VecDeque,
convert::TryFrom,
io::{Read, Seek},
iter::FromIterator,
sync::Arc,
};

Expand All @@ -30,13 +30,9 @@ pub use parquet2::{

use crate::{
array::{Array, DictionaryKey, NullArray, PrimitiveArray, StructArray},
bitmap::MutableBitmap,
datatypes::{DataType, Field, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::{
fixed_size_binary::extend_from_page,
nested_utils::{create_list, init_nested},
},
io::parquet::read::nested_utils::{create_list, init_nested},
};

mod binary;
Expand Down Expand Up @@ -325,47 +321,26 @@ fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = Parq
)))
}
&PhysicalType::FixedLenByteArray(n) => {
let n = usize::try_from(n).unwrap();
let capacity = metadata.num_values() as usize;
let mut validity = MutableBitmap::with_capacity(capacity);
let mut byte_values = Vec::<u8>::new();
let mut i128_values = Vec::<i128>::with_capacity(capacity);

// Iterate through the fixed-size binary pages, converting each fixed-size
// value to an i128, and append to `i128_values`. This conversion requires
// fully materializing the compressed Parquet page into an uncompressed byte
// buffer (`byte_values`), so operating page-at-a-time reduces memory usage as
// opposed to operating on the entire chunk.
while let Some(page) = iter.next()? {
byte_values.clear();
byte_values.reserve(page.num_values() * n);

extend_from_page(
page,
n,
metadata.descriptor(),
&mut byte_values,
&mut validity,
)?;

debug_assert_eq!(byte_values.len() % n, 0);

for fixed_size_value in byte_values.as_slice().chunks_exact(n) {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut i128_bytes = [0u8; 16];
i128_bytes[..n].copy_from_slice(fixed_size_value);
i128_values.push(i128::from_be_bytes(i128_bytes) >> (128 - 8 * n));
}
}

Ok(Box::new(PrimitiveArray::<i128>::from_data(
data_type,
i128_values.into(),
Some(validity.into()),
)))
let n = n as usize;
let fixed_size_binary_array = fixed_size_binary::iter_to_array(
iter,
DataType::FixedSizeBinary(n),
metadata,
)?;
let i128_values =
fixed_size_binary_array
.into_iter()
.map(|value: Option<&[u8]>| {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value?);
Some(i128::from_be_bytes(bytes) >> 8 * (16 - n))
});
let i128_array = PrimitiveArray::<i128>::from_iter(i128_values);
Ok(Box::new(i128_array.to(data_type)) as _)
}
_ => unreachable!(),
},
Expand Down

0 comments on commit cf79676

Please sign in to comment.