Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
DRY parquet reading (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Feb 16, 2022
1 parent 1f9c94d commit 6eb0c1d
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 307 deletions.
78 changes: 39 additions & 39 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,61 +72,40 @@ impl<'a> Required<'a> {
}
}

/// Builds an iterator that decodes the dictionary indices stored in
/// `indices_buffer` and maps each index to the matching byte slice of `dict`.
///
/// * `indices_buffer` - the first byte is read as the bit width of the encoded
///   indices; the remaining bytes are handed to the hybrid RLE decoder.
/// * `dict` - dictionary page providing the values buffer and its offsets.
/// * `additional` - forwarded to `HybridRleDecoder::new` as its third argument
///   (presumably the number of indices to decode — confirm against the decoder).
///
/// # Panics
///
/// Panics if `indices_buffer` is empty (the bit-width byte is read by direct
/// indexing). The returned iterator panics when a decoded index `i` is such
/// that `i + 1` is out of bounds of the dictionary offsets, or when the
/// resulting offsets do not describe a valid range of the dictionary values.
#[inline]
fn values_iter1<'a>(
indices_buffer: &'a [u8],
dict: &'a BinaryPageDict,
additional: usize,
) -> std::iter::Map<hybrid_rle::HybridRleDecoder<'a>, Box<dyn Fn(u32) -> &'a [u8] + 'a>> {
let dict_values = dict.values();
let dict_offsets = dict.offsets();

// Entry `index` of the dictionary spans `offsets[index]..offsets[index + 1]` of the values buffer.
// The closure is boxed so that the return type of this function can be named.
let op = Box::new(move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
}) as _;

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];

let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional);
indices.map(op)
}

struct RequiredDictionary<'a> {
pub values: std::iter::Map<hybrid_rle::HybridRleDecoder<'a>, Box<dyn Fn(u32) -> &'a [u8] + 'a>>,
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub remaining: usize,
pub dict: &'a BinaryPageDict,
}

impl<'a> RequiredDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = values_iter1(page.buffer(), dict, page.num_values());
let values = utils::dict_indices_decoder(page.buffer(), page.num_values());

Self {
values,
remaining: page.num_values(),
dict,
}
}
}

struct OptionalDictionary<'a> {
values: std::iter::Map<hybrid_rle::HybridRleDecoder<'a>, Box<dyn Fn(u32) -> &'a [u8] + 'a>>,
values: hybrid_rle::HybridRleDecoder<'a>,
validity: OptionalPageValidity<'a>,
dict: &'a BinaryPageDict,
}

impl<'a> OptionalDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = values_iter1(values_buffer, dict, page.num_values());
let values = utils::dict_indices_decoder(indices_buffer, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(page),
dict,
}
}
}
Expand Down Expand Up @@ -208,7 +187,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
)))
}
(Encoding::Plain, _, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
let (_, _, values) = utils::split_buffer(page);

let values = BinaryIter::new(values);

Expand All @@ -230,6 +209,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
Expand All @@ -249,16 +229,36 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
values.push(x)
}
}
State::OptionalDictionary(page) => extend_from_decoder(
validity,
&mut page.validity,
Some(additional),
values,
&mut page.values,
),
State::OptionalDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
extend_from_decoder(
validity,
&mut page.validity,
Some(additional),
values,
&mut page.values.by_ref().map(op),
)
}
State::RequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};

page.remaining = page.remaining.saturating_sub(additional);
for x in page.values.by_ref().take(additional) {
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
let (_, _, values) = utils::split_buffer(page);

let values = utils::BinaryIter::new(values);

Expand All @@ -69,6 +69,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
Expand Down
15 changes: 3 additions & 12 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ use super::super::utils::{
};
use super::super::DataPages;

/// Returns a [`BitmapIter`] over the bits of `values`, a PLAIN-encoded boolean buffer.
///
/// The iterator is constructed with `0` and `values.len() * 8` as its second and
/// third arguments (presumably the bit offset and the number of bits — confirm
/// against `BitmapIter::new`), i.e. it spans every bit of the buffer.
#[inline]
pub(super) fn values_iter(values: &[u8]) -> BitmapIter {
// In PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
// Note that `values` (the caller's values buffer) contains only non-null values.
// Thus, at this point, it is not known how many values this buffer contains:
// `values_len` is the upper bound. The actual number depends on how many nulls there are.
let values_len = values.len() * 8;
BitmapIter::new(values, 0, values_len)
}

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
struct Optional<'a> {
Expand All @@ -34,10 +24,10 @@ struct Optional<'a> {

impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values_buffer, _) = split_buffer(page, page.descriptor());
let (_, _, values_buffer) = split_buffer(page);

Self {
values: values_iter(values_buffer),
values: BitmapIter::new(values_buffer, 0, values_buffer.len() * 8),
validity: OptionalPageValidity::new(page),
}
}
Expand Down Expand Up @@ -112,6 +102,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut MutableBitmap,
validity: &mut MutableBitmap,
Expand Down
8 changes: 5 additions & 3 deletions src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use super::super::nested_utils::*;
use super::super::utils;
use super::super::utils::{Decoder, MaybeNext};
use super::super::DataPages;
use super::basic::values_iter;

// The state of a required DataPage with a boolean physical type
#[derive(Debug)]
Expand Down Expand Up @@ -69,8 +68,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
Ok(State::Optional(Optional::new(page), values_iter(values)))
let (_, _, values) = utils::split_buffer(page);
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(Optional::new(page), values))
}
(Encoding::Plain, false) => Ok(State::Required(Required::new(page))),
_ => Err(utils::not_implemented(
Expand All @@ -88,6 +89,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
}

fn extend_from_state(
&self,
state: &mut State,
values: &mut MutableBitmap,
validity: &mut MutableBitmap,
Expand Down
18 changes: 3 additions & 15 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
array::{Array, BinaryArray, DictionaryKey, PrimitiveArray, Utf8Array},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::primitive::read_item,
};

use super::binary;
Expand Down Expand Up @@ -50,39 +49,34 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as u8,
))),
UInt16 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as u16,
))),
UInt32 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as u32,
))),
Int8 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as i8,
))),
Int16 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as i16,
))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32),
primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32),
)),

Timestamp(time_unit, None) => {
Expand All @@ -104,14 +98,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as i128,
))),
PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i64| x as i128,
))),
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
Expand Down Expand Up @@ -159,28 +151,25 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(

// INT64
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64),
primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x as i64),
)),
UInt64 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i64| x as u64,
))),

Float32 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: f32| x,
))),
Float64 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: f64| x,
))),

Expand Down Expand Up @@ -226,7 +215,6 @@ fn timestamp<'a, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
read_item,
int96_to_i64_ns,
))));
} else {
Expand All @@ -241,7 +229,7 @@ fn timestamp<'a, I: 'a + DataPages>(
));
}

let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x);
let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x);

let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type {
unit
Expand Down
5 changes: 3 additions & 2 deletions src/io/parquet/read/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
K: DictionaryKey,
{
fn new(page: &'a DataPage) -> Self {
let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = values_iter1(indices_buffer, page.num_values());

Expand All @@ -79,7 +79,7 @@ where
K: DictionaryKey,
{
fn new(page: &'a DataPage) -> Self {
let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = values_iter1(indices_buffer, page.num_values());

Expand Down Expand Up @@ -154,6 +154,7 @@ where
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut Vec<K>,
validity: &mut MutableBitmap,
Expand Down
Loading

0 comments on commit 6eb0c1d

Please sign in to comment.