Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 22, 2022
1 parent d6f3966 commit eae9244
Show file tree
Hide file tree
Showing 17 changed files with 397 additions and 171 deletions.
10 changes: 5 additions & 5 deletions examples/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@ use arrow2::{
},
};

fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
};

let iter = vec![Ok(columns)];
let iter = vec![Ok(chunk)];

let encodings = schema
.fields
.iter()
.map(|f| transverse(&f.data_type, |_| Encoding::Plain))
.map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary))
.collect();

let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
Expand Down Expand Up @@ -52,7 +52,7 @@ fn main() -> Result<()> {
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::from(vec![field]);
let columns = Chunk::new(vec![array.boxed()]);
let chunk = Chunk::new(vec![array.boxed()]);

write_batch("test.parquet", schema, columns)
write_chunk("test.parquet", schema, chunk)
}
2 changes: 1 addition & 1 deletion src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn check<T: NativeType>(

if data_type.to_physical_type() != PhysicalType::Primitive(T::PRIMITIVE) {
return Err(Error::oos(
"BooleanArray can only be initialized with a DataType whose physical type is Primitive",
"PrimitiveArray can only be initialized with a DataType whose physical type is Primitive",
));
}
Ok(())
Expand Down
102 changes: 95 additions & 7 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::Result,
io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState},
};

use super::super::dictionary::*;
Expand All @@ -23,7 +24,6 @@ where
{
iter: I,
data_type: DataType,
values_data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: Option<usize>,
Expand All @@ -37,14 +37,9 @@ where
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let values_data_type = match &data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
};
Self {
iter,
data_type,
values_data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
Expand All @@ -54,6 +49,11 @@ where
}

fn read_dict<O: Offset>(data_type: DataType, dict: &dyn DictPage) -> Box<dyn Array> {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => *values,
_ => data_type,
};

let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
let offsets = dict
.offsets()
Expand Down Expand Up @@ -94,7 +94,74 @@ where
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict::<O>(self.values_data_type.clone(), dict),
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}

#[derive(Debug)]
pub struct NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
iter: I,
init: Vec<InitNested>,
data_type: DataType,
values: Dict,
items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}

impl<K, O, I> NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
init,
data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
phantom: Default::default(),
}
}
}

impl<K, O, I> Iterator for NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
type Item = Result<(NestedState, DictionaryArray<K>)>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = nested_next_dict(
&mut self.iter,
&mut self.items,
&self.init,
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
Expand All @@ -104,3 +171,24 @@ where
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, K, O, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
O: Offset,
K: DictionaryKey,
{
Box::new(
NestedDictIter::<K, O, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
}),
)
}
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ mod utils;

pub use self::nested::NestedIter;
pub use basic::Iter;
pub use dictionary::DictIter;
pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter};
pub use nested::iter_to_arrays_nested;
95 changes: 88 additions & 7 deletions src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
bitmap::MutableBitmap,
datatypes::DataType,
error::Result,
io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState},
};

use super::super::dictionary::*;
Expand All @@ -22,7 +23,6 @@ where
{
iter: I,
data_type: DataType,
values_data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: Option<usize>,
Expand All @@ -34,14 +34,9 @@ where
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
let values_data_type = match &data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
};
Self {
iter,
data_type,
values_data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
Expand All @@ -50,6 +45,10 @@ where
}

fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Box<dyn Array> {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => *values,
_ => data_type,
};
let dict = dict
.as_any()
.downcast_ref::<FixedLenByteArrayPageDict>()
Expand Down Expand Up @@ -77,7 +76,7 @@ where
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict(self.values_data_type.clone(), dict),
|dict| read_dict(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
Expand All @@ -87,3 +86,85 @@ where
}
}
}

#[derive(Debug)]
pub struct NestedDictIter<K, I>
where
I: DataPages,
K: DictionaryKey,
{
iter: I,
init: Vec<InitNested>,
data_type: DataType,
values: Dict,
items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
chunk_size: Option<usize>,
}

impl<K, I> NestedDictIter<K, I>
where
I: DataPages,
K: DictionaryKey,
{
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
init,
data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
}
}
}

impl<K, I> Iterator for NestedDictIter<K, I>
where
I: DataPages,
K: DictionaryKey,
{
type Item = Result<(NestedState, DictionaryArray<K>)>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = nested_next_dict(
&mut self.iter,
&mut self.items,
&self.init,
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, K, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
K: DictionaryKey,
{
Box::new(
NestedDictIter::<K, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
}),
)
}
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ mod dictionary;
mod utils;

pub use basic::Iter;
pub use dictionary::DictIter;
pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter};
Loading

0 comments on commit eae9244

Please sign in to comment.