Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 28, 2022
1 parent 6ef21ba commit 9ea3406
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 143 deletions.
68 changes: 26 additions & 42 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::{
error::Result,
};

use super::super::utils::{extend_from_decoder, OptionalPageValidity};
use super::super::DataPages;
use super::super::utils::{extend_from_decoder, Decoder, OptionalPageValidity};
use super::{super::utils, utils::Binary};

fn read_delta_optional<O: Offset>(
Expand Down Expand Up @@ -220,27 +220,13 @@ impl<O: Offset> TraitBinaryArray<O> for Utf8Array<O> {
}
}

#[derive(Debug)]
struct BinaryDecoder<O: Offset, A: TraitBinaryArray<O>> {
#[derive(Debug, Default)]
struct BinaryDecoder<O: Offset> {
phantom_o: std::marker::PhantomData<O>,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>> Default for BinaryDecoder<O, A> {
#[inline]
fn default() -> Self {
Self {
phantom_o: std::marker::PhantomData,
phantom_a: std::marker::PhantomData,
}
}
}

impl<'a, O: Offset, A: TraitBinaryArray<O>> utils::Decoder<'a, &'a [u8], Binary<O>>
for BinaryDecoder<O, A>
{
impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O> {
type State = State<'a>;
type Array = A;

fn with_capacity(&self, capacity: usize) -> Binary<O> {
Binary::<O>::with_capacity(capacity)
Expand Down Expand Up @@ -281,15 +267,19 @@ impl<'a, O: Offset, A: TraitBinaryArray<O>> utils::Decoder<'a, &'a [u8], Binary<
}
}
}
}

fn finish(data_type: DataType, values: Binary<O>, validity: MutableBitmap) -> Self::Array {
A::from_data(
data_type,
values.offsets.0.into(),
values.values.into(),
validity.into(),
)
}
/// Assembles the final array of type `A` from decoded binary `values` and
/// their `validity`.
///
/// `data_type` is taken by reference and cloned here, so callers do not need
/// to clone it at every call site. The offsets buffer, the values buffer and
/// the validity are each converted with `into()` to whatever
/// `A::from_data` expects.
fn finish<O: Offset, A: TraitBinaryArray<O>>(
    data_type: &DataType,
    values: Binary<O>,
    validity: MutableBitmap,
) -> A {
    // Move the individual buffers out of `values` before building the array.
    let offsets = values.offsets.0.into();
    let bytes = values.values.into();
    let validity = validity.into();

    A::from_data(data_type.clone(), offsets, bytes, validity)
}

pub struct BinaryArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
Expand Down Expand Up @@ -320,13 +310,10 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for BinaryArrayIt
fn next(&mut self) -> Option<Self::Item> {
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
return self.items.pop_back().map(|(values, validity)| {
Ok(BinaryDecoder::finish(
self.data_type.clone(),
values,
validity,
))
});
return self
.items
.pop_back()
.map(|(values, validity)| Ok(finish(&self.data_type, values, validity)));
}
match (self.items.pop_back(), self.iter.next()) {
(_, Err(e)) => Some(Err(e.into())),
Expand All @@ -340,17 +327,18 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for BinaryArrayIt
Err(e) => return Some(Err(e)),
};

utils::extend_from_new_page::<BinaryDecoder<O, A>, _, _>(
utils::extend_from_new_page(
page,
state,
&self.data_type,
self.chunk_size,
&mut self.items,
&BinaryDecoder::<O, A>::default(),
&BinaryDecoder::<O>::default(),
)
};
match maybe_array {
Ok(Some(array)) => Some(Ok(array)),
Ok(Some((values, validity))) => {
Some(Ok(finish(&self.data_type, values, validity)))
}
Ok(None) => self.next(),
Err(e) => Some(Err(e)),
}
Expand All @@ -359,11 +347,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for BinaryArrayIt
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= self.chunk_size);
Some(Ok(BinaryDecoder::finish(
self.data_type.clone(),
values,
validity,
)))
Some(Ok(finish(&self.data_type, values, validity)))
}
}
}
Expand Down
19 changes: 13 additions & 6 deletions src/io/parquet/read/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{collections::VecDeque, sync::Arc};
use parquet2::page::BinaryPageDict;

use crate::{
array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array},
array::{
Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array,
},
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::{ArrowError, Result},
};

use super::super::dictionary::*;
use super::super::utils;
use super::super::utils::Decoder;
use super::super::DataPages;

