Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 27, 2022
1 parent 7e2f110 commit ab74177
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 103 deletions.
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use dictionary::DictIter;
/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: InitNested,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: usize,
) -> NestedArrayIter<'a>
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
init: InitNested,
init: Vec<InitNested>,
items: VecDeque<(Binary<O>, MutableBitmap)>,
nested: VecDeque<NestedState>,
chunk_size: usize,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, init: Vec<InitNested>, data_type: DataType, chunk_size: usize) -> Self {
Self {
iter,
data_type,
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use self::basic::Iter;
/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: InitNested,
init: Vec<InitNested>,
chunk_size: usize,
) -> NestedArrayIter<'a>
where
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ impl<'a> Decoder<'a> for BooleanDecoder {
#[derive(Debug)]
pub struct ArrayIterator<I: DataPages> {
iter: I,
init: InitNested,
init: Vec<InitNested>,
// invariant: items.len() == nested.len()
items: VecDeque<(MutableBitmap, MutableBitmap)>,
nested: VecDeque<NestedState>,
chunk_size: usize,
}

impl<I: DataPages> ArrayIterator<I> {
pub fn new(iter: I, init: InitNested, chunk_size: usize) -> Self {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: usize) -> Self {
Self {
iter,
init,
Expand Down
109 changes: 52 additions & 57 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ fn create_list(
}
}

fn is_primitive(data_type: &DataType) -> bool {
matches!(
data_type.to_physical_type(),
crate::datatypes::PhysicalType::Primitive(_)
| crate::datatypes::PhysicalType::Null
| crate::datatypes::PhysicalType::Boolean
| crate::datatypes::PhysicalType::Utf8
| crate::datatypes::PhysicalType::LargeUtf8
| crate::datatypes::PhysicalType::Binary
| crate::datatypes::PhysicalType::LargeBinary
| crate::datatypes::PhysicalType::FixedSizeBinary
| crate::datatypes::PhysicalType::Dictionary(_)
)
}

fn columns_to_iter_recursive<'a, I: 'a>(
mut columns: Vec<I>,
mut types: Vec<&PrimitiveType>,
Expand All @@ -85,7 +100,7 @@ where
I: DataPages,
{
use DataType::*;
if init.len() == 1 && init[0].is_primitive() {
if init.is_empty() && is_primitive(&field.data_type) {
return Ok(Box::new(
page_iter_to_arrays(
columns.pop().unwrap(),
Expand All @@ -99,146 +114,162 @@ where

Ok(match field.data_type().to_logical_type() {
Boolean => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
boolean::iter_to_arrays_nested(columns.pop().unwrap(), init.pop().unwrap(), chunk_size)
boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size)
}
Int8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i32| x as i8,
)
}
Int16 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i32| x as i16,
)
}
Int32 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i32| x,
)
}
Int64 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i64| x,
)
}
UInt8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i32| x as u8,
)
}
UInt16 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i32| x as u16,
)
}
UInt32 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i32| x as u32,
)
}
UInt64 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: i64| x as u64,
)
}
Float32 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: f32| x,
)
}
Float64 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
|x: f64| x,
)
}
Utf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
binary::iter_to_arrays_nested::<i32, Utf8Array<i32>, _>(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
)
}
LargeUtf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
binary::iter_to_arrays_nested::<i64, Utf8Array<i64>, _>(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
)
}
Binary => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
binary::iter_to_arrays_nested::<i32, BinaryArray<i32>, _>(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
)
}
LargeBinary => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
binary::iter_to_arrays_nested::<i64, BinaryArray<i64>, _>(
columns.pop().unwrap(),
init.pop().unwrap(),
init,
field.data_type().clone(),
chunk_size,
)
}
List(inner) | LargeList(inner) | FixedSizeList(inner, _) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
vec![columns.pop().unwrap()],
types,
Expand All @@ -258,11 +289,13 @@ where
.iter()
.rev()
.map(|f| {
let mut init = init.clone();
init.push(InitNested::Struct(field.is_nullable));
columns_to_iter_recursive(
vec![columns.pop().unwrap()],
vec![types.pop().unwrap()],
f.clone(),
vec![init.pop().unwrap()],
init,
chunk_size,
)
})
Expand All @@ -274,43 +307,6 @@ where
})
}

fn field_to_init(field: &Field) -> Vec<InitNested> {
use crate::datatypes::PhysicalType::*;
match field.data_type.to_physical_type() {
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
| Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)],
List | FixedSizeList | LargeList => {
let a = field.data_type().to_logical_type();
let inner = if let DataType::List(inner) = a {
field_to_init(inner)
} else if let DataType::LargeList(inner) = a {
field_to_init(inner)
} else if let DataType::FixedSizeList(inner, _) = a {
field_to_init(inner)
} else {
unreachable!()
};
inner
.into_iter()
.map(|x| InitNested::List(Box::new(x), field.is_nullable))
.collect()
}
Struct => {
let inner = if let DataType::Struct(fields) = field.data_type.to_logical_type() {
fields.iter().rev().map(field_to_init).collect::<Vec<_>>()
} else {
unreachable!()
};
inner
.into_iter()
.flatten()
.map(|x| InitNested::Struct(Box::new(x), field.is_nullable))
.collect()
}
_ => todo!(),
}
}

/// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s.
///
/// The arrays are guaranteed to have a length of at most `chunk_size` and to be of data type `field.data_type`.
Expand All @@ -323,9 +319,8 @@ pub fn column_iter_to_arrays<'a, I: 'a>(
where
I: DataPages,
{
let init = field_to_init(&field);

Ok(Box::new(
columns_to_iter_recursive(columns, types, field, init, chunk_size)?.map(|x| x.map(|x| x.1)),
columns_to_iter_recursive(columns, types, field, vec![], chunk_size)?
.map(|x| x.map(|x| x.1)),
))
}
Loading

0 comments on commit ab74177

Please sign in to comment.