Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Dict parquet nested update #1172

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/array/dictionary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use num_traits::FromPrimitive;
use std::hint::unreachable_unchecked;

use crate::{
Expand All @@ -23,7 +24,7 @@ use super::{new_empty_array, primitive::PrimitiveArray, Array};
use super::{new_null_array, specification::check_indexes};

/// Trait denoting [`NativeType`]s that can be used as keys of a dictionary.
pub trait DictionaryKey: NativeType + TryInto<usize> + TryFrom<usize> {
pub trait DictionaryKey: NativeType + TryInto<usize> + TryFrom<usize> + FromPrimitive {
/// The corresponding [`IntegerType`] of this key
const KEY_TYPE: IntegerType;

Expand Down
37 changes: 2 additions & 35 deletions src/io/parquet/read/deserialize/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,7 @@ mod dictionary;
mod nested;
mod utils;

use crate::{
array::{Array, Offset},
datatypes::DataType,
};

use self::basic::TraitBinaryArray;
use self::nested::ArrayIterator;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

pub use self::nested::NestedIter;
pub use basic::Iter;
pub use dictionary::DictIter;

/// Adapts `iter` into a [`NestedArrayIter`] that yields boxed [`Array`]s.
///
/// Every successful item produced by the underlying `ArrayIterator` has the
/// last entry of its nested state (the primitive level) removed and its array
/// boxed as a `dyn Array`. Errors are forwarded unchanged.
///
/// # Panics
/// Panics if a successful item has nothing to pop from `nested.nested`.
pub fn iter_to_arrays_nested<'a, O, A, I>(
    iter: I,
    init: Vec<InitNested>,
    data_type: DataType,
    chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
    I: 'a + DataPages,
    A: TraitBinaryArray<O>,
    O: Offset,
{
    let arrays = ArrayIterator::<O, A, I>::new(iter, init, data_type, chunk_size);
    Box::new(arrays.map(|item| match item {
        Ok((mut state, array)) => {
            // the primitive level is dropped from the state handed to the caller
            let _ = state.nested.pop().unwrap();
            Ok((state, Box::new(array) as Box<dyn Array>))
        }
        Err(error) => Err(error),
    }))
}
pub use nested::iter_to_arrays_nested;
29 changes: 26 additions & 3 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use parquet2::{
schema::Repetition,
};

use crate::array::Array;
use crate::{
array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result,
io::parquet::read::DataPages,
Expand Down Expand Up @@ -141,7 +142,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
}

pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
pub struct NestedIter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
init: Vec<InitNested>,
Expand All @@ -150,7 +151,7 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
pub fn new(
iter: I,
init: Vec<InitNested>,
Expand All @@ -168,7 +169,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
}
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for NestedIter<O, A, I> {
type Item = Result<(NestedState, A)>;

fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -189,3 +190,25 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator
}
}
}

/// Wraps [`DataPages`] in a [`NestedIter`] and exposes it as a [`NestedArrayIter`].
///
/// For each successful item the last entry of the nested state (the
/// primitive level) is popped and the [`TraitBinaryArray`] is boxed as a
/// `dyn Array`. Errors are forwarded unchanged.
///
/// # Panics
/// Panics if a successful item has nothing to pop from `nested.nested`.
pub fn iter_to_arrays_nested<'a, O, A, I>(
    iter: I,
    init: Vec<InitNested>,
    data_type: DataType,
    chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
    I: 'a + DataPages,
    A: TraitBinaryArray<O>,
    O: Offset,
{
    let arrays = NestedIter::<O, A, I>::new(iter, init, data_type, chunk_size).map(|item| {
        item.map(|(mut state, values)| {
            let _ = state.nested.pop().unwrap(); // the primitive
            (state, Box::new(values) as Box<dyn Array>)
        })
    });
    Box::new(arrays)
}
25 changes: 1 addition & 24 deletions src/io/parquet/read/deserialize/boolean/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,5 @@
mod basic;
mod nested;

use self::nested::ArrayIterator;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

pub use self::basic::Iter;

/// Adapts `iter` into a [`NestedArrayIter`] of boxed boolean arrays.
///
/// Every successful item produced by the underlying `ArrayIterator` has the
/// last entry of its nested state (the primitive level) removed and its array
/// converted with `boxed()`. Errors are forwarded unchanged.
///
/// # Panics
/// Panics if a successful item has nothing to pop from `nested.nested`.
pub fn iter_to_arrays_nested<'a, I: 'a>(
    iter: I,
    init: Vec<InitNested>,
    chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
    I: DataPages,
{
    let arrays = ArrayIterator::new(iter, init, chunk_size);
    Box::new(arrays.map(|item| match item {
        Ok((mut state, array)) => {
            // the primitive level is dropped from the state handed to the caller
            let _ = state.nested.pop().unwrap();
            Ok((state, array.boxed()))
        }
        Err(error) => Err(error),
    }))
}
pub use nested::iter_to_arrays_nested;
22 changes: 19 additions & 3 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {

/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays
#[derive(Debug)]
pub struct ArrayIterator<I: DataPages> {
pub struct NestedIter<I: DataPages> {
iter: I,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>,
chunk_size: Option<usize>,
}

impl<I: DataPages> ArrayIterator<I> {
impl<I: DataPages> NestedIter<I> {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
Self {
iter,
Expand All @@ -123,7 +123,7 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap)
BooleanArray::new(data_type.clone(), values.into(), validity.into())
}

impl<I: DataPages> Iterator for ArrayIterator<I> {
impl<I: DataPages> Iterator for NestedIter<I> {
type Item = Result<(NestedState, BooleanArray)>;

fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -144,3 +144,19 @@ impl<I: DataPages> Iterator for ArrayIterator<I> {
}
}
}

/// Wraps [`DataPages`] in a [`NestedIter`] and exposes it as a [`NestedArrayIter`].
///
/// For each successful item the last entry of the nested state (the
/// primitive level) is popped and the [`BooleanArray`] is converted with
/// `boxed()`. Errors are forwarded unchanged.
///
/// # Panics
/// Panics if a successful item has nothing to pop from `nested.nested`.
pub fn iter_to_arrays_nested<'a, I: 'a>(
    iter: I,
    init: Vec<InitNested>,
    chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
    I: DataPages,
{
    let arrays = NestedIter::new(iter, init, chunk_size).map(|item| {
        item.map(|(mut state, array)| {
            let _ = state.nested.pop().unwrap(); // the primitive
            (state, array.boxed())
        })
    });
    Box::new(arrays)
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod nested;

use std::collections::VecDeque;

use parquet2::{
Expand Down Expand Up @@ -74,6 +76,10 @@ impl<'a> Optional<'a> {
validity: OptionalPageValidity::try_new(page)?,
})
}

/// Returns the length reported by `self.values`.
fn len(&self) -> usize {
self.values.len()
}
}

impl<'a> utils::PageState<'a> for State<'a> {
Expand Down Expand Up @@ -292,8 +298,7 @@ pub(super) fn next_dict<
MaybeNext::More
} else {
let (values, validity) = items.pop_front().unwrap();
let keys =
PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into());
let keys = finish_key(values, validity);
MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
}
}
Expand All @@ -304,11 +309,12 @@ pub(super) fn next_dict<
debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);

MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
} else {
MaybeNext::None
}
}
}
}

pub use nested::next_dict as nested_next_dict;
Loading