Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Reduce memory usage in Parquet->Arrow decimal column chunk conversion #751

Merged
merged 4 commits into from
Jan 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 29 additions & 38 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::{
collections::VecDeque,
convert::TryInto,
io::{Read, Seek},
sync::Arc,
};
Expand Down Expand Up @@ -314,44 +313,36 @@ fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = Parq
PhysicalType::Int64 => {
primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x as i128)
}
PhysicalType::FixedLenByteArray(n) => {
if *n > 16 {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
} else {
let zeros_padding = (0..(16 - *n)).map(|_| 0u8).collect::<Vec<_>>();
let ones_padding = (0..(16 - *n)).map(|_| !0u8).collect::<Vec<_>>();
fixed_size_binary::iter_to_array(
iter,
DataType::FixedSizeBinary(*n as usize),
metadata,
)
.map(|e| {
let a = e
.into_iter()
.map(|v| {
v.and_then(|v1| {
// Pad with the value of the MSB to correctly handle (two's complement) negative integers.
let msb_set = v1.first().unwrap_or(&0) >> 7 == 1;
let padding = if msb_set {
&ones_padding
} else {
&zeros_padding
};
[padding, v1]
.concat()
.try_into()
.map(i128::from_be_bytes)
.ok()
})
})
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(a).to(data_type))
as Box<dyn Array>
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
}
&PhysicalType::FixedLenByteArray(n) => {
let n = n as usize;
let fixed_size_binary_array = fixed_size_binary::iter_to_array(
iter,
DataType::FixedSizeBinary(n),
metadata,
)?;
let values = fixed_size_binary_array
.values()
.chunks_exact(n)
.map(|value: &[u8]| {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
})
}
.collect::<Vec<_>>();
let validity = fixed_size_binary_array.validity().cloned();
let i128_array =
PrimitiveArray::<i128>::from_data(data_type, values.into(), validity);
Ok(Box::new(i128_array) as _)
}
_ => unreachable!(),
},
Expand Down