From 639739366981caa47874ad3568ba69e5669063b4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 27 May 2022 06:17:29 +0000 Subject: [PATCH] Simpler --- src/io/parquet/read/deserialize/binary/mod.rs | 2 +- .../parquet/read/deserialize/binary/nested.rs | 4 +- .../parquet/read/deserialize/boolean/mod.rs | 2 +- .../read/deserialize/boolean/nested.rs | 4 +- src/io/parquet/read/deserialize/mod.rs | 109 +++++++++--------- .../parquet/read/deserialize/nested_utils.rs | 64 +++++----- .../parquet/read/deserialize/primitive/mod.rs | 2 +- .../read/deserialize/primitive/nested.rs | 4 +- 8 files changed, 88 insertions(+), 103 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 7c43154073c..103c6c5fcab 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -23,7 +23,7 @@ pub use dictionary::DictIter; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, O, A, I>( iter: I, - init: InitNested, + init: Vec, data_type: DataType, chunk_size: usize, ) -> NestedArrayIter<'a> diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index f4492d412ee..95461125553 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -142,7 +142,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { pub struct ArrayIterator, I: DataPages> { iter: I, data_type: DataType, - init: InitNested, + init: Vec, items: VecDeque<(Binary, MutableBitmap)>, nested: VecDeque, chunk_size: usize, @@ -150,7 +150,7 @@ pub struct ArrayIterator, I: DataPages> { } impl, I: DataPages> ArrayIterator { - pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize) -> Self { + pub fn new(iter: I, init: Vec, data_type: DataType, chunk_size: usize) -> Self { Self { iter, data_type, diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs index 61aeeb97757..ab663682eba 100644 --- a/src/io/parquet/read/deserialize/boolean/mod.rs +++ b/src/io/parquet/read/deserialize/boolean/mod.rs @@ -16,7 +16,7 @@ pub use self::basic::Iter; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I: 'a>( iter: I, - init: InitNested, + init: Vec, chunk_size: usize, ) -> NestedArrayIter<'a> where diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 5f30c698a80..b2e2477fb48 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -113,7 +113,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { #[derive(Debug)] pub struct ArrayIterator { iter: I, - init: InitNested, + init: Vec, // invariant: items.len() == nested.len() items: VecDeque<(MutableBitmap, MutableBitmap)>, nested: VecDeque, @@ -121,7 +121,7 @@ pub struct ArrayIterator { } impl ArrayIterator { - pub fn new(iter: I, init: InitNested, chunk_size: usize) -> Self { + pub fn new(iter: I, init: Vec, chunk_size: usize) -> Self { Self { iter, init, diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 448f059653e..05e3d5ebb13 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -74,6 +74,21 @@ fn create_list( } } +fn is_primitive(data_type: &DataType) -> bool { + matches!( + data_type.to_physical_type(), + crate::datatypes::PhysicalType::Primitive(_) + | crate::datatypes::PhysicalType::Null + | crate::datatypes::PhysicalType::Boolean + | crate::datatypes::PhysicalType::Utf8 + | crate::datatypes::PhysicalType::LargeUtf8 + | crate::datatypes::PhysicalType::Binary + | crate::datatypes::PhysicalType::LargeBinary + | crate::datatypes::PhysicalType::FixedSizeBinary + | crate::datatypes::PhysicalType::Dictionary(_) + ) +} + fn columns_to_iter_recursive<'a, I: 'a>( mut columns: Vec, mut types: Vec<&PrimitiveType>, @@ -85,7 +100,7 @@ where I: DataPages, { use DataType::*; - if init.len() == 1 && init[0].is_primitive() { + if init.is_empty() && is_primitive(&field.data_type) { return Ok(Box::new( page_iter_to_arrays( columns.pop().unwrap(), @@ -99,146 +114,162 @@ where Ok(match field.data_type().to_logical_type() { Boolean => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); - boolean::iter_to_arrays_nested(columns.pop().unwrap(), init.pop().unwrap(), chunk_size) + boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size) } Int8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as i8, ) } Int16 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as i16, ) } Int32 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x, ) } Int64 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i64| x, ) } UInt8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as u8, ) } UInt16 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as u16, ) } UInt32 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as u32, ) } UInt64 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i64| x as u64, ) } Float32 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: f32| x, ) } Float64 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: f64| x, ) } Utf8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } LargeUtf8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } Binary => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } LargeBinary => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } List(inner) | LargeList(inner) | FixedSizeList(inner, _) => { + init.push(InitNested::List(field.is_nullable)); let iter = columns_to_iter_recursive( vec![columns.pop().unwrap()], types, @@ -258,11 +289,13 @@ where .iter() .rev() .map(|f| { + let mut init = init.clone(); + init.push(InitNested::Struct(field.is_nullable)); columns_to_iter_recursive( vec![columns.pop().unwrap()], vec![types.pop().unwrap()], f.clone(), - vec![init.pop().unwrap()], + init, chunk_size, ) }) @@ -274,43 +307,6 @@ where }) } -fn field_to_init(field: &Field) -> Vec { - use crate::datatypes::PhysicalType::*; - match field.data_type.to_physical_type() { - Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)], - List | FixedSizeList | LargeList => { - let a = field.data_type().to_logical_type(); - let inner = if let DataType::List(inner) = a { - field_to_init(inner) - } else if let DataType::LargeList(inner) = a { - field_to_init(inner) - } else if let DataType::FixedSizeList(inner, _) = a { - field_to_init(inner) - } else { - unreachable!() - }; - inner - .into_iter() - .map(|x| InitNested::List(Box::new(x), field.is_nullable)) - .collect() - } - Struct => { - let inner = if let DataType::Struct(fields) = field.data_type.to_logical_type() { - fields.iter().rev().map(field_to_init).collect::>() - } else { - unreachable!() - }; - inner - .into_iter() - .flatten() - .map(|x| InitNested::Struct(Box::new(x), field.is_nullable)) - .collect() - } - _ => todo!(), - } -} - /// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s. /// /// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`. @@ -323,9 +319,8 @@ pub fn column_iter_to_arrays<'a, I: 'a>( where I: DataPages, { - let init = field_to_init(&field); - Ok(Box::new( - columns_to_iter_recursive(columns, types, field, init, chunk_size)?.map(|x| x.map(|x| x.1)), + columns_to_iter_recursive(columns, types, field, vec![], chunk_size)? + .map(|x| x.map(|x| x.1)), )) } diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index b74f7c24734..39233efb92e 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -221,46 +221,36 @@ where } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum InitNested { Primitive(bool), - List(Box, bool), - Struct(Box, bool), + List(bool), + Struct(bool), } -impl InitNested { - pub fn is_primitive(&self) -> bool { - matches!(self, Self::Primitive(_)) - } -} - -fn init_nested_recursive(init: &InitNested, capacity: usize, container: &mut Vec>) { - match init { - InitNested::Primitive(is_nullable) => { - container.push(Box::new(NestedPrimitive::new(*is_nullable)) as Box) - } - InitNested::List(inner, is_nullable) => { - container.push(if *is_nullable { - Box::new(NestedOptional::with_capacity(capacity)) as Box - } else { - Box::new(NestedValid::with_capacity(capacity)) as Box - }); - init_nested_recursive(inner, capacity, container) - } - InitNested::Struct(inner, is_nullable) => { - if *is_nullable { - container.push(Box::new(NestedStruct::with_capacity(capacity)) as Box) - } else { - container.push(Box::new(NestedStructValid::new()) as Box) +fn init_nested(init: &[InitNested], capacity: usize) -> NestedState { + let container = init + .iter() + .map(|init| match init { + InitNested::Primitive(is_nullable) => { + Box::new(NestedPrimitive::new(*is_nullable)) as Box } - init_nested_recursive(inner, capacity, container) - } - } -} - -fn init_nested(init: &InitNested, capacity: usize) -> NestedState { - let mut container = vec![]; - init_nested_recursive(init, capacity, &mut container); + InitNested::List(is_nullable) => { + if *is_nullable { + Box::new(NestedOptional::with_capacity(capacity)) as Box + } else { + Box::new(NestedValid::with_capacity(capacity)) as Box + } + } + InitNested::Struct(is_nullable) => { + if *is_nullable { + Box::new(NestedStruct::with_capacity(capacity)) as Box + } else { + Box::new(NestedStructValid::new()) as Box + } + } + }) + .collect(); NestedState::new(container) } @@ -359,7 +349,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( /// has less items than `chunk_size` pub fn extend_offsets1<'a>( page: &mut NestedPage<'a>, - init: &InitNested, + init: &[InitNested], items: &mut VecDeque, chunk_size: usize, ) { @@ -475,7 +465,7 @@ pub(super) fn next<'a, I, D>( iter: &'a mut I, items: &mut VecDeque, nested_items: &mut VecDeque, - init: &InitNested, + init: &[InitNested], chunk_size: usize, decoder: &D, ) -> MaybeNext> diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index b2eba1534ff..400b87f1040 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -16,7 +16,7 @@ use nested::ArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I, T, P, F>( iter: I, - init: InitNested, + init: Vec, data_type: DataType, chunk_size: usize, op: F, diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 0aff18e2578..8db433a5737 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -174,7 +174,7 @@ where F: Copy + Fn(P) -> T, { iter: I, - init: InitNested, + init: Vec, data_type: DataType, // invariant: items.len() == nested.len() items: VecDeque<(Vec, MutableBitmap)>, @@ -191,7 +191,7 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize, op: F) -> Self { + pub fn new(iter: I, init: Vec, data_type: DataType, chunk_size: usize, op: F) -> Self { Self { iter, init,