Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
DRY parquet module (#785)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jan 22, 2022
1 parent bdefacb commit 684c259
Show file tree
Hide file tree
Showing 12 changed files with 672 additions and 756 deletions.
284 changes: 109 additions & 175 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,35 @@ use parquet2::{

use crate::{
array::Offset,
bitmap::{utils::BitmapIter, MutableBitmap},
bitmap::MutableBitmap,
error::Result,
io::parquet::read::utils::{extend_from_decoder, Pushable},
};

use super::super::utils;
use super::{super::utils, utils::Binary};

/// Builds an iterator over the dictionary-encoded binary values of a page.
///
/// The first byte of `indices_buffer` is read as the bit width of the encoded
/// dictionary indices; the remaining bytes are handed to a
/// `HybridRleDecoder` together with `additional`. Every decoded index `i` is
/// resolved to the byte range `offsets[i]..offsets[i + 1]` of the dictionary's
/// values buffer, and that sub-slice is yielded.
///
/// # Panics
///
/// Panics immediately if `indices_buffer` is empty. While iterating, panics if
/// a decoded index (or the offsets stored for it) falls outside the
/// dictionary's buffers.
#[inline]
fn values_iter<'a>(
    indices_buffer: &'a [u8],
    dict: &'a BinaryPageDict,
    additional: usize,
) -> impl Iterator<Item = &'a [u8]> + 'a {
    let bytes = dict.values();
    let offsets = dict.offsets();

    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
    let (bit_width, encoded_indices) = (indices_buffer[0], &indices_buffer[1..]);

    hybrid_rle::HybridRleDecoder::new(encoded_indices, bit_width as u32, additional).map(
        move |index| {
            let entry = index as usize;
            // Consecutive offsets delimit one dictionary entry.
            let (start, end) = (offsets[entry] as usize, offsets[entry + 1] as usize);
            &bytes[start..end]
        },
    )
}

/// Assumptions: No rep levels
#[allow(clippy::too_many_arguments)]
Expand All @@ -19,142 +43,96 @@ fn read_dict_buffer<O: Offset>(
indices_buffer: &[u8],
additional: usize,
dict: &BinaryPageDict,
offsets: &mut Vec<O>,
values: &mut Vec<u8>,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = (offsets.len() - 1) + additional;
let dict_values = dict.values();
let dict_offsets = dict.offsets();
let mut last_offset = *offsets.as_mut_slice().last().unwrap();
let length = values.len() + additional;

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];
let values_iterator = values_iter(indices_buffer, dict, additional);

let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length);
let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

for run in validity_iterator {
match run {
hybrid_rle::HybridEncoded::Bitpacked(packed) => {
let remaining = length - (offsets.len() - 1);
let len = std::cmp::min(packed.len() * 8, remaining);
for is_valid in BitmapIter::new(packed, 0, len) {
if is_valid {
let index = indices.next().unwrap() as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
let length = dict_offset_ip1 - dict_offset_i;
last_offset += O::from_usize(length).unwrap();
values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]);
};
offsets.push(last_offset);
}
validity.extend_from_slice(packed, 0, len);
}
hybrid_rle::HybridEncoded::Rle(value, additional) => {
let is_set = value[0] == 1;
validity.extend_constant(additional, is_set);
if is_set {
(0..additional).for_each(|_| {
let index = indices.next().unwrap() as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
let length = dict_offset_ip1 - dict_offset_i;
last_offset += O::from_usize(length).unwrap();
offsets.push(last_offset);
values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]);
})
} else {
offsets.resize(offsets.len() + additional, last_offset);
}
}
}
}
extend_from_decoder(
validity,
&mut validity_iterator,
length,
values,
values_iterator,
);
}

#[allow(clippy::too_many_arguments)]
fn read_dict_required<O: Offset>(
indices_buffer: &[u8],
additional: usize,
dict: &BinaryPageDict,
offsets: &mut Vec<O>,
values: &mut Vec<u8>,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let dict_values = dict.values();
let dict_offsets = dict.offsets();
let mut last_offset = *offsets.as_mut_slice().last().unwrap();
let values_iterator = values_iter(indices_buffer, dict, additional);

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];
for value in values_iterator {
values.push(value);
}
validity.extend_constant(additional, true);
}

