diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index a56d36086df..93d0c805fd8 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -10,11 +10,10 @@ use parquet2::{ use crate::{ array::{Array, BinaryArray, Utf8Array}, - bitmap::{Bitmap, MutableBitmap}, - buffer::Buffer, - datatypes::DataType, + bitmap::MutableBitmap, + datatypes::{DataType, PhysicalType}, error::{Error, Result}, - offset::{Offset, OffsetsBuffer}, + offset::Offset, }; use super::super::utils::{ @@ -225,39 +224,6 @@ impl<'a> utils::PageState<'a> for State<'a> { } } -pub trait TraitBinaryArray: Array + 'static { - fn try_new( - data_type: DataType, - offsets: OffsetsBuffer, - values: Buffer, - validity: Option, - ) -> Result - where - Self: Sized; -} - -impl TraitBinaryArray for BinaryArray { - fn try_new( - data_type: DataType, - offsets: OffsetsBuffer, - values: Buffer, - validity: Option, - ) -> Result { - Self::try_new(data_type, offsets, values, validity) - } -} - -impl TraitBinaryArray for Utf8Array { - fn try_new( - data_type: DataType, - offsets: OffsetsBuffer, - values: Buffer, - validity: Option, - ) -> Result { - Self::try_new(data_type, offsets, values, validity) - } -} - impl DecodedState for (Binary, MutableBitmap) { fn len(&self) -> usize { self.0.len() @@ -475,34 +441,44 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { } } -pub(super) fn finish>( +pub(super) fn finish( data_type: &DataType, mut values: Binary, mut validity: MutableBitmap, -) -> Result { +) -> Result> { values.offsets.shrink_to_fit(); values.values.shrink_to_fit(); validity.shrink_to_fit(); - A::try_new( - data_type.clone(), - values.offsets.into(), - values.values.into(), - validity.into(), - ) + match data_type.to_physical_type() { + PhysicalType::Binary | PhysicalType::LargeBinary => BinaryArray::::try_new( + data_type.clone(), + values.offsets.into(), + values.values.into(), + validity.into(), + ) + .map(|x| x.boxed()), + PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Utf8Array::::try_new( + data_type.clone(), + values.offsets.into(), + values.values.into(), + validity.into(), + ) + .map(|x| x.boxed()), + _ => unreachable!(), + } } -pub struct Iter, I: Pages> { +pub struct Iter { iter: I, data_type: DataType, items: VecDeque<(Binary, MutableBitmap)>, dict: Option, chunk_size: Option, remaining: usize, - phantom_a: std::marker::PhantomData, } -impl, I: Pages> Iter { +impl Iter { pub fn new(iter: I, data_type: DataType, chunk_size: Option, num_rows: usize) -> Self { Self { iter, @@ -511,13 +487,12 @@ impl, I: Pages> Iter { dict: None, chunk_size, remaining: num_rows, - phantom_a: Default::default(), } } } -impl, I: Pages> Iterator for Iter { - type Item = Result; +impl Iterator for Iter { + type Item = Result>; fn next(&mut self) -> Option { let maybe_state = next( diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 2d345140db7..76d58f9c498 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -7,8 +7,8 @@ use parquet2::{ }; use crate::{ - bitmap::MutableBitmap, datatypes::DataType, error::Result, io::parquet::read::Pages, - offset::Offset, + array::Array, bitmap::MutableBitmap, datatypes::DataType, error::Result, + io::parquet::read::Pages, offset::Offset, }; use super::super::utils::MaybeNext; @@ -17,7 +17,7 @@ use super::utils::*; use super::{super::nested_utils::*, basic::deserialize_plain}; use super::{ super::utils, - basic::{finish, Dict, TraitBinaryArray}, + basic::{finish, Dict}, }; #[derive(Debug)] @@ -136,7 +136,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { } } -pub struct NestedIter, I: Pages> { +pub struct NestedIter { iter: I, data_type: DataType, init: Vec, @@ -144,10 +144,9 @@ pub struct NestedIter, I: Pages> { dict: Option, chunk_size: Option, remaining: usize, - phantom_a: std::marker::PhantomData, } -impl, I: Pages> NestedIter { +impl NestedIter { pub fn new( iter: I, init: Vec, @@ -163,13 +162,12 @@ impl, I: Pages> NestedIter { dict: None, chunk_size, remaining: num_rows, - phantom_a: Default::default(), } } } -impl, I: Pages> Iterator for NestedIter { - type Item = Result<(NestedState, A)>; +impl Iterator for NestedIter { + type Item = Result<(NestedState, Box)>; fn next(&mut self) -> Option { let maybe_state = next( diff --git a/src/io/parquet/read/deserialize/nested.rs b/src/io/parquet/read/deserialize/nested.rs index 4efa7adbcca..de06a60fd65 100644 --- a/src/io/parquet/read/deserialize/nested.rs +++ b/src/io/parquet/read/deserialize/nested.rs @@ -1,7 +1,7 @@ use parquet2::schema::types::PrimitiveType; use crate::{ - array::{BinaryArray, MapArray, Utf8Array}, + array::MapArray, datatypes::{DataType, Field}, error::{Error, Result}, }; @@ -9,6 +9,20 @@ use crate::{ use super::nested_utils::{InitNested, NestedArrayIter}; use super::*; +/// Converts an iterator of arrays to a trait object returning trait objects +#[inline] +fn remove_nested<'a, I>(iter: I) -> NestedArrayIter<'a> +where + I: Iterator)>> + Send + Sync + 'a, +{ + Box::new(iter.map(|x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive + (nested, array) + }) + })) +} + /// Converts an iterator of arrays to a trait object returning trait objects #[inline] fn primitive<'a, A, I>(iter: I) -> NestedArrayIter<'a> @@ -185,32 +199,10 @@ where |x: f64| x, )) } - Utf8 => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive(binary::NestedIter::, _>::new( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - )) - } - LargeUtf8 => { - init.push(InitNested::Primitive(field.is_nullable)); - types.pop(); - primitive(binary::NestedIter::, _>::new( - columns.pop().unwrap(), - init, - field.data_type().clone(), - num_rows, - chunk_size, - )) - } - Binary => { + Binary | Utf8 => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); - primitive(binary::NestedIter::, _>::new( + remove_nested(binary::NestedIter::::new( columns.pop().unwrap(), init, field.data_type().clone(), @@ -218,10 +210,10 @@ where chunk_size, )) } - LargeBinary => { + LargeBinary | LargeUtf8 => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); - primitive(binary::NestedIter::, _>::new( + remove_nested(binary::NestedIter::::new( columns.pop().unwrap(), init, field.data_type().clone(), diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index cacb518bb3e..b0abf400edf 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -6,7 +6,7 @@ use parquet2::{ }; use crate::{ - array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, Utf8Array}, + array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{Error, Result}, types::{days_ms, NativeType}, @@ -278,16 +278,10 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( |x: f64| x, ))), - Binary => dyn_iter(binary::Iter::, _>::new( + Utf8 | Binary => Box::new(binary::Iter::::new( pages, data_type, chunk_size, num_rows, )), - LargeBinary => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, num_rows, - )), - Utf8 => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, num_rows, - )), - LargeUtf8 => dyn_iter(binary::Iter::, _>::new( + LargeBinary | LargeUtf8 => Box::new(binary::Iter::::new( pages, data_type, chunk_size, num_rows, )),