From 2dcfbb1d4e813ce6e615f0a056ac1984cd01e061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 11 May 2021 21:03:09 +0200 Subject: [PATCH 1/4] Support for plain encoded binary in data pages --- src/encoding/mod.rs | 1 + src/encoding/plain_byte_array/decoder.rs | 37 +++++++++++++++++++++++ src/encoding/plain_byte_array/mod.rs | 3 ++ src/serialization/read/binary.rs | 38 +++++++++++++++++++++++- src/serialization/read/mod.rs | 3 ++ 5 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 src/encoding/plain_byte_array/decoder.rs create mode 100644 src/encoding/plain_byte_array/mod.rs diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index 179750087..9e8571c07 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -5,6 +5,7 @@ pub mod delta_bitpacked; pub mod delta_byte_array; pub mod delta_length_byte_array; pub mod hybrid_rle; +pub mod plain_byte_array; pub mod uleb128; pub mod zigzag_leb128; diff --git a/src/encoding/plain_byte_array/decoder.rs b/src/encoding/plain_byte_array/decoder.rs new file mode 100644 index 000000000..f2ba32f3c --- /dev/null +++ b/src/encoding/plain_byte_array/decoder.rs @@ -0,0 +1,37 @@ +use crate::encoding::get_length; + +/// Decodes according to [Plain strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0), +/// prefixes, lengths and values +/// # Implementation +/// This struct does not allocate on the heap. +#[derive(Debug)] +pub struct Decoder<'a> { + values: &'a [u8], + index: usize, +} + +impl<'a> Decoder<'a> { + pub fn new(values: &'a [u8]) -> Self { + Self { values, index: 0 } + } +} + +impl<'a> Iterator for Decoder<'a> { + type Item = &'a [u8]; + + fn next(&mut self) -> Option { + let values = self.values; + let index = self.index; + if index + 4 < values.len() { + let next_len = get_length(values) as usize; + let next_index = index + 4 + next_len; + + let result = Some(&values[index + 4..next_index]); + self.index = next_index; + + result + } else { + None + } + } +} diff --git a/src/encoding/plain_byte_array/mod.rs b/src/encoding/plain_byte_array/mod.rs new file mode 100644 index 000000000..32f89d6db --- /dev/null +++ b/src/encoding/plain_byte_array/mod.rs @@ -0,0 +1,3 @@ +mod decoder; + +pub use decoder::Decoder; diff --git a/src/serialization/read/binary.rs b/src/serialization/read/binary.rs index b6dd3d30c..1130751ad 100644 --- a/src/serialization/read/binary.rs +++ b/src/serialization/read/binary.rs @@ -4,8 +4,9 @@ use super::levels::consume_level; use crate::error::{ParquetError, Result}; use crate::metadata::ColumnDescriptor; use crate::read::Page; +use crate::serialization::read::utils::ValuesDef; use crate::{ - encoding::{bitpacking, uleb128}, + encoding::{bitpacking, plain_byte_array, uleb128}, read::BinaryPageDict, }; @@ -62,3 +63,38 @@ pub fn page_dict_to_vec( _ => todo!(), } } + +fn read_plain_buffer( + values: &[u8], + length: u32, + def_level_encoding: (&Encoding, i16), +) -> Vec>> { + let (values, def_levels) = consume_level(values, length, def_level_encoding); + + let decoded_values = plain_byte_array::Decoder::new(values).map(|bytes| bytes.to_vec()); + + ValuesDef::new( + decoded_values, + def_levels.into_iter(), + def_level_encoding.1 as u32, + ) + .collect() +} + +pub fn page_to_vec(page: &Page, descriptor: &ColumnDescriptor) -> Result>>> { + assert_eq!(descriptor.max_rep_level(), 0); + match page { + Page::V1(page) => match (&page.header.encoding, &page.dictionary_page) { + (Encoding::Plain, None) => Ok(read_plain_buffer( + &page.buffer, + page.header.num_values as u32, + ( + &page.header.definition_level_encoding, + descriptor.max_def_level(), + ), + )), + _ => todo!(), + }, + _ => todo!(), + } +} diff --git a/src/serialization/read/mod.rs b/src/serialization/read/mod.rs index 351f80c17..ae3d014ba 100644 --- a/src/serialization/read/mod.rs +++ b/src/serialization/read/mod.rs @@ -88,6 +88,9 @@ pub fn page_to_array(page: CompressedPage, descriptor: &ColumnDescriptor) -> Res PhysicalType::Double => { Ok(Array::Float64(primitive::page_to_vec(&page, descriptor)?)) } + PhysicalType::ByteArray => { + Ok(Array::Binary(binary::page_to_vec(&page, descriptor)?)) + } _ => todo!(), }, }, From bba72c72bc2381b0e91ee217b16df3c091304e41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Wed, 12 May 2021 11:16:27 +0200 Subject: [PATCH 2/4] Fix reading of length and implement size_hint for plain binary decoder --- src/encoding/plain_byte_array/decoder.rs | 25 ++++++++++++++++-------- src/serialization/read/binary.rs | 3 ++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/encoding/plain_byte_array/decoder.rs b/src/encoding/plain_byte_array/decoder.rs index f2ba32f3c..c0f1e62e3 100644 --- a/src/encoding/plain_byte_array/decoder.rs +++ b/src/encoding/plain_byte_array/decoder.rs @@ -7,31 +7,40 @@ use crate::encoding::get_length; #[derive(Debug)] pub struct Decoder<'a> { values: &'a [u8], - index: usize, + remaining: usize, } impl<'a> Decoder<'a> { - pub fn new(values: &'a [u8]) -> Self { - Self { values, index: 0 } + #[inline] + pub fn new(values: &'a [u8], length: usize) -> Self { + Self { + values, + remaining: length, + } } } impl<'a> Iterator for Decoder<'a> { type Item = &'a [u8]; + #[inline] fn next(&mut self) -> Option { let values = self.values; - let index = self.index; - if index + 4 < values.len() { + if values.len() >= 4 { let next_len = get_length(values) as usize; - let next_index = index + 4 + next_len; + let values = &values[4..]; - let result = Some(&values[index + 4..next_index]); - self.index = next_index; + let result = Some(&values[0..next_len]); + self.values = &values[next_len..]; + self.remaining -= 1; result } else { None } } + + fn size_hint(&self) -> (usize, Option) { + (self.remaining, None) + } } diff --git a/src/serialization/read/binary.rs b/src/serialization/read/binary.rs index 1130751ad..eb4b0aa59 100644 --- a/src/serialization/read/binary.rs +++ b/src/serialization/read/binary.rs @@ -71,7 +71,8 @@ fn read_plain_buffer( ) -> Vec>> { let (values, def_levels) = consume_level(values, length, def_level_encoding); - let decoded_values = plain_byte_array::Decoder::new(values).map(|bytes| bytes.to_vec()); + let decoded_values = + plain_byte_array::Decoder::new(values, length as usize).map(|bytes| bytes.to_vec()); ValuesDef::new( decoded_values, From c5366b006800d9dec2071513793b6b7edcf52c27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Sat, 15 May 2021 20:29:11 +0200 Subject: [PATCH 3/4] Add tests for string columns with and without dictionary encoding --- integration/write_pyarrow.py | 12 ++++++- src/lib.rs | 59 ++++++++++++++++++++++++++++------- src/serialization/read/mod.rs | 32 ++++++++++++++++++- 3 files changed, 89 insertions(+), 14 deletions(-) diff --git a/integration/write_pyarrow.py b/integration/write_pyarrow.py index 299eb92ed..d31c848c2 100644 --- a/integration/write_pyarrow.py +++ b/integration/write_pyarrow.py @@ -18,6 +18,7 @@ def case_basic_nullable(size = 1): pa.field('bool', pa.bool_()), pa.field('date', pa.timestamp('ms')), pa.field('uint32', pa.uint32()), + pa.field('string_non_dict', pa.utf8(), nullable=False), ] schema = pa.schema(fields) @@ -28,6 +29,7 @@ def case_basic_nullable(size = 1): "bool": boolean * size, "date": int64 * size, "uint32": int64 * size, + "string_non_dict": string * size, }, schema, f"basic_nullable_{size*10}.parquet" @@ -44,6 +46,7 @@ def case_basic_required(size = 1): pa.field('bool', pa.bool_(), nullable=False), pa.field('date', pa.timestamp('ms'), nullable=False), pa.field('uint32', pa.uint32(), nullable=False), + pa.field('string_non_dict', pa.utf8(), nullable=False), ] schema = pa.schema(fields) @@ -54,6 +57,7 @@ def case_basic_required(size = 1): "bool": boolean * size, "date": int64 * size, "uint32": int64 * size, + "string_non_dict": string * size, }, schema, f"basic_required_{size*10}.parquet" @@ -75,7 +79,7 @@ def write_pyarrow(case, size = 1, page_version = 1): t = pa.table(data, schema=schema) os.makedirs(base_path, exist_ok=True) - pa.parquet.write_table(t, f"{base_path}/{path}", data_page_version=f"{page_version}.0") + pa.parquet.write_table(t, f"{base_path}/{path}", data_page_version=f"{page_version}.0", use_dictionary=["string"]) write_pyarrow(case_basic_nullable, 1, 1) # V1 @@ -85,3 +89,9 @@ def write_pyarrow(case, size = 1, page_version = 1): write_pyarrow(case_basic_required, 1, 2) # V2 write_pyarrow(case_nested, 1, 1) + +# pyarrow seems to write corrupt file when disabling dictionary encoding for nullable strings +# pyarrow can't read the column itself +# rust parquet1 also fails +# parquet2 seems to skip the null values and instead returns them at the end +# print(pa.parquet.read_table(f"{PYARROW_PATH}/v1/basic_nullable_10.parquet", columns=["string_non_dict"])) diff --git a/src/lib.rs b/src/lib.rs index 178091b1a..d7e265ff6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,6 +172,7 @@ mod tests { 2 => Array::Binary(string_values.to_vec()), 3 => Array::Boolean(bool_values.to_vec()), 4 => Array::Int64(i64_values.to_vec()), + 6 => Array::Binary(string_values.to_vec()), _ => unreachable!(), } } @@ -201,21 +202,33 @@ mod tests { // these values match the values in `integration` pub fn pyarrow_required(column: usize) -> Array { - let i64_values = &[ - Some(0), - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - Some(7), - Some(8), - Some(9), + let i64_values = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let f64_values = &[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; + let string_values = &[ + "Hello", "bbb", "aa", "", "bbb", "abc", "bbb", "bbb", "def", "aaa", + ]; + let bool_values = &[ + true, true, false, false, false, true, true, true, true, true, ]; match column { - 0 => Array::Int64(i64_values.to_vec()), + 0 => Array::Int64(i64_values.iter().map(|i| Some(*i as i64)).collect()), + 1 => Array::Float64(f64_values.iter().map(|f| Some(*f)).collect()), + 2 => Array::Binary( + string_values + .iter() + .map(|s| Some(s.as_bytes().to_vec())) + .collect(), + ), + 3 => Array::Boolean(bool_values.iter().map(|b| Some(*b)).collect()), + 4 => Array::Int64(i64_values.iter().map(|i| Some(*i as i64)).collect()), + 5 => Array::Int32(i64_values.iter().map(|i| Some(*i as i32)).collect()), + 6 => Array::Binary( + string_values + .iter() + .map(|s| Some(s.as_bytes().to_vec())) + .collect(), + ), _ => unreachable!(), } } @@ -223,6 +236,28 @@ mod tests { pub fn pyarrow_required_stats(column: usize) -> (Option, Value, Value) { match column { 0 => (Some(0), Value::Int64(Some(0)), Value::Int64(Some(9))), + 1 => ( + Some(3), + Value::Float64(Some(0.0)), + Value::Float64(Some(9.0)), + ), + 2 => ( + Some(4), + Value::Binary(Some(b"".to_vec())), + Value::Binary(Some(b"def".to_vec())), + ), + 3 => ( + Some(4), + Value::Boolean(Some(false)), + Value::Boolean(Some(true)), + ), + 4 => (Some(3), Value::Int64(Some(0)), Value::Int64(Some(9))), + 5 => (Some(0), Value::Int32(Some(0)), Value::Int32(Some(9))), + 6 => ( + Some(4), + Value::Binary(Some(b"".to_vec())), + Value::Binary(Some(b"def".to_vec())), + ), _ => unreachable!(), } } diff --git a/src/serialization/read/mod.rs b/src/serialization/read/mod.rs index ae3d014ba..1f5877047 100644 --- a/src/serialization/read/mod.rs +++ b/src/serialization/read/mod.rs @@ -118,7 +118,9 @@ pub fn page_to_array(page: CompressedPage, descriptor: &ColumnDescriptor) -> Res mod tests { use std::fs::File; - use crate::read::{get_page_iterator, read_metadata, PrimitiveStatistics, Statistics}; + use crate::read::{ + get_page_iterator, read_metadata, BinaryStatistics, PrimitiveStatistics, Statistics, + }; use crate::tests::*; use crate::types::int96_to_i64; @@ -277,6 +279,12 @@ mod tests { assert_eq!(s.min_value, min); assert_eq!(s.max_value, max); } + (Value::Binary(min), Value::Binary(max)) => { + let s = stats.as_any().downcast_ref::().unwrap(); + + assert_eq!(s.min_value, min); + assert_eq!(s.max_value, max); + } _ => todo!(), } @@ -341,6 +349,28 @@ mod tests { test_pyarrow_integration("basic", 0, 1, false) } + #[test] + fn pyarrow_v1_dict_string_required() -> Result<()> { + test_pyarrow_integration("basic", 2, 1, true) + } + + #[test] + #[ignore = "optional strings are not yet supported, see https://github.com/jorgecarleitao/parquet2/pull/7"] + fn pyarrow_v1_dict_string_optional() -> Result<()> { + test_pyarrow_integration("basic", 2, 1, false) + } + + #[test] + fn pyarrow_v1_non_dict_string_required() -> Result<()> { + test_pyarrow_integration("basic", 6, 1, true) + } + + #[test] + #[ignore = "optional non dictionary encoded strings seem to be written incorrectly by pyarrow, neither pyarrow itself nor rust parquet1 can read the generated file"] + fn pyarrow_v1_non_dict_string_optional() -> Result<()> { + test_pyarrow_integration("basic", 6, 1, false) + } + #[test] fn pyarrow_v1_list_nullable() -> Result<()> { test_pyarrow_integration("nested", 0, 1, false) From 5a9ec2cb1ff4dc914bfb378a644025e164d0d223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Sat, 15 May 2021 20:31:13 +0200 Subject: [PATCH 4/4] Return upper bound in plain binary decoder --- src/encoding/plain_byte_array/decoder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/encoding/plain_byte_array/decoder.rs b/src/encoding/plain_byte_array/decoder.rs index c0f1e62e3..a950e4ce1 100644 --- a/src/encoding/plain_byte_array/decoder.rs +++ b/src/encoding/plain_byte_array/decoder.rs @@ -41,6 +41,6 @@ impl<'a> Iterator for Decoder<'a> { } fn size_hint(&self) -> (usize, Option) { - (self.remaining, None) + (self.remaining, Some(self.remaining)) } }