let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional);
struct Offsets<'a, O: Offset>(pub &'a mut Vec<O>);

for index in indices {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
let length = dict_offset_ip1 - dict_offset_i;
last_offset += O::from_usize(length).unwrap();
offsets.push(last_offset);
values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]);
impl<'a, O: Offset> Pushable<O> for Offsets<'a, O> {
/// Forwards to `reserve` on the wrapped offsets vector, requesting room for
/// `additional` more entries.
#[inline]
fn reserve(&mut self, additional: usize) {
self.0.reserve(additional)
}

/// Appends `value` to the end of the wrapped offsets vector.
#[inline]
fn push(&mut self, value: O) {
self.0.push(value)
}

/// Appends a copy of the current last offset to the wrapped vector.
///
/// Repeating the previous offset presumably marks the null slot as a
/// zero-length entry (NOTE(review): confirm against the `Pushable` contract).
///
/// # Panics
///
/// Panics if the wrapped vector is empty, because `last()` then returns `None`.
#[inline]
fn push_null(&mut self) {
self.0.push(*self.0.last().unwrap())
}

/// Delegates to `extend_constant` on the wrapped offsets vector with the same
/// `additional` and `value` arguments.
///
/// NOTE(review): presumably this appends `additional` copies of `value`; the
/// delegated method is defined outside this view, so confirm there.
#[inline]
fn extend_constant(&mut self, additional: usize, value: O) {
self.0.extend_constant(additional, value)
}
validity.extend_constant(additional, true);
}

fn read_delta_optional<O: Offset>(
validity_buffer: &[u8],
values_buffer: &[u8],
additional: usize,
offsets: &mut Vec<O>,
values: &mut Vec<u8>,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = (offsets.len() - 1) + additional;
let mut last_offset = *offsets.as_mut_slice().last().unwrap();
let length = values.len() + additional;

let Binary {
offsets,
values,
last_offset,
} = values;

// values_buffer: first 4 bytes are len, remaining is values
let mut values_iterator = delta_length_byte_array::Decoder::new(values_buffer);
let offsets_iterator = values_iterator.by_ref().map(|x| {
*last_offset += O::from_usize(x as usize).unwrap();
*last_offset
});

let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);
let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

// offsets:
for run in validity_iterator {
match run {
hybrid_rle::HybridEncoded::Bitpacked(packed) => {
// the pack may contain more items than needed.
let remaining = length - (offsets.len() - 1);
let len = std::cmp::min(packed.len() * 8, remaining);
for is_valid in BitmapIter::new(packed, 0, len) {
if is_valid {
let value = values_iterator.next().unwrap() as usize;
last_offset += O::from_usize(value).unwrap();
}
offsets.push(last_offset);
}
validity.extend_from_slice(packed, 0, len);
}
hybrid_rle::HybridEncoded::Rle(value, additional) => {
let is_set = value[0] == 1;
validity.extend_constant(additional, is_set);
if is_set {
(0..additional).for_each(|_| {
let value = values_iterator.next().unwrap() as usize;
last_offset += O::from_usize(value).unwrap();
offsets.push(last_offset);
})
} else {
offsets.resize(offsets.len() + additional, last_offset);
}
}
}
}
extend_from_decoder(
validity,
&mut validity_iterator,
length,
&mut Offsets::<O>(offsets),
offsets_iterator,
);

// values:
let new_values = values_iterator.into_values();
Expand All @@ -165,78 +143,46 @@ fn read_plain_optional<O: Offset>(
validity_buffer: &[u8],
values_buffer: &[u8],
additional: usize,
offsets: &mut Vec<O>,
values: &mut Vec<u8>,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let length = (offsets.len() - 1) + additional;
let mut last_offset = *offsets.as_mut_slice().last().unwrap();
let length = values.len() + additional;

// values_buffer: first 4 bytes are len, remaining is values
let mut values_iterator = utils::BinaryIter::new(values_buffer);
let values_iterator = utils::BinaryIter::new(values_buffer);

let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);
let mut validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

