From a1049b802d626d300effb232022eb7430ac27690 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Mon, 10 May 2021 23:40:06 +0200 Subject: [PATCH] Support plain encoding for binary types , nullable and hybrid encoded dictionary ids and nested primitives where some levels are requires --- src/encoding/hybrid_rle/decoder.rs | 116 ++++++++++++++------- src/encoding/hybrid_rle/mod.rs | 52 ++++++++- src/encoding/mod.rs | 1 + src/encoding/plain_byte_array/decoder.rs | 37 +++++++ src/encoding/plain_byte_array/mod.rs | 3 + src/serialization/read/binary.rs | 65 +++++++++--- src/serialization/read/levels.rs | 40 +------ src/serialization/read/mod.rs | 3 + src/serialization/read/primitive_nested.rs | 18 ++-- 9 files changed, 233 insertions(+), 102 deletions(-) create mode 100644 src/encoding/plain_byte_array/decoder.rs create mode 100644 src/encoding/plain_byte_array/mod.rs diff --git a/src/encoding/hybrid_rle/decoder.rs b/src/encoding/hybrid_rle/decoder.rs index 1294979c6..2b2e4aab5 100644 --- a/src/encoding/hybrid_rle/decoder.rs +++ b/src/encoding/hybrid_rle/decoder.rs @@ -3,20 +3,40 @@ use super::{super::ceil8, HybridEncoded}; /// An iterator that, given a slice of bytes, returns `HybridEncoded` pub struct Decoder<'a> { - values: &'a [u8], - num_bits: u32, + inner: std::iter::Flatten>, } impl<'a> Decoder<'a> { - pub fn new(values: &'a [u8], num_bits: u32) -> Self { - Self { values, num_bits } + pub fn new(values: &'a [u8], num_bits: u32, length: usize) -> Self { + Self { + inner: HybridIterator { + values, + num_bits, + remaining: length, + } + .flatten(), + } } } impl<'a> Iterator for Decoder<'a> { - type Item = HybridEncoded<'a>; + type Item = u32; fn next(&mut self) -> Option { + self.inner.next() + } +} + +pub struct HybridIterator<'a> { + values: &'a [u8], + num_bits: u32, + remaining: usize, +} + +impl<'a> Iterator for HybridIterator<'a> { + type Item = HybridEncoded<'a>; + + fn next(&mut self) -> Option> { if self.values.is_empty() { return None; } @@ -24,17 +44,34 @@ impl<'a> Iterator for Decoder<'a> { self.values = &self.values[consumed..]; if indicator & 1 == 1 { // is bitpacking - let bytes = (indicator as usize >> 1) * self.num_bits as usize; - let result = Some(HybridEncoded::Bitpacked(&self.values[..bytes])); - self.values = &self.values[bytes..]; + let num_bits = self.num_bits as usize; + let num_bytes = (indicator as usize >> 1) * num_bits; + let length = (indicator as usize >> 1) * 8; + let run_length = std::cmp::min(length, self.remaining); + let result = Some(HybridEncoded::Bitpacked { + compressed: &self.values[..num_bytes], + num_bits, + run_length, + }); + self.remaining -= run_length; + self.values = &self.values[num_bytes..]; result } else { // is rle let run_length = indicator as usize >> 1; // repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) let rle_bytes = ceil8(self.num_bits as usize); - let result = Some(HybridEncoded::Rle(&self.values[..rle_bytes], run_length)); + + let pack = &self.values[0..rle_bytes]; + let mut value_bytes = [0u8; std::mem::size_of::()]; + pack.iter() + .enumerate() + .for_each(|(i, byte)| value_bytes[i] = *byte); + let value = u32::from_le_bytes(value_bytes); + + let result = Some(HybridEncoded::Rle { value, run_length }); self.values = &self.values[rle_bytes..]; + self.remaining -= run_length; result } } @@ -44,8 +81,6 @@ impl<'a> Iterator for Decoder<'a> { mod tests { use super::*; - use super::super::super::bitpacking; - #[test] fn basics_1() { let bit_width = 1; @@ -54,19 +89,13 @@ mod tests { 2, 0, 0, 0, // length 0b00000011, 0b00001011, // data ]; + let expected = vec![1, 1, 0, 1, 0]; - let mut decoder = Decoder::new(&values[4..6], bit_width); + let decoder = Decoder::new(&values[4..6], bit_width, length); - let run = decoder.next().unwrap(); + let result = decoder.collect::>(); - if let HybridEncoded::Bitpacked(values) = run { - assert_eq!(values, &[0b00001011]); - let result = - bitpacking::Decoder::new(values, bit_width as u8, length).collect::>(); - assert_eq!(result, &[1, 1, 0, 1, 0]); - } else { - panic!() - }; + assert_eq!(result, expected); } #[test] @@ -74,23 +103,18 @@ mod tests { // This test was validated by the result of what pyarrow3 outputs when // the bitmap is used. let bit_width = 1; + let length = 10; let values = vec![ 3, 0, 0, 0, // length 0b00000101, 0b11101011, 0b00000010, // data ]; - let expected = &[1, 1, 0, 1, 0, 1, 1, 1, 0, 1]; + let expected = vec![1, 1, 0, 1, 0, 1, 1, 1, 0, 1]; - let mut decoder = Decoder::new(&values[4..4 + 3], bit_width); + let decoder = Decoder::new(&values[4..4 + 3], bit_width, length); - let run = decoder.next().unwrap(); + let result = decoder.collect::>(); - if let HybridEncoded::Bitpacked(values) = run { - assert_eq!(values, &[0b11101011, 0b00000010]); - let result = bitpacking::Decoder::new(values, bit_width as u8, 10).collect::>(); - assert_eq!(result, expected); - } else { - panic!() - }; + assert_eq!(result, expected); } #[test] @@ -102,16 +126,30 @@ mod tests { 0b00010000, // data 0b00000001, ]; + let expected = vec![1_u32; length]; - let mut decoder = Decoder::new(&values[4..4 + 2], bit_width); + let decoder = Decoder::new(&values[4..4 + 2], bit_width, length); - let run = decoder.next().unwrap(); + let result = decoder.collect::>(); - if let HybridEncoded::Rle(values, items) = run { - assert_eq!(values, &[0b00000001]); - assert_eq!(items, length); - } else { - panic!() - }; + assert_eq!(result, expected); + } + + #[test] + fn rle_and_bit_packed() { + let bit_width = 1; + let length = 8; + let values = vec![ + 4, 0, 0, 0, // length + 0b00001000, // data + 0b00000001, 0b00000011, 0b00001010, + ]; + let expected = vec![1, 1, 1, 1, 0, 1, 0, 1]; + + let decoder = Decoder::new(&values[4..4 + 4], bit_width, length); + + let result = decoder.collect::>(); + + assert_eq!(result, expected); } } diff --git a/src/encoding/hybrid_rle/mod.rs b/src/encoding/hybrid_rle/mod.rs index 2869e5d85..60e642934 100644 --- a/src/encoding/hybrid_rle/mod.rs +++ b/src/encoding/hybrid_rle/mod.rs @@ -8,9 +8,51 @@ pub use encoder::encode; #[derive(Debug, PartialEq, Eq)] pub enum HybridEncoded<'a> { - /// A bitpacked slice. The consumer must know its bit-width to unpack it. - Bitpacked(&'a [u8]), - /// A RLE-encoded slice. The first attribute corresponds to the slice (that can be interpreted) - /// the second attribute corresponds to the number of repetitions. - Rle(&'a [u8], usize), + /// A bitpacked slice. + Bitpacked { + compressed: &'a [u8], + num_bits: usize, + run_length: usize, + }, + /// A RLE-encoded slice. + Rle { value: u32, run_length: usize }, +} + +impl<'a> IntoIterator for HybridEncoded<'a> { + type Item = u32; + type IntoIter = RunIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + match self { + HybridEncoded::Bitpacked { + compressed, + num_bits, + run_length, + } => RunIterator::Bitpacked(super::bitpacking::Decoder::new( + compressed, + num_bits as u8, + run_length, + )), + HybridEncoded::Rle { + value, + run_length: len, + } => RunIterator::Rle(std::iter::repeat(value).take(len)), + } + } +} + +pub enum RunIterator<'a> { + Bitpacked(super::bitpacking::Decoder<'a>), + Rle(std::iter::Take>), +} + +impl<'a> Iterator for RunIterator<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + match self { + RunIterator::Bitpacked(delegate) => delegate.next(), + RunIterator::Rle(delegate) => delegate.next(), + } + } } diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index 179750087..9e8571c07 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -5,6 +5,7 @@ pub mod delta_bitpacked; pub mod delta_byte_array; pub mod delta_length_byte_array; pub mod hybrid_rle; +pub mod plain_byte_array; pub mod uleb128; pub mod zigzag_leb128; diff --git a/src/encoding/plain_byte_array/decoder.rs b/src/encoding/plain_byte_array/decoder.rs new file mode 100644 index 000000000..f2ba32f3c --- /dev/null +++ b/src/encoding/plain_byte_array/decoder.rs @@ -0,0 +1,37 @@ +use crate::encoding::get_length; + +/// Decodes according to [Plain strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0), +/// prefixes, lengths and values +/// # Implementation +/// This struct does not allocate on the heap. +#[derive(Debug)] +pub struct Decoder<'a> { + values: &'a [u8], + index: usize, +} + +impl<'a> Decoder<'a> { + pub fn new(values: &'a [u8]) -> Self { + Self { values, index: 0 } + } +} + +impl<'a> Iterator for Decoder<'a> { + type Item = &'a [u8]; + + fn next(&mut self) -> Option { + let values = self.values; + let index = self.index; + if index + 4 < values.len() { + let next_len = get_length(values) as usize; + let next_index = index + 4 + next_len; + + let result = Some(&values[index + 4..next_index]); + self.index = next_index; + + result + } else { + None + } + } +} diff --git a/src/encoding/plain_byte_array/mod.rs b/src/encoding/plain_byte_array/mod.rs new file mode 100644 index 000000000..32f89d6db --- /dev/null +++ b/src/encoding/plain_byte_array/mod.rs @@ -0,0 +1,3 @@ +mod decoder; + +pub use decoder::Decoder; diff --git a/src/serialization/read/binary.rs b/src/serialization/read/binary.rs index b6dd3d30c..786d97aed 100644 --- a/src/serialization/read/binary.rs +++ b/src/serialization/read/binary.rs @@ -4,8 +4,9 @@ use super::levels::consume_level; use crate::error::{ParquetError, Result}; use crate::metadata::ColumnDescriptor; use crate::read::Page; +use crate::serialization::read::utils::ValuesDef; use crate::{ - encoding::{bitpacking, uleb128}, + encoding::{hybrid_rle, plain_byte_array}, read::BinaryPageDict, }; @@ -18,24 +19,27 @@ fn read_dict_buffer( let dict_values = dict.values(); let dict_offsets = dict.offsets(); - let (values, _) = consume_level(values, length, def_level_encoding); + let (values, def_levels) = consume_level(values, length, def_level_encoding); let bit_width = values[0]; let values = &values[1..]; - let (_, consumed) = uleb128::decode(&values); - let values = &values[consumed..]; + let indices = hybrid_rle::Decoder::new(values, bit_width as u32, length as usize); - let indices = bitpacking::Decoder::new(values, bit_width, length as usize); + let decoded_values = indices.map(|id| { + let id = id as usize; + let start = dict_offsets[id] as usize; + let end = dict_offsets[id + 1] as usize; - indices - .map(|id| { - let id = id as usize; - let start = dict_offsets[id] as usize; - let end = dict_offsets[id + 1] as usize; - Some(dict_values[start..end].to_vec()) - }) - .collect() + dict_values[start..end].to_vec() + }); + + ValuesDef::new( + decoded_values, + def_levels.into_iter(), + def_level_encoding.1 as u32, + ) + .collect() } pub fn page_dict_to_vec( @@ -62,3 +66,38 @@ pub fn page_dict_to_vec( _ => todo!(), } } + +fn read_plain_buffer( + values: &[u8], + length: u32, + def_level_encoding: (&Encoding, i16), +) -> Vec>> { + let (values, def_levels) = consume_level(values, length, def_level_encoding); + + let decoded_values = plain_byte_array::Decoder::new(values).map(|bytes| bytes.to_vec()); + + ValuesDef::new( + decoded_values, + def_levels.into_iter(), + def_level_encoding.1 as u32, + ) + .collect() +} + +pub fn page_to_vec(page: &Page, descriptor: &ColumnDescriptor) -> Result>>> { + assert_eq!(descriptor.max_rep_level(), 0); + match page { + Page::V1(page) => match (&page.header.encoding, &page.dictionary_page) { + (Encoding::Plain, None) => Ok(read_plain_buffer( + &page.buffer, + page.header.num_values as u32, + ( + &page.header.definition_level_encoding, + descriptor.max_def_level(), + ), + )), + _ => todo!(), + }, + _ => todo!(), + } +} diff --git a/src/serialization/read/levels.rs b/src/serialization/read/levels.rs index ebdeee2ff..f37b6d36f 100644 --- a/src/serialization/read/levels.rs +++ b/src/serialization/read/levels.rs @@ -1,6 +1,6 @@ use parquet_format::Encoding; -use crate::encoding::{bitpacking, get_length, hybrid_rle, log2}; +use crate::encoding::{get_length, hybrid_rle, log2}; #[inline] fn get_bit_width(max_level: i16) -> u32 { @@ -28,11 +28,9 @@ pub fn decode(values: &[u8], length: u32, encoding: (&Encoding, i16)) -> Vec vec![0; length as usize], // no levels => required => all zero (Encoding::Rle, max_length) => { let bit_width = get_bit_width(max_length); - rle_decode( - &values[4..4 + get_length(values) as usize], - bit_width as u32, - length, - ) + let values = &values[4..4 + get_length(values) as usize]; + let num_bits = bit_width as u32; + hybrid_rle::Decoder::new(values, num_bits, length as usize).collect() } (Encoding::BitPacked, _) => { todo!() @@ -41,36 +39,6 @@ pub fn decode(values: &[u8], length: u32, encoding: (&Encoding, i16)) -> Vec Vec { - let length = length as usize; - let runner = hybrid_rle::Decoder::new(&values, num_bits); - - let mut values = Vec::with_capacity(length); - runner.for_each(|run| match run { - hybrid_rle::HybridEncoded::Bitpacked(compressed) => { - let previous_len = values.len(); - let pack_length = (compressed.len() as usize / num_bits as usize) * 8; - let additional = std::cmp::min(previous_len + pack_length, length - previous_len); - values.extend(bitpacking::Decoder::new( - compressed, - num_bits as u8, - additional, - )); - debug_assert_eq!(previous_len + additional, values.len()); - } - hybrid_rle::HybridEncoded::Rle(pack, items) => { - let mut bytes = [0u8; std::mem::size_of::()]; - pack.iter() - .enumerate() - .for_each(|(i, byte)| bytes[i] = *byte); - let value = u32::from_le_bytes(bytes); - values.extend(std::iter::repeat(value).take(items)) - } - }); - values -} - pub fn consume_level<'a>( values: &'a [u8], length: u32, diff --git a/src/serialization/read/mod.rs b/src/serialization/read/mod.rs index 351f80c17..ae3d014ba 100644 --- a/src/serialization/read/mod.rs +++ b/src/serialization/read/mod.rs @@ -88,6 +88,9 @@ pub fn page_to_array(page: CompressedPage, descriptor: &ColumnDescriptor) -> Res PhysicalType::Double => { Ok(Array::Float64(primitive::page_to_vec(&page, descriptor)?)) } + PhysicalType::ByteArray => { + Ok(Array::Binary(binary::page_to_vec(&page, descriptor)?)) + } _ => todo!(), }, }, diff --git a/src/serialization/read/primitive_nested.rs b/src/serialization/read/primitive_nested.rs index 67fbcc970..a2dffa495 100644 --- a/src/serialization/read/primitive_nested.rs +++ b/src/serialization/read/primitive_nested.rs @@ -35,8 +35,8 @@ fn compose_array, F: Iterator, G: Iterator= 1); + let mut first = true; rep_levels .into_iter() .zip(def_levels.into_iter()) @@ -44,21 +44,21 @@ fn compose_array, F: Iterator, G: Iterator {} 0 => { - if prev_def > 1 { + if !first { let old = std::mem::take(&mut inner); outer.push(Some(Array::Int64(old))); } } _ => unreachable!(), } - match def { - 3 => inner.push(Some(values.next().unwrap())), - 2 => inner.push(None), - 1 => outer.push(Some(Array::Int64(vec![]))), - 0 => outer.push(None), + match max_def - def { + 0 => inner.push(Some(values.next().unwrap())), + 1 => inner.push(None), + 2 => outer.push(Some(Array::Int64(vec![]))), + 3 => outer.push(None), _ => unreachable!(), } - prev_def = def; + first = false; }); outer.push(Some(Array::Int64(inner))); Array::List(outer)