Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 29, 2022
1 parent eeb3b1c commit 9e96c9e
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 127 deletions.
19 changes: 11 additions & 8 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ use super::super::utils::{
};
use super::super::DataPages;

#[inline]
pub(super) fn values_iter(values: &[u8]) -> BitmapIter {
// in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
// note that `values_buffer` contains only non-null values.
// thus, at this point, it is not known how many values this buffer contains
// values_len is the upper bound. The actual number depends on how many nulls there is.
let values_len = values.len() * 8;
BitmapIter::new(values, 0, values_len)
}

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
struct Optional<'a> {
Expand All @@ -26,15 +36,8 @@ impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, validity_buffer, values_buffer, _) = split_buffer(page, page.descriptor());

// in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
// note that `values_buffer` contains only non-null values.
// thus, at this point, it is not known how many values this buffer contains
// values_len is the upper bound. The actual number depends on how many nulls there is.
let values_len = values_buffer.len() * 8;
let values = BitmapIter::new(values_buffer, 0, values_len);

Self {
values,
values: values_iter(values_buffer),
validity: OptionalPageValidity::new(validity_buffer, page.num_values()),
}
}
Expand Down
150 changes: 32 additions & 118 deletions src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,19 @@
use std::collections::VecDeque;

use parquet2::{
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
page::DataPage,
read::levels::get_bit_width,
schema::Repetition,
};
use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition};

use crate::{
array::BooleanArray,
bitmap::{utils::BitmapIter, MutableBitmap},
datatypes::{DataType, Field},
error::Result,
io::parquet::read::{utils::Decoder, DataPages},
};

use super::super::nested_utils::*;
use super::super::utils;

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
struct Optional<'a> {
values: BitmapIter<'a>,
definition_levels: HybridRleDecoder<'a>,
max_def: u32,
}

impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, def_levels, values_buffer, _) = utils::split_buffer(page, page.descriptor());

// in PLAIN, booleans are LSB bitpacked and thus we can read them as if they were a bitmap.
// note that `values_buffer` contains only non-null values.
// thus, at this point, it is not known how many values this buffer contains
// values_len is the upper bound. The actual number depends on how many nulls there is.
let values_len = values_buffer.len() * 8;
let values = BitmapIter::new(values_buffer, 0, values_len);

let max_def = page.descriptor().max_def_level();

Self {
values,
definition_levels: HybridRleDecoder::new(
def_levels,
get_bit_width(max_def),
page.num_values(),
),
max_def: max_def as u32,
}
}
}
use super::super::utils::{Decoder, MaybeNext};
use super::super::DataPages;
use super::basic::values_iter;

// The state of a required DataPage with a boolean physical type
#[derive(Debug)]
Expand All @@ -74,14 +38,14 @@ impl<'a> Required<'a> {
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum State<'a> {
Optional(Optional<'a>),
Optional(Optional<'a>, BitmapIter<'a>),
Required(Required<'a>),
}

impl<'a> State<'a> {
pub fn len(&self) -> usize {
match self {
State::Optional(page) => page.definition_levels.size_hint().0,
State::Optional(optional, _) => optional.len(),
State::Required(page) => page.length - page.offset,
}
}
Expand All @@ -104,7 +68,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))),
(Encoding::Plain, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());
Ok(State::Optional(Optional::new(page), values_iter(values)))
}
(Encoding::Plain, false) => Ok(State::Required(Required::new(page))),
_ => Err(utils::not_implemented(
&page.encoding(),
Expand All @@ -127,10 +94,10 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
required: usize,
) {
match state {
State::Optional(page) => read_optional_values(
page.definition_levels.by_ref(),
page.max_def,
page.values.by_ref(),
State::Optional(page_validity, page_values) => read_optional_values(
page_validity.definition_levels.by_ref(),
page_validity.max_def(),
page_values.by_ref(),
values,
validity,
required,
Expand Down Expand Up @@ -166,82 +133,29 @@ impl<I: DataPages> ArrayIterator<I> {
}
}

fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) -> BooleanArray {
BooleanArray::from_data(data_type.clone(), values.into(), validity.into())
}

impl<I: DataPages> Iterator for ArrayIterator<I> {
type Item = Result<(NestedState, BooleanArray)>;

fn next(&mut self) -> Option<Self::Item> {
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
let nested = self.nested.pop_back().unwrap();
let (values, validity) = self.items.pop_back().unwrap();
let array = BooleanArray::from_data(DataType::Boolean, values.into(), validity.into());
return Some(Ok((nested, array)));
}
match (
self.nested.pop_back(),
self.items.pop_back(),
self.iter.next(),
) {
(_, _, Err(e)) => Some(Err(e.into())),
(None, None, Ok(None)) => None,
(state, p_state, Ok(Some(page))) => {
// the invariant
assert_eq!(state.is_some(), p_state.is_some());

// there is a new page => consume the page from the start
let mut nested_page = NestedPage::new(page);

// read next chunk from `nested_page` and get number of values to read
let maybe_nested = extend_offsets1(
&mut nested_page,
state,
&self.field,
&mut self.nested,
self.chunk_size,
);
let nested = match maybe_nested {
Ok(nested) => nested,
Err(e) => return Some(Err(e)),
};
// at this point we know whether there were enough rows in `page`
// to fill chunk_size or not (`nested.is_some()`)
// irrespectively, we need to consume the values from the page

let maybe_page = BooleanDecoder::default().build_state(page);
let page = match maybe_page {
Ok(page) => page,
Err(e) => return Some(Err(e)),
};

let maybe_array = extend_from_new_page::<BooleanDecoder, _, _>(
page,
p_state,
&mut self.items,
&nested,
&self.nested,
&BooleanDecoder::default(),
);
let state = match maybe_array {
Ok(s) => s,
Err(e) => return Some(Err(e)),
};
match nested {
Some(p_state) => Some(Ok((
p_state,
BooleanArray::from_data(DataType::Boolean, state.0.into(), state.1.into()),
))),
None => self.next(),
}
}
(Some(nested), Some((values, validity)), Ok(None)) => {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
let array =
BooleanArray::from_data(DataType::Boolean, values.into(), validity.into());
Some(Ok((nested, array)))
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.nested,
&self.field,
self.chunk_size,
&BooleanDecoder::default(),
);
match maybe_state {
MaybeNext::Some(Ok((nested, values, validity))) => {
Some(Ok((nested, finish(&DataType::Boolean, values, validity))))
}
(Some(_), None, _) => unreachable!(),
(None, Some(_), _) => unreachable!(),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}
115 changes: 114 additions & 1 deletion src/io/parquet/read/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::{
error::{ArrowError, Result},
};

use super::utils::{split_buffer, Decoder, Pushable};
use super::{
utils::{split_buffer, Decoder, MaybeNext, Pushable},
DataPages,
};

/// trait describing deserialized repetition and definition levels
pub trait Nested: std::fmt::Debug {
Expand Down Expand Up @@ -598,3 +601,113 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi
nested.close(*length);
});
}

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
pub struct Optional<'a> {
pub definition_levels: HybridRleDecoder<'a>,
max_def: u32,
}

impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, def_levels, values_buffer, _) = split_buffer(page, page.descriptor());

let max_def = page.descriptor().max_def_level();

Self {
definition_levels: HybridRleDecoder::new(
def_levels,
get_bit_width(max_def),
page.num_values(),
),
max_def: max_def as u32,
}
}