for run in validity_iterator {
match run {
hybrid_rle::HybridEncoded::Bitpacked(packed) => {
// the pack may contain more items than needed.
let remaining = length - (offsets.len() - 1);
let len = std::cmp::min(packed.len() * 8, remaining);
for is_valid in BitmapIter::new(packed, 0, len) {
if is_valid {
let value = values_iterator.next().unwrap();
last_offset += O::from_usize(value.len()).unwrap();
values.extend_from_slice(value);
}
offsets.push(last_offset);
}
validity.extend_from_slice(packed, 0, len);
}
hybrid_rle::HybridEncoded::Rle(value, additional) => {
let is_set = value[0] == 1;
validity.extend_constant(additional, is_set);
if is_set {
(0..additional).for_each(|_| {
let value = values_iterator.next().unwrap();
last_offset += O::from_usize(value.len()).unwrap();
offsets.push(last_offset);
values.extend_from_slice(value)
})
} else {
offsets.resize(offsets.len() + additional, last_offset);
}
}
}
}
extend_from_decoder(
validity,
&mut validity_iterator,
length,
values,
values_iterator,
)
}

pub(super) fn read_plain_required<O: Offset>(
buffer: &[u8],
additional: usize,
offsets: &mut Vec<O>,
values: &mut Vec<u8>,
values: &mut Binary<O>,
) {
let mut last_offset = *offsets.as_mut_slice().last().unwrap();

let values_iterator = utils::BinaryIter::new(buffer);

// each value occupies 4 bytes + len declared in 4 bytes => reserve accordingly.
values.reserve(buffer.len() - 4 * additional);
let a = values.capacity();
values.offsets.reserve(additional);
values.values.reserve(buffer.len() - 4 * additional);
let a = values.values.capacity();
for value in values_iterator {
last_offset += O::from_usize(value.len()).unwrap();
values.extend_from_slice(value);
offsets.push(last_offset);
values.push(value);
}
debug_assert_eq!(a, values.capacity());
debug_assert_eq!(a, values.values.capacity());
}

pub(super) fn extend_from_page<O: Offset>(
page: &DataPage,
descriptor: &ColumnDescriptor,
offsets: &mut Vec<O>,
values: &mut Vec<u8>,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) -> Result<()> {
let additional = page.num_values();
Expand All @@ -253,7 +199,6 @@ pub(super) fn extend_from_page<O: Offset>(
values_buffer,
additional,
dict.as_any().downcast_ref().unwrap(),
offsets,
values,
validity,
)
Expand All @@ -263,29 +208,18 @@ pub(super) fn extend_from_page<O: Offset>(
values_buffer,
additional,
dict.as_any().downcast_ref().unwrap(),
offsets,
values,
validity,
)
}
(Encoding::DeltaLengthByteArray, None, true) => read_delta_optional::<O>(
validity_buffer,
values_buffer,
additional,
offsets,
values,
validity,
),
(Encoding::Plain, _, true) => read_plain_optional::<O>(
validity_buffer,
values_buffer,
additional,
offsets,
values,
validity,
),
(Encoding::DeltaLengthByteArray, None, true) => {
read_delta_optional::<O>(validity_buffer, values_buffer, additional, values, validity)
}
(Encoding::Plain, _, true) => {
read_plain_optional::<O>(validity_buffer, values_buffer, additional, values, validity)
}
(Encoding::Plain, _, false) => {
read_plain_required::<O>(page.buffer(), page.num_values(), offsets, values)
read_plain_required::<O>(page.buffer(), page.num_values(), values)
}
_ => {
return Err(utils::not_implemented(
Expand Down
Loading

0 comments on commit 684c259

Please sign in to comment.