Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Reduce memory usage in Parquet->Arrow decimal column chunk conversion (
Browse files Browse the repository at this point in the history
  • Loading branch information
danburkert authored Jan 13, 2022
1 parent 2ca8aa7 commit 6b7af9f
Showing 1 changed file with 29 additions and 38 deletions.
67 changes: 29 additions & 38 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::{
collections::VecDeque,
convert::TryInto,
io::{Read, Seek},
sync::Arc,
};
Expand Down Expand Up @@ -314,44 +313,36 @@ fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = Parq
PhysicalType::Int64 => {
primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x as i128)
}
PhysicalType::FixedLenByteArray(n) => {
if *n > 16 {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
} else {
let zeros_padding = (0..(16 - *n)).map(|_| 0u8).collect::<Vec<_>>();
let ones_padding = (0..(16 - *n)).map(|_| !0u8).collect::<Vec<_>>();
fixed_size_binary::iter_to_array(
iter,
DataType::FixedSizeBinary(*n as usize),
metadata,
)
.map(|e| {
let a = e
.into_iter()
.map(|v| {
v.and_then(|v1| {
// Pad with the value of the MSB to correctly handle (two's complement) negative integers.
let msb_set = v1.first().unwrap_or(&0) >> 7 == 1;
let padding = if msb_set {
&ones_padding
} else {
&zeros_padding
};
[padding, v1]
.concat()
.try_into()
.map(i128::from_be_bytes)
.ok()
})
})
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(a).to(data_type))
as Box<dyn Array>
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
}
&PhysicalType::FixedLenByteArray(n) => {
let n = n as usize;
let fixed_size_binary_array = fixed_size_binary::iter_to_array(
iter,
DataType::FixedSizeBinary(n),
metadata,
)?;
let values = fixed_size_binary_array
.values()
.chunks_exact(n)
.map(|value: &[u8]| {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
})
}
.collect::<Vec<_>>();
let validity = fixed_size_binary_array.validity().cloned();
let i128_array =
PrimitiveArray::<i128>::from_data(data_type, values.into(), validity);
Ok(Box::new(i128_array) as _)
}
_ => unreachable!(),
},
Expand Down

0 comments on commit 6b7af9f

Please sign in to comment.