#[inline]
pub fn len(&self) -> usize {
self.definition_levels.size_hint().0
}

#[inline]
pub fn max_def(&self) -> u32 {
self.max_def
}
}

#[inline]
pub(super) fn next<'a, I, C, P, D>(
iter: &'a mut I,
items: &mut VecDeque<(P, MutableBitmap)>,
nested_items: &mut VecDeque<NestedState>,
field: &Field,
chunk_size: usize,
decoder: &D,
) -> MaybeNext<Result<(NestedState, P, MutableBitmap)>>
where
I: DataPages,
C: Default,
P: Pushable<C>,
D: Decoder<'a, C, P>,
{
// back[a1, a2, a3, ...]front
if items.len() > 1 {
let nested = nested_items.pop_back().unwrap();
let (values, validity) = items.pop_back().unwrap();
//let array = BooleanArray::from_data(DataType::Boolean, values.into(), validity.into());
return MaybeNext::Some(Ok((nested, values, validity)));
}
match (nested_items.pop_back(), items.pop_back(), iter.next()) {
(_, _, Err(e)) => MaybeNext::Some(Err(e.into())),
(None, None, Ok(None)) => MaybeNext::None,
(state, p_state, Ok(Some(page))) => {
// the invariant
assert_eq!(state.is_some(), p_state.is_some());

// there is a new page => consume the page from the start
let mut nested_page = NestedPage::new(page);

// read next chunk from `nested_page` and get number of values to read
let maybe_nested =
extend_offsets1(&mut nested_page, state, field, nested_items, chunk_size);
let nested = match maybe_nested {
Ok(nested) => nested,
Err(e) => return MaybeNext::Some(Err(e)),
};
// at this point we know whether there were enough rows in `page`
// to fill chunk_size or not (`nested.is_some()`)
// irrespectively, we need to consume the values from the page

let maybe_page = decoder.build_state(page);
let page = match maybe_page {
Ok(page) => page,
Err(e) => return MaybeNext::Some(Err(e)),
};

let maybe_array = extend_from_new_page::<D, _, _>(
page,
p_state,
items,
&nested,
nested_items,
decoder,
);
let state = match maybe_array {
Ok(s) => s,
Err(e) => return MaybeNext::Some(Err(e)),
};
match nested {
Some(p_state) => MaybeNext::Some(Ok((p_state, state.0, state.1))),
None => MaybeNext::More,
}
}
(Some(nested), Some((values, validity)), Ok(None)) => {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
MaybeNext::Some(Ok((nested, values, validity)))
}
(Some(_), None, _) => unreachable!(),
(None, Some(_), _) => unreachable!(),
}
}
1 change: 1 addition & 0 deletions src/io/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub fn not_implemented(
))
}

#[inline]
pub fn split_buffer<'a>(
page: &'a DataPage,
descriptor: &ColumnDescriptor,
Expand Down

0 comments on commit 9e96c9e

Please sign in to comment.