Improvements for dictionary encodings and nested values #7

Closed
116 changes: 77 additions & 39 deletions src/encoding/hybrid_rle/decoder.rs
@@ -3,38 +3,75 @@ use super::{super::ceil8, HybridEncoded};

/// An iterator that, given a slice of bytes, returns `HybridEncoded`
pub struct Decoder<'a> {
values: &'a [u8],
num_bits: u32,
inner: std::iter::Flatten<HybridIterator<'a>>,
}

impl<'a> Decoder<'a> {
pub fn new(values: &'a [u8], num_bits: u32) -> Self {
Self { values, num_bits }
pub fn new(values: &'a [u8], num_bits: u32, length: usize) -> Self {
Self {
inner: HybridIterator {
values,
num_bits,
remaining: length,
}
.flatten(),
}
}
}

impl<'a> Iterator for Decoder<'a> {
type Item = HybridEncoded<'a>;
type Item = u32;

fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}

pub struct HybridIterator<'a> {
values: &'a [u8],
num_bits: u32,
remaining: usize,
}

impl<'a> Iterator for HybridIterator<'a> {
type Item = HybridEncoded<'a>;

fn next(&mut self) -> Option<HybridEncoded<'a>> {
if self.values.is_empty() {
return None;
}
let (indicator, consumed) = uleb128::decode(self.values);
self.values = &self.values[consumed..];
if indicator & 1 == 1 {
// is bitpacking
let bytes = (indicator as usize >> 1) * self.num_bits as usize;
let result = Some(HybridEncoded::Bitpacked(&self.values[..bytes]));
self.values = &self.values[bytes..];
let num_bits = self.num_bits as usize;
Contributor Author

These changes mostly correspond to what was previously in rle_decode in levels.rs

let num_bytes = (indicator as usize >> 1) * num_bits;
let length = (indicator as usize >> 1) * 8;
let run_length = std::cmp::min(length, self.remaining);
let result = Some(HybridEncoded::Bitpacked {
compressed: &self.values[..num_bytes],
num_bits,
run_length,
});
self.remaining -= run_length;
self.values = &self.values[num_bytes..];
result
} else {
// is rle
let run_length = indicator as usize >> 1;
// repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)
let rle_bytes = ceil8(self.num_bits as usize);
let result = Some(HybridEncoded::Rle(&self.values[..rle_bytes], run_length));

let pack = &self.values[0..rle_bytes];
let mut value_bytes = [0u8; std::mem::size_of::<u32>()];
pack.iter()
.enumerate()
.for_each(|(i, byte)| value_bytes[i] = *byte);
let value = u32::from_le_bytes(value_bytes);
Comment on lines +64 to +70
Owner

The reason I do not want to push this here is that, in the Arrow format, validity is stored as bytes in LSB bit order, which is the same layout Parquet uses when rep level = 0. This particular change would require a round trip <u8 in LSB> -> i32 -> <u8 in LSB> for non-nested types, which has a significant performance cost.

This is why I tried to offer the "raw" bytes here, and to provide a higher-level API based on i32 closer to the deserialization.

So, the idea was: if people want the raw stuff, use the decoder, else, use the rle_decode in levels.rs. However, I do agree that rle_decode is suboptimal as it allocates. What do you think about keeping this decoder as is, and instead have rle_decode implemented as returning an iterator? It may solve both of our use-cases?

Contributor Author

What do you think about keeping this decoder as is, and instead have rle_decode implemented as returning an iterator? It may solve both of our use-cases?

That sounds like it should work; it seems all users call into_iter on the RLE-decoded vec anyway. It should then even be possible to expose the iterator directly in the API instead of first creating, for example, a Vec<Option<Vec<u8>>> whose contents then have to be copied again into the Arrow format.

Contributor Author

Now that I have realized that arrow2 uses the lower-level API, I understand what you actually meant :)
Do you think the code in arrow2 could be simplified by exposing an iterator API in parquet2, without losing performance?

Owner

We already have an iterator API that returns either variant, RLE or Bitpacked. It is designed so that arrow2 can use it without incurring the cost of unpacking the bit-packed data only to compare an i32 to 1 (the max def level).

There will be a performance degradation if we move to iterators over i32, which is why I was suggesting that we keep the decoder as is and offer a higher-level iterator over i32 on top of it (in levels.rs).
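
The layering discussed in this thread could look roughly like the sketch below: a low-level iterator yields runs that still borrow the raw bytes, and a separate adapter lazily expands them to `u32` for callers that want individual values. This is only an illustration under stated assumptions. The names `Run`, `expand` and `count_set` are hypothetical and are not parquet2 API, and the sketch only handles a bit width of 1.

```rust
/// A run as a low-level decoder might yield it (hypothetical type, bit width 1 only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Run<'a> {
    /// Values still packed LSB-first; only the first `len` bits are meaningful.
    Bitpacked { packed: &'a [u8], len: usize },
    /// A single value repeated `len` times.
    Rle { value: u32, len: usize },
}

/// Higher-level adapter: lazily expands runs into `u32` values on demand.
pub fn expand<'a>(runs: impl Iterator<Item = Run<'a>> + 'a) -> impl Iterator<Item = u32> + 'a {
    runs.flat_map(|run| {
        let len = match run {
            Run::Bitpacked { len, .. } | Run::Rle { len, .. } => len,
        };
        // Both variants share one iterator type, so no boxing is needed.
        (0..len).map(move |i| match run {
            Run::Bitpacked { packed, .. } => u32::from((packed[i / 8] >> (i % 8)) & 1),
            Run::Rle { value, .. } => value,
        })
    })
}

/// Low-level consumer: counts set bits directly on the packed bytes,
/// without expanding each bit to a `u32` first.
pub fn count_set<'a>(runs: impl Iterator<Item = Run<'a>>) -> usize {
    runs.map(|run| match run {
        Run::Bitpacked { packed, len } => {
            let full = len / 8;
            let mut ones: usize = packed[..full].iter().map(|b| b.count_ones() as usize).sum();
            let rest = len % 8;
            if rest > 0 {
                // Mask off the padding bits of the last, partially used byte.
                let mask = (1u8 << rest) - 1;
                ones += (packed[full] & mask).count_ones() as usize;
            }
            ones
        }
        Run::Rle { value, len } => if value == 1 { len } else { 0 },
    })
    .sum()
}
```

A consumer that only needs a validity count or bitmap can stay on the `Run` level, while a consumer that needs levels for nested types can call `expand` and pay for unpacking only there.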


let result = Some(HybridEncoded::Rle { value, run_length });
self.values = &self.values[rle_bytes..];
self.remaining -= run_length;
result
}
}
@@ -44,8 +81,6 @@ impl<'a> Iterator for Decoder<'a> {
mod tests {
use super::*;

use super::super::super::bitpacking;

#[test]
fn basics_1() {
let bit_width = 1;
@@ -54,43 +89,32 @@ mod tests {
2, 0, 0, 0, // length
0b00000011, 0b00001011, // data
];
let expected = vec![1, 1, 0, 1, 0];

let mut decoder = Decoder::new(&values[4..6], bit_width);
let decoder = Decoder::new(&values[4..6], bit_width, length);

let run = decoder.next().unwrap();
let result = decoder.collect::<Vec<_>>();

if let HybridEncoded::Bitpacked(values) = run {
assert_eq!(values, &[0b00001011]);
let result =
bitpacking::Decoder::new(values, bit_width as u8, length).collect::<Vec<_>>();
assert_eq!(result, &[1, 1, 0, 1, 0]);
} else {
panic!()
};
assert_eq!(result, expected);
}

#[test]
fn basics_2() {
// This test was validated by the result of what pyarrow3 outputs when
// the bitmap is used.
let bit_width = 1;
let length = 10;
let values = vec![
3, 0, 0, 0, // length
0b00000101, 0b11101011, 0b00000010, // data
];
let expected = &[1, 1, 0, 1, 0, 1, 1, 1, 0, 1];
let expected = vec![1, 1, 0, 1, 0, 1, 1, 1, 0, 1];

let mut decoder = Decoder::new(&values[4..4 + 3], bit_width);
let decoder = Decoder::new(&values[4..4 + 3], bit_width, length);

let run = decoder.next().unwrap();
let result = decoder.collect::<Vec<_>>();

if let HybridEncoded::Bitpacked(values) = run {
assert_eq!(values, &[0b11101011, 0b00000010]);
let result = bitpacking::Decoder::new(values, bit_width as u8, 10).collect::<Vec<_>>();
assert_eq!(result, expected);
} else {
panic!()
};
assert_eq!(result, expected);
}

#[test]
@@ -102,16 +126,30 @@ mod tests {
0b00010000, // data
0b00000001,
];
let expected = vec![1_u32; length];

let mut decoder = Decoder::new(&values[4..4 + 2], bit_width);
let decoder = Decoder::new(&values[4..4 + 2], bit_width, length);

let run = decoder.next().unwrap();
let result = decoder.collect::<Vec<_>>();

if let HybridEncoded::Rle(values, items) = run {
assert_eq!(values, &[0b00000001]);
assert_eq!(items, length);
} else {
panic!()
};
assert_eq!(result, expected);
}

#[test]
fn rle_and_bit_packed() {
let bit_width = 1;
let length = 8;
let values = vec![
4, 0, 0, 0, // length
0b00001000, // data
0b00000001, 0b00000011, 0b00001010,
Contributor Author

These values follow my understanding of the encoding; I have not validated them yet.

];
let expected = vec![1, 1, 1, 1, 0, 1, 0, 1];

let decoder = Decoder::new(&values[4..4 + 4], bit_width, length);

let result = decoder.collect::<Vec<_>>();

assert_eq!(result, expected);
}
}
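
The byte layouts used in the tests above can be cross-checked with a small, unoptimized decoder. The sketch below is an independent illustration of the hybrid RLE/bit-packed layout (a ULEB128 indicator whose lowest bit selects the run type), not the implementation in this PR. The helper names `uleb128` and `decode_hybrid` are hypothetical, and the code assumes `1 <= num_bits <= 32` and a well-formed buffer, so it panics on truncated input.

```rust
/// Decodes an unsigned LEB128 integer, returning (value, bytes consumed).
fn uleb128(bytes: &[u8]) -> (u64, usize) {
    let mut value = 0u64;
    // At most 10 bytes are needed to encode a 64-bit value.
    for (i, byte) in bytes.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return (value, i + 1);
        }
    }
    (value, bytes.len().min(10))
}

/// Expands a hybrid RLE/bit-packed buffer into at most `length` values.
fn decode_hybrid(mut bytes: &[u8], num_bits: usize, length: usize) -> Vec<u32> {
    let mut out = Vec::with_capacity(length);
    while out.len() < length && !bytes.is_empty() {
        let (indicator, consumed) = uleb128(bytes);
        bytes = &bytes[consumed..];
        let count = (indicator >> 1) as usize;
        if indicator & 1 == 1 {
            // Bit-packed run: `count` groups of 8 values, `num_bits` bytes per group.
            let num_bytes = count * num_bits;
            let packed = &bytes[..num_bytes];
            // The last group may be padded, so never emit more than `length` values.
            let run_length = (count * 8).min(length - out.len());
            for i in 0..run_length {
                let mut value = 0u32;
                for bit in 0..num_bits {
                    let pos = i * num_bits + bit;
                    let set = (packed[pos / 8] >> (pos % 8)) & 1;
                    value |= u32::from(set) << bit;
                }
                out.push(value);
            }
            bytes = &bytes[num_bytes..];
        } else {
            // RLE run: one value stored in ceil(num_bits / 8) little-endian bytes.
            let width = (num_bits + 7) / 8;
            let mut le = [0u8; 4];
            le[..width].copy_from_slice(&bytes[..width]);
            let value = u32::from_le_bytes(le);
            let run_length = count.min(length - out.len());
            out.extend(std::iter::repeat(value).take(run_length));
            bytes = &bytes[width..];
        }
    }
    out
}
```

Under these assumptions, the data bytes of `rle_and_bit_packed` (`0b00001000, 0b00000001, 0b00000011, 0b00001010`) decode to an RLE run of four 1s followed by the first four bit-packed values `0, 1, 0, 1`, which agrees with the `expected` vector in that test.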
52 changes: 47 additions & 5 deletions src/encoding/hybrid_rle/mod.rs
@@ -8,9 +8,51 @@ pub use encoder::encode;

#[derive(Debug, PartialEq, Eq)]
pub enum HybridEncoded<'a> {
/// A bitpacked slice. The consumer must know its bit-width to unpack it.
Bitpacked(&'a [u8]),
/// A RLE-encoded slice. The first attribute corresponds to the slice (that can be interpreted)
/// the second attribute corresponds to the number of repetitions.
Rle(&'a [u8], usize),
/// A bitpacked slice.
Bitpacked {
compressed: &'a [u8],
num_bits: usize,
run_length: usize,
},
/// A RLE-encoded slice.
Rle { value: u32, run_length: usize },
}

impl<'a> IntoIterator for HybridEncoded<'a> {
type Item = u32;
type IntoIter = RunIterator<'a>;

fn into_iter(self) -> Self::IntoIter {
match self {
HybridEncoded::Bitpacked {
compressed,
num_bits,
run_length,
} => RunIterator::Bitpacked(super::bitpacking::Decoder::new(
compressed,
num_bits as u8,
run_length,
)),
HybridEncoded::Rle {
value,
run_length: len,
} => RunIterator::Rle(std::iter::repeat(value).take(len)),
}
}
}

pub enum RunIterator<'a> {
Bitpacked(super::bitpacking::Decoder<'a>),
Rle(std::iter::Take<std::iter::Repeat<u32>>),
}

impl<'a> Iterator for RunIterator<'a> {
type Item = u32;

fn next(&mut self) -> Option<Self::Item> {
match self {
RunIterator::Bitpacked(delegate) => delegate.next(),
RunIterator::Rle(delegate) => delegate.next(),
}
}
}
1 change: 1 addition & 0 deletions src/encoding/mod.rs
@@ -5,6 +5,7 @@ pub mod delta_bitpacked;
pub mod delta_byte_array;
pub mod delta_length_byte_array;
pub mod hybrid_rle;
pub mod plain_byte_array;
pub mod uleb128;
pub mod zigzag_leb128;

37 changes: 37 additions & 0 deletions src/encoding/plain_byte_array/decoder.rs
@@ -0,0 +1,37 @@
use crate::encoding::get_length;

/// Decodes byte arrays according to [Plain strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#plain-plain--0):
/// each value is a 4-byte little-endian length followed by that many bytes.
/// # Implementation
/// This struct does not allocate on the heap.
#[derive(Debug)]
pub struct Decoder<'a> {
values: &'a [u8],
index: usize,
}

impl<'a> Decoder<'a> {
pub fn new(values: &'a [u8]) -> Self {
Self { values, index: 0 }
}
}

impl<'a> Iterator for Decoder<'a> {
type Item = &'a [u8];

fn next(&mut self) -> Option<Self::Item> {
let values = self.values;
let index = self.index;
if index + 4 <= values.len() {
// read the length prefix at the current position, not at the start of the buffer
let next_len = get_length(&values[index..]) as usize;
let next_index = index + 4 + next_len;

let result = Some(&values[index + 4..next_index]);
self.index = next_index;

result
} else {
None
}
}
}
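
For reference, the length-prefixed layout that this decoder walks can be sketched in a self-contained form as below. This is an illustration only, assuming each value is a 4-byte little-endian length followed by that many bytes. The type name `PlainDecoder` is hypothetical, and unlike the code in this diff the sketch stops instead of panicking when the buffer is truncated.

```rust
/// Iterates over length-prefixed byte strings without allocating
/// (hypothetical stand-in for the decoder in this diff).
pub struct PlainDecoder<'a> {
    values: &'a [u8],
}

impl<'a> PlainDecoder<'a> {
    pub fn new(values: &'a [u8]) -> Self {
        Self { values }
    }
}

impl<'a> Iterator for PlainDecoder<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        let values = self.values;
        // Stop when fewer than 4 bytes remain for the length prefix.
        let prefix = values.get(..4)?;
        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        let end = 4usize.checked_add(len)?;
        // Stop (rather than panic) if the payload is truncated.
        let item = values.get(4..end)?;
        // Advance past the prefix and the payload, so the next call reads
        // the next value's own length prefix.
        self.values = &values[end..];
        Some(item)
    }
}
```

Keeping the remaining slice rather than an index means the length is always read at the current position, and a trailing zero-length value (exactly 4 bytes left) is still yielded.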
3 changes: 3 additions & 0 deletions src/encoding/plain_byte_array/mod.rs
@@ -0,0 +1,3 @@
mod decoder;

pub use decoder::Decoder;
65 changes: 52 additions & 13 deletions src/serialization/read/binary.rs
@@ -4,8 +4,9 @@ use super::levels::consume_level;
use crate::error::{ParquetError, Result};
use crate::metadata::ColumnDescriptor;
use crate::read::Page;
use crate::serialization::read::utils::ValuesDef;
use crate::{
encoding::{bitpacking, uleb128},
encoding::{hybrid_rle, plain_byte_array},
read::BinaryPageDict,
};

@@ -18,24 +19,27 @@ fn read_dict_buffer(
let dict_values = dict.values();
let dict_offsets = dict.offsets();

let (values, _) = consume_level(values, length, def_level_encoding);
let (values, def_levels) = consume_level(values, length, def_level_encoding);

let bit_width = values[0];
let values = &values[1..];

let (_, consumed) = uleb128::decode(&values);
let values = &values[consumed..];
let indices = hybrid_rle::Decoder::new(values, bit_width as u32, length as usize);

let indices = bitpacking::Decoder::new(values, bit_width, length as usize);
let decoded_values = indices.map(|id| {
let id = id as usize;
let start = dict_offsets[id] as usize;
let end = dict_offsets[id + 1] as usize;

indices
.map(|id| {
let id = id as usize;
let start = dict_offsets[id] as usize;
let end = dict_offsets[id + 1] as usize;
Some(dict_values[start..end].to_vec())
})
.collect()
dict_values[start..end].to_vec()
});

ValuesDef::new(
decoded_values,
def_levels.into_iter(),
def_level_encoding.1 as u32,
)
.collect()
}

pub fn page_dict_to_vec(
@@ -62,3 +66,38 @@ pub fn page_dict_to_vec(
_ => todo!(),
}
}

fn read_plain_buffer(
values: &[u8],
length: u32,
def_level_encoding: (&Encoding, i16),
) -> Vec<Option<Vec<u8>>> {
let (values, def_levels) = consume_level(values, length, def_level_encoding);

let decoded_values = plain_byte_array::Decoder::new(values).map(|bytes| bytes.to_vec());

ValuesDef::new(
decoded_values,
def_levels.into_iter(),
def_level_encoding.1 as u32,
)
.collect()
}

pub fn page_to_vec(page: &Page, descriptor: &ColumnDescriptor) -> Result<Vec<Option<Vec<u8>>>> {
assert_eq!(descriptor.max_rep_level(), 0);
match page {
Page::V1(page) => match (&page.header.encoding, &page.dictionary_page) {
(Encoding::Plain, None) => Ok(read_plain_buffer(
&page.buffer,
page.header.num_values as u32,
(
&page.header.definition_level_encoding,
descriptor.max_def_level(),
),
)),
_ => todo!(),
},
_ => todo!(),
}
}
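
`ValuesDef` is not shown in this diff, so the following is only a guess at the behaviour the two read functions rely on: definition levels are paired with the stream of non-null values, and a slot becomes `Some` only when its level equals the maximum definition level. The names `zip_validity` and `dict_entry` are hypothetical, and the `u32` offset type is an assumption.

```rust
/// Pairs definition levels with non-null values. A slot is `Some` only when
/// its level equals `max_def_level`; otherwise no value is consumed.
/// If the values run out early, the remaining slots are `None`.
pub fn zip_validity<T>(
    mut values: impl Iterator<Item = T>,
    def_levels: impl Iterator<Item = u32>,
    max_def_level: u32,
) -> impl Iterator<Item = Option<T>> {
    def_levels.map(move |level| {
        if level == max_def_level {
            values.next()
        } else {
            None
        }
    })
}

/// Looks up dictionary entry `id` in a contiguous values buffer, where
/// `offsets` holds one more element than there are entries.
pub fn dict_entry<'a>(dict_values: &'a [u8], offsets: &[u32], id: usize) -> &'a [u8] {
    &dict_values[offsets[id] as usize..offsets[id + 1] as usize]
}
```

With this shape, both the dictionary path and the plain path only need to produce an iterator over the non-null values; nulls are interleaved in one place.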