/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation
Expand Down Expand Up @@ -66,7 +67,7 @@ where
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
return self.items.pop_back().map(|(values, validity)| {
let keys = PrimitiveDecoder::<K>::finish(self.data_type.clone(), values, validity);
let keys = finish_key(values, validity);
let values = self.values.unwrap();
Ok(DictionaryArray::from_data(keys, values))
});
Expand Down Expand Up @@ -128,14 +129,19 @@ where
utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
page,
state,
&self.data_type,
self.chunk_size,
&mut self.items,
&PrimitiveDecoder::default(),
)
};
match maybe_array {
Ok(Some(keys)) => {
Ok(Some((values, validity))) => {
let keys = PrimitiveArray::from_data(
K::PRIMITIVE.into(),
values.into(),
validity.into(),
);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
Expand All @@ -148,7 +154,8 @@ where
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= self.chunk_size);

let keys = PrimitiveDecoder::<K>::finish(self.data_type.clone(), values, validity);
let keys = finish_key(values, validity);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
Expand Down
55 changes: 24 additions & 31 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,32 @@ impl<'a> Required<'a> {
}
}

// The state of a `DataPage` of `Boolean` parquet primitive type
// The state of a `DataPage` of the parquet `Boolean` physical type
#[derive(Debug)]
enum BooleanPageState<'a> {
enum State<'a> {
Optional(Optional<'a>),
Required(Required<'a>),
}

impl<'a> BooleanPageState<'a> {
impl<'a> State<'a> {
pub fn len(&self) -> usize {
match self {
BooleanPageState::Optional(page) => page.validity.len(),
BooleanPageState::Required(page) => page.length - page.offset,
State::Optional(page) => page.validity.len(),
State::Required(page) => page.length - page.offset,
}
}
}

impl<'a> utils::PageState<'a> for BooleanPageState<'a> {
impl<'a> utils::PageState<'a> for State<'a> {
fn len(&self) -> usize {
self.len()
}
}

fn build_state(page: &DataPage, is_optional: bool) -> Result<BooleanPageState> {
fn build_state(page: &DataPage, is_optional: bool) -> Result<State> {
match (page.encoding(), is_optional) {
(Encoding::Plain, true) => Ok(BooleanPageState::Optional(Optional::new(page))),
(Encoding::Plain, false) => Ok(BooleanPageState::Required(Required::new(page))),
(Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))),
(Encoding::Plain, false) => Ok(State::Required(Required::new(page))),
_ => Err(utils::not_implemented(
&page.encoding(),
is_optional,
Expand All @@ -99,8 +99,7 @@ fn build_state(page: &DataPage, is_optional: bool) -> Result<BooleanPageState> {
struct BooleanDecoder {}

impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
type State = BooleanPageState<'a>;
type Array = BooleanArray;
type State = State<'a>;

fn with_capacity(&self, capacity: usize) -> MutableBitmap {
MutableBitmap::with_capacity(capacity)
Expand All @@ -113,24 +112,24 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder {
remaining: usize,
) {
match state {
BooleanPageState::Optional(page) => extend_from_decoder(
State::Optional(page) => extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
&mut page.values,
),
BooleanPageState::Required(page) => {
State::Required(page) => {
let remaining = remaining.min(page.length - page.offset);
values.extend_from_slice(page.values, page.offset, remaining);
page.offset += remaining;
}
}
}
}

fn finish(data_type: DataType, values: MutableBitmap, validity: MutableBitmap) -> Self::Array {
BooleanArray::from_data(data_type, values.into(), validity.into())
}
/// Assembles a [`BooleanArray`] from decoded `values` and their `validity`.
///
/// `data_type` is taken by reference and cloned here; both bitmaps are
/// converted with `into()` to whatever `BooleanArray::from_data` expects.
fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) -> BooleanArray {
    let values = values.into();
    let validity = validity.into();

    BooleanArray::from_data(data_type.clone(), values, validity)
}

/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays
Expand Down Expand Up @@ -161,13 +160,10 @@ impl<I: DataPages> Iterator for BooleanArrayIterator<I> {
fn next(&mut self) -> Option<Self::Item> {
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
return self.items.pop_back().map(|(values, validity)| {
Ok(BooleanDecoder::finish(
self.data_type.clone(),
values,
validity,
))
});
return self
.items
.pop_back()
.map(|(values, validity)| Ok(finish(&self.data_type, values, validity)));
}
match (self.items.pop_back(), self.iter.next()) {
(_, Err(e)) => Some(Err(e.into())),
Expand All @@ -180,16 +176,17 @@ impl<I: DataPages> Iterator for BooleanArrayIterator<I> {
Err(e) => return Some(Err(e)),
};

let maybe_array = extend_from_new_page::<BooleanDecoder, _, _>(
let maybe_array = extend_from_new_page(
page,
state,
&self.data_type,
self.chunk_size,
&mut self.items,
&BooleanDecoder::default(),
);
match maybe_array {
Ok(Some(array)) => Some(Ok(array)),
Ok(Some((values, validity))) => {
Some(Ok(finish(&self.data_type, values, validity)))
}
Ok(None) => self.next(),
Err(e) => Some(Err(e)),
}
Expand All @@ -198,11 +195,7 @@ impl<I: DataPages> Iterator for BooleanArrayIterator<I> {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= self.chunk_size);
Some(Ok(BooleanDecoder::finish(
self.data_type.clone(),
values,
validity,
)))
Some(Ok(finish(&self.data_type, values, validity)))
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/io/parquet/read/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use super::utils;
use crate::{
array::{Array, DictionaryKey, PrimitiveArray},
bitmap::MutableBitmap,
datatypes::DataType,
error::Result,
io::parquet::read::utils::{extend_from_decoder, OptionalPageValidity},
};
Expand Down Expand Up @@ -126,7 +125,6 @@ where
K: DictionaryKey,
{
type State = State<'a, K>;
type Array = PrimitiveArray<K>;

fn with_capacity(&self, capacity: usize) -> Vec<K> {
Vec::<K>::with_capacity(capacity)
Expand All @@ -151,10 +149,6 @@ where
}*/
}
}

fn finish(data_type: DataType, values: Vec<K>, validity: MutableBitmap) -> Self::Array {
PrimitiveArray::from_data(data_type, values.into(), validity.into())
}
}

#[derive(Debug)]
Expand All @@ -171,3 +165,7 @@ impl Dict {
}
}
}

/// Assembles the keys of a dictionary array as a [`PrimitiveArray`] from
/// decoded `values` and their `validity`.
///
/// The array's data type is derived from the key type itself
/// (`K::PRIMITIVE`), so callers do not pass one in.
pub fn finish_key<K: DictionaryKey>(values: Vec<K>, validity: MutableBitmap) -> PrimitiveArray<K> {
    let data_type = K::PRIMITIVE.into();

    PrimitiveArray::from_data(data_type, values.into(), validity.into())
}
Loading

0 comments on commit 9ea3406

Please sign in to comment.