Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

DRY parquet reading #845

Merged
merged 3 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 39 additions & 39 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,61 +72,40 @@ impl<'a> Required<'a> {
}
}

#[inline]
fn values_iter1<'a>(
indices_buffer: &'a [u8],
dict: &'a BinaryPageDict,
additional: usize,
) -> std::iter::Map<hybrid_rle::HybridRleDecoder<'a>, Box<dyn Fn(u32) -> &'a [u8] + 'a>> {
let dict_values = dict.values();
let dict_offsets = dict.offsets();

let op = Box::new(move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
}) as _;

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];

let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional);
indices.map(op)
}

struct RequiredDictionary<'a> {
pub values: std::iter::Map<hybrid_rle::HybridRleDecoder<'a>, Box<dyn Fn(u32) -> &'a [u8] + 'a>>,
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub remaining: usize,
pub dict: &'a BinaryPageDict,
}

impl<'a> RequiredDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = values_iter1(page.buffer(), dict, page.num_values());
let values = utils::dict_indices_decoder(page.buffer(), page.num_values());

Self {
values,
remaining: page.num_values(),
dict,
}
}
}

struct OptionalDictionary<'a> {
values: std::iter::Map<hybrid_rle::HybridRleDecoder<'a>, Box<dyn Fn(u32) -> &'a [u8] + 'a>>,
values: hybrid_rle::HybridRleDecoder<'a>,
validity: OptionalPageValidity<'a>,
dict: &'a BinaryPageDict,
}

impl<'a> OptionalDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let (_, _, values_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = values_iter1(values_buffer, dict, page.num_values());
let values = utils::dict_indices_decoder(indices_buffer, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(page),
dict,
}
}
}
Expand Down Expand Up @@ -208,7 +187,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
)))
}
(Encoding::Plain, _, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
let (_, _, values) = utils::split_buffer(page);

let values = BinaryIter::new(values);

Expand All @@ -230,6 +209,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
Expand All @@ -249,16 +229,36 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
values.push(x)
}
}
State::OptionalDictionary(page) => extend_from_decoder(
validity,
&mut page.validity,
Some(additional),
values,
&mut page.values,
),
State::OptionalDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
extend_from_decoder(
validity,
&mut page.validity,
Some(additional),
values,
&mut page.values.by_ref().map(op),
)
}
State::RequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};

page.remaining = page.remaining.saturating_sub(additional);
for x in page.values.by_ref().take(additional) {
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
let (_, _, values) = utils::split_buffer(page);

let values = utils::BinaryIter::new(values);

Expand All @@ -69,6 +69,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
Expand Down
15 changes: 3 additions & 12 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ use super::super::utils::{
};
use super::super::DataPages;

#[inline]
pub(super) fn values_iter(values: &[u8]) -> BitmapIter {
// in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
// note that `values_buffer` contains only non-null values.
// thus, at this point, it is not known how many values this buffer contains
// values_len is the upper bound. The actual number depends on how many nulls there is.
let values_len = values.len() * 8;
BitmapIter::new(values, 0, values_len)
}

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
struct Optional<'a> {
Expand All @@ -34,10 +24,10 @@ struct Optional<'a> {

impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values_buffer, _) = split_buffer(page, page.descriptor());
let (_, _, values_buffer) = split_buffer(page);

Self {
values: values_iter(values_buffer),
values: BitmapIter::new(values_buffer, 0, values_buffer.len() * 8),
validity: OptionalPageValidity::new(page),
}
}
Expand Down Expand Up @@ -112,6 +102,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut MutableBitmap,
validity: &mut MutableBitmap,
Expand Down
8 changes: 5 additions & 3 deletions src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use super::super::nested_utils::*;
use super::super::utils;
use super::super::utils::{Decoder, MaybeNext};
use super::super::DataPages;
use super::basic::values_iter;

// The state of a required DataPage with a boolean physical type
#[derive(Debug)]
Expand Down Expand Up @@ -69,8 +68,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
Ok(State::Optional(Optional::new(page), values_iter(values)))
let (_, _, values) = utils::split_buffer(page);
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(Optional::new(page), values))
}
(Encoding::Plain, false) => Ok(State::Required(Required::new(page))),
_ => Err(utils::not_implemented(
Expand All @@ -88,6 +89,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
}

fn extend_from_state(
&self,
state: &mut State,
values: &mut MutableBitmap,
validity: &mut MutableBitmap,
Expand Down
18 changes: 3 additions & 15 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
array::{Array, BinaryArray, DictionaryKey, PrimitiveArray, Utf8Array},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::primitive::read_item,
};

use super::binary;
Expand Down Expand Up @@ -50,39 +49,34 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as u8,
))),
UInt16 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as u16,
))),
UInt32 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as u32,
))),
Int8 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as i8,
))),
Int16 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as i16,
))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32),
primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32),
)),

Timestamp(time_unit, None) => {
Expand All @@ -104,14 +98,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
read_item,
|x: i32| x as i128,
))),
PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i64| x as i128,
))),
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
Expand Down Expand Up @@ -159,28 +151,25 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(

// INT64
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64),
primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x as i64),
)),
UInt64 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: i64| x as u64,
))),

Float32 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: f32| x,
))),
Float64 => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
read_item,
|x: f64| x,
))),

Expand Down Expand Up @@ -226,7 +215,6 @@ fn timestamp<'a, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
read_item,
int96_to_i64_ns,
))));
} else {
Expand All @@ -241,7 +229,7 @@ fn timestamp<'a, I: 'a + DataPages>(
));
}

let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x);
let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x);

let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type {
unit
Expand Down
5 changes: 3 additions & 2 deletions src/io/parquet/read/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where
K: DictionaryKey,
{
fn new(page: &'a DataPage) -> Self {
let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = values_iter1(indices_buffer, page.num_values());

Expand All @@ -79,7 +79,7 @@ where
K: DictionaryKey,
{
fn new(page: &'a DataPage) -> Self {
let (_, _, indices_buffer, _) = utils::split_buffer(page, page.descriptor());
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = values_iter1(indices_buffer, page.num_values());

Expand Down Expand Up @@ -154,6 +154,7 @@ where
}

fn extend_from_state(
&self,
state: &mut Self::State,
values: &mut Vec<K>,
validity: &mut MutableBitmap,
Expand Down
Loading