Added support to read and write nested dictionaries to parquet (#1175)
jorgecarleitao authored Jul 23, 2022
1 parent f79c4bd commit c720eb2
Showing 25 changed files with 896 additions and 226 deletions.
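
For orientation, the write path this commit touches (see examples/parquet_write.rs below) boils down to the sketch that follows. It is a minimal reconstruction, not part of the commit: the `WriteOptions`/`transverse`/`RowGroupIterator` portion mirrors the diff, while the import list and the `FileWriter` tail are not visible in the diff and are assumed from arrow2's public API of this era.

```rust
use arrow2::{
    array::Array,
    chunk::Chunk,
    datatypes::Schema,
    error::Result,
    io::parquet::write::{
        transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
        WriteOptions,
    },
};

// Encode one `Chunk` into a single-row-group parquet file.
fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
    };

    // `transverse` produces one encoding per leaf of a (possibly nested) field;
    // with this commit, dictionary-encoded leaves round-trip through parquet.
    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary))
        .collect();

    let row_groups =
        RowGroupIterator::try_new(vec![Ok(chunk)].into_iter(), &schema, options, encodings)?;

    // Assumed tail (not shown in the diff): stream the row groups into a file.
    let file = std::fs::File::create(path)?;
    let mut writer = FileWriter::try_new(file, schema, options)?;
    for group in row_groups {
        writer.write(group?)?;
    }
    let _size = writer.end(None)?;
    Ok(())
}
```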
10 changes: 5 additions & 5 deletions examples/parquet_write.rs
@@ -11,19 +11,19 @@ use arrow2::{
},
};

fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
};

let iter = vec![Ok(columns)];
let iter = vec![Ok(chunk)];

let encodings = schema
.fields
.iter()
.map(|f| transverse(&f.data_type, |_| Encoding::Plain))
.map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary))
.collect();

let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
@@ -52,7 +52,7 @@ fn main() -> Result<()> {
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::from(vec![field]);
let columns = Chunk::new(vec![array.boxed()]);
let chunk = Chunk::new(vec![array.boxed()]);

write_batch("test.parquet", schema, columns)
write_chunk("test.parquet", schema, chunk)
}
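
The "nested dictionaries" of the commit title are columns such as `List<Dictionary<i32, Utf8>>`. Below is a hedged sketch of building one by hand so it can be handed to `write_chunk` above. `DictionaryArray::try_new(data_type, keys, values)` mirrors the call added in the read path further down in this commit; the remaining constructors (`Utf8Array::from_slice`, `PrimitiveArray::from_slice`, `ListArray::new`, `Buffer::from`) are assumptions about the crate's API at this point and are not taken from the diff.

```rust
use arrow2::{
    array::{Array, DictionaryArray, ListArray, PrimitiveArray, Utf8Array},
    buffer::Buffer,
    datatypes::{DataType, Field, IntegerType},
    error::Result,
};

fn nested_dict_column() -> Result<ListArray<i32>> {
    // The dictionary itself and the keys that point into it.
    let values = Utf8Array::<i32>::from_slice(["red", "green", "blue"]).boxed();
    let keys = PrimitiveArray::<i32>::from_slice([0, 1, 0, 2, 1]);
    let dict_type = DataType::Dictionary(IntegerType::Int32, Box::new(DataType::Utf8), false);
    let dict = DictionaryArray::try_new(dict_type, keys, values)?;

    // Nest it in a list: two rows, ["red", "green"] and ["red", "blue", "green"].
    let field = Field::new("item", dict.data_type().clone(), true);
    let offsets = Buffer::from(vec![0i32, 2, 5]);
    Ok(ListArray::<i32>::new(
        DataType::List(Box::new(field)),
        offsets,
        dict.boxed(),
        None,
    ))
}
```

A chunk holding this array is written exactly like the flat case: `transverse` expands the nested data type into one `Encoding` per leaf, and the new read-side iterators below reassemble the dictionary when the file is read back.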
2 changes: 1 addition & 1 deletion src/array/primitive/mod.rs
@@ -69,7 +69,7 @@ fn check<T: NativeType>(

if data_type.to_physical_type() != PhysicalType::Primitive(T::PRIMITIVE) {
return Err(Error::oos(
"BooleanArray can only be initialized with a DataType whose physical type is Primitive",
"PrimitiveArray can only be initialized with a DataType whose physical type is Primitive",
));
}
Ok(())
102 changes: 95 additions & 7 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
@@ -7,6 +7,7 @@ use crate::{
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::Result,
io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState},
};

use super::super::dictionary::*;
@@ -23,7 +24,6 @@
{
iter: I,
data_type: DataType,
values_data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: Option<usize>,
@@ -37,14 +37,9 @@
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let values_data_type = match &data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
};
Self {
iter,
data_type,
values_data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
@@ -54,6 +49,11 @@
}

fn read_dict<O: Offset>(data_type: DataType, dict: &dyn DictPage) -> Box<dyn Array> {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => *values,
_ => data_type,
};

let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
let offsets = dict
.offsets()
@@ -94,7 +94,74 @@
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict::<O>(self.values_data_type.clone(), dict),
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}

#[derive(Debug)]
pub struct NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
iter: I,
init: Vec<InitNested>,
data_type: DataType,
values: Dict,
items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}

impl<K, O, I> NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
init,
data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
phantom: Default::default(),
}
}
}

impl<K, O, I> Iterator for NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
type Item = Result<(NestedState, DictionaryArray<K>)>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = nested_next_dict(
&mut self.iter,
&mut self.items,
&self.init,
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
@@ -104,3 +171,24 @@
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, K, O, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
O: Offset,
K: DictionaryKey,
{
Box::new(
NestedDictIter::<K, O, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
}),
)
}
39 changes: 3 additions & 36 deletions src/io/parquet/read/deserialize/binary/mod.rs
@@ -3,40 +3,7 @@ mod dictionary;
mod nested;
mod utils;

use crate::{
array::{Array, Offset},
datatypes::DataType,
};

use self::basic::TraitBinaryArray;
use self::nested::ArrayIterator;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

pub use self::nested::NestedIter;
pub use basic::Iter;
pub use dictionary::DictIter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
ArrayIterator::<O, A, I>::new(iter, init, data_type, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = Box::new(array) as Box<dyn Array>;
(nested, values)
})
}),
)
}
pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter};
pub use nested::iter_to_arrays_nested;
29 changes: 26 additions & 3 deletions src/io/parquet/read/deserialize/binary/nested.rs
@@ -6,6 +6,7 @@ use parquet2::{
schema::Repetition,
};

use crate::array::Array;
use crate::{
array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result,
io::parquet::read::DataPages,
@@ -141,7 +142,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
}

pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
pub struct NestedIter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
init: Vec<InitNested>,
@@ -150,7 +151,7 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
pub fn new(
iter: I,
init: Vec<InitNested>,
@@ -168,7 +169,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
}
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for NestedIter<O, A, I> {
type Item = Result<(NestedState, A)>;

fn next(&mut self) -> Option<Self::Item> {
@@ -189,3 +190,25 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`TraitBinaryArray`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
NestedIter::<O, A, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
let array = Box::new(array) as Box<dyn Array>;
Ok((nested, array))
}),
)
}
25 changes: 1 addition & 24 deletions src/io/parquet/read/deserialize/boolean/mod.rs
@@ -1,28 +1,5 @@
mod basic;
mod nested;

use self::nested::ArrayIterator;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

pub use self::basic::Iter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
{
Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = array.boxed();
(nested, values)
})
}))
}
pub use nested::iter_to_arrays_nested;
22 changes: 19 additions & 3 deletions src/io/parquet/read/deserialize/boolean/nested.rs
@@ -101,14 +101,14 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {

/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays
#[derive(Debug)]
pub struct ArrayIterator<I: DataPages> {
pub struct NestedIter<I: DataPages> {
iter: I,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>,
chunk_size: Option<usize>,
}

impl<I: DataPages> ArrayIterator<I> {
impl<I: DataPages> NestedIter<I> {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
Self {
iter,
@@ -123,7 +123,7 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap)
BooleanArray::new(data_type.clone(), values.into(), validity.into())
}

impl<I: DataPages> Iterator for ArrayIterator<I> {
impl<I: DataPages> Iterator for NestedIter<I> {
type Item = Result<(NestedState, BooleanArray)>;

fn next(&mut self) -> Option<Self::Item> {
@@ -144,3 +144,19 @@ impl<I: DataPages> Iterator for ArrayIterator<I> {
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`BooleanArray`]
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
{
Box::new(NestedIter::new(iter, init, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
}))
}
@@ -1,3 +1,5 @@
mod nested;

use std::collections::VecDeque;

use parquet2::{
@@ -292,8 +294,7 @@ pub(super) fn next_dict<
MaybeNext::More
} else {
let (values, validity) = items.pop_front().unwrap();
let keys =
PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into());
let keys = finish_key(values, validity);
MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
}
}
@@ -304,11 +305,12 @@ pub(super) fn next_dict<
debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);

MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
} else {
MaybeNext::None
}
}
}
}

pub use nested::next_dict as nested_next_dict;