From d40a8e36664e3963bc063e4cf0b89bc6b6e26319 Mon Sep 17 00:00:00 2001 From: Dan Burkert Date: Mon, 10 Jan 2022 02:27:45 +0000 Subject: [PATCH 1/4] Reduce memory usage during Decimal Parquet->Arrow conversion This PR reduces memory usage, both in terms of memory used as well as allocations, in the Parquet->Arrow conversion of Decimal chunks. There are two optimizations: 1. Instead of using `slice::concat` to expand buffers to 16 bytes, a stack-allocated 16 byte buffer is used instead. This removes an allocation per value. 2. Data is expanded from the encoded Parquet fixed-size binary pages into a byte buffer, which is then converted to a buffer of i128s. To reduce the size of the intermediate byte buffer, this conversion is now done page-by-page. --- src/io/parquet/read/mod.rs | 92 ++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 414ab1c2c84..8f749b0d2a6 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -3,7 +3,7 @@ use std::{ collections::VecDeque, - convert::TryInto, + convert::TryFrom, io::{Read, Seek}, sync::Arc, }; @@ -30,9 +30,13 @@ pub use parquet2::{ use crate::{ array::{Array, DictionaryKey, NullArray, PrimitiveArray, StructArray}, + bitmap::MutableBitmap, datatypes::{DataType, Field, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, - io::parquet::read::nested_utils::{create_list, init_nested}, + io::parquet::read::{ + fixed_size_binary::extend_from_page, + nested_utils::{create_list, init_nested}, + }, }; mod binary; @@ -314,44 +318,54 @@ fn page_iter_to_array { primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x as i128) } - PhysicalType::FixedLenByteArray(n) => { - if *n > 16 { - Err(ArrowError::NotYetImplemented(format!( - "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", - n - ))) - } else { - let zeros_padding = (0..(16 - *n)).map(|_| 0u8).collect::>(); - let ones_padding = (0..(16 - *n)).map(|_| !0u8).collect::>(); - fixed_size_binary::iter_to_array( - iter, - DataType::FixedSizeBinary(*n as usize), - metadata, - ) - .map(|e| { - let a = e - .into_iter() - .map(|v| { - v.and_then(|v1| { - // Pad with the value of the MSB to correctly handle (two's complement) negative integers. - let msb_set = v1.first().unwrap_or(&0) >> 7 == 1; - let padding = if msb_set { - &ones_padding - } else { - &zeros_padding - }; - [padding, v1] - .concat() - .try_into() - .map(i128::from_be_bytes) - .ok() - }) - }) - .collect::>(); - Box::new(PrimitiveArray::::from(a).to(data_type)) - as Box - }) + &PhysicalType::FixedLenByteArray(n) if n > 16 => { + Err(ArrowError::NotYetImplemented(format!( + "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", + n + ))) + } + &PhysicalType::FixedLenByteArray(n) => { + let n = usize::try_from(n).unwrap(); + let capacity = metadata.num_values() as usize; + let mut validity = MutableBitmap::with_capacity(capacity); + let mut byte_values = Vec::::new(); + let mut i128_values = Vec::::with_capacity(capacity); + + // Iterate through the fixed-size binary pages, converting each fixed-size + // value to an i128, and append to `i128_values`. This conversion requires + // fully materializing the compressed Parquet page into an uncompressed byte + // buffer (`byte_values`), so operating page-at-a-time reduces memory usage as + // opposed to operating on the entire chunk. + while let Some(page) = iter.next()? { + byte_values.clear(); + byte_values.reserve(page.num_values() * n); + + extend_from_page( + page, + n, + metadata.descriptor(), + &mut byte_values, + &mut validity, + )?; + + debug_assert_eq!(byte_values.len() % n, 0); + + for fixed_size_value in byte_values.as_slice().chunks_exact(n) { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. + let mut i128_bytes = [0u8; 16]; + i128_bytes[..n].copy_from_slice(fixed_size_value); + i128_values.push(i128::from_be_bytes(i128_bytes) >> (128 - 8 * n)); + } } + + Ok(Box::new(PrimitiveArray::::from_data( + data_type, + i128_values.into(), + Some(validity.into()), + ))) } _ => unreachable!(), }, From cf79676d33f90f2d7090c37fedd4249e1718d3e4 Mon Sep 17 00:00:00 2001 From: Dan Burkert Date: Tue, 11 Jan 2022 02:23:55 +0000 Subject: [PATCH 2/4] backout per-page optimization --- src/io/parquet/read/mod.rs | 69 ++++++++++++-------------------------- 1 file changed, 22 insertions(+), 47 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 8f749b0d2a6..d4a6938f2d8 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -3,8 +3,8 @@ use std::{ collections::VecDeque, - convert::TryFrom, io::{Read, Seek}, + iter::FromIterator, sync::Arc, }; @@ -30,13 +30,9 @@ pub use parquet2::{ use crate::{ array::{Array, DictionaryKey, NullArray, PrimitiveArray, StructArray}, - bitmap::MutableBitmap, datatypes::{DataType, Field, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, - io::parquet::read::{ - fixed_size_binary::extend_from_page, - nested_utils::{create_list, init_nested}, - }, + io::parquet::read::nested_utils::{create_list, init_nested}, }; mod binary; @@ -325,47 +321,26 @@ fn page_iter_to_array { - let n = usize::try_from(n).unwrap(); - let capacity = metadata.num_values() as usize; - let mut validity = MutableBitmap::with_capacity(capacity); - let mut byte_values = Vec::::new(); - let mut i128_values = Vec::::with_capacity(capacity); - - // Iterate through the fixed-size binary pages, converting each fixed-size - // value to an i128, and append to `i128_values`. This conversion requires - // fully materializing the compressed Parquet page into an uncompressed byte - // buffer (`byte_values`), so operating page-at-a-time reduces memory usage as - // opposed to operating on the entire chunk. - while let Some(page) = iter.next()? { - byte_values.clear(); - byte_values.reserve(page.num_values() * n); - - extend_from_page( - page, - n, - metadata.descriptor(), - &mut byte_values, - &mut validity, - )?; - - debug_assert_eq!(byte_values.len() % n, 0); - - for fixed_size_value in byte_values.as_slice().chunks_exact(n) { - // Copy the fixed-size byte value to the start of a 16 byte stack - // allocated buffer, then use an arithmetic right shift to fill in - // MSBs, which accounts for leading 1's in negative (two's complement) - // values. - let mut i128_bytes = [0u8; 16]; - i128_bytes[..n].copy_from_slice(fixed_size_value); - i128_values.push(i128::from_be_bytes(i128_bytes) >> (128 - 8 * n)); - } - } - - Ok(Box::new(PrimitiveArray::::from_data( - data_type, - i128_values.into(), - Some(validity.into()), - ))) + let n = n as usize; + let fixed_size_binary_array = fixed_size_binary::iter_to_array( + iter, + DataType::FixedSizeBinary(n), + metadata, + )?; + let i128_values = + fixed_size_binary_array + .into_iter() + .map(|value: Option<&[u8]>| { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. + let mut bytes = [0u8; 16]; + bytes[..n].copy_from_slice(value?); + Some(i128::from_be_bytes(bytes) >> 8 * (16 - n)) + }); + let i128_array = PrimitiveArray::::from_iter(i128_values); + Ok(Box::new(i128_array.to(data_type)) as _) } _ => unreachable!(), }, From 3e7b85c4886033ff977bdf3555f8ee9fdb387731 Mon Sep 17 00:00:00 2001 From: Dan Burkert Date: Tue, 11 Jan 2022 11:14:02 -0800 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Jorge Leitao --- src/io/parquet/read/mod.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index d4a6938f2d8..c25451ebedd 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -327,20 +327,23 @@ fn page_iter_to_array| { + .values() + .iter() + .map(|value: &[u8]| { // Copy the fixed-size byte value to the start of a 16 byte stack // allocated buffer, then use an arithmetic right shift to fill in // MSBs, which accounts for leading 1's in negative (two's complement) // values. let mut bytes = [0u8; 16]; - bytes[..n].copy_from_slice(value?); - Some(i128::from_be_bytes(bytes) >> 8 * (16 - n)) - }); - let i128_array = PrimitiveArray::::from_iter(i128_values); - Ok(Box::new(i128_array.to(data_type)) as _) + bytes[..n].copy_from_slice(value); + i128::from_be_bytes(bytes) >> 8 * (16 - n) + }) + .collect::>(); + let validity = fixed_size_binary_array.validity().cloned(); + let i128_array = PrimitiveArray::::from_data(data_type, values.into(), validity); + Ok(Box::new(i128_array) as _) } _ => unreachable!(), }, From d000effe702c8ab07f522c025711e6ed96d41467 Mon Sep 17 00:00:00 2001 From: Dan Burkert Date: Tue, 11 Jan 2022 19:23:54 +0000 Subject: [PATCH 4/4] fix values optimization --- src/io/parquet/read/mod.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index c25451ebedd..e4579cdbfd7 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -4,7 +4,6 @@ use std::{ collections::VecDeque, io::{Read, Seek}, - iter::FromIterator, sync::Arc, }; @@ -327,22 +326,22 @@ fn page_iter_to_array> 8 * (16 - n) - }) - .collect::>(); + let values = fixed_size_binary_array + .values() + .chunks_exact(n) + .map(|value: &[u8]| { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. + let mut bytes = [0u8; 16]; + bytes[..n].copy_from_slice(value); + i128::from_be_bytes(bytes) >> (8 * (16 - n)) + }) + .collect::>(); let validity = fixed_size_binary_array.validity().cloned(); - let i128_array = PrimitiveArray::::from_data(data_type, values.into(), validity); + let i128_array = + PrimitiveArray::::from_data(data_type, values.into(), validity); Ok(Box::new(i128_array) as _) } _ => unreachable!(),