diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs similarity index 100% rename from src/io/parquet/read/binary/basic.rs rename to src/io/parquet/read/deserialize/binary/basic.rs diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs similarity index 98% rename from src/io/parquet/read/binary/dictionary.rs rename to src/io/parquet/read/deserialize/binary/dictionary.rs index 1a95b8a1dcc..4f8adc70268 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -7,10 +7,10 @@ use crate::{ bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, - io::parquet::read::utils::MaybeNext, }; use super::super::dictionary::*; +use super::super::utils::MaybeNext; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs similarity index 100% rename from src/io/parquet/read/binary/mod.rs rename to src/io/parquet/read/deserialize/binary/mod.rs diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs similarity index 96% rename from src/io/parquet/read/binary/nested.rs rename to src/io/parquet/read/deserialize/binary/nested.rs index 2276829e70c..17de2d93a97 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -3,14 +3,12 @@ use std::collections::VecDeque; use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; use crate::{ - array::Offset, - bitmap::MutableBitmap, - datatypes::DataType, - error::Result, - io::parquet::read::{utils::MaybeNext, DataPages}, + array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result, + io::parquet::read::DataPages, }; use super::super::nested_utils::*; +use super::super::utils::MaybeNext; use super::utils::Binary; use super::{ super::utils, diff --git a/src/io/parquet/read/binary/utils.rs b/src/io/parquet/read/deserialize/binary/utils.rs similarity index 96% rename from src/io/parquet/read/binary/utils.rs rename to src/io/parquet/read/deserialize/binary/utils.rs index 6bf16655fac..d967417eb63 100644 --- a/src/io/parquet/read/binary/utils.rs +++ b/src/io/parquet/read/deserialize/binary/utils.rs @@ -1,4 +1,6 @@ -use crate::{array::Offset, io::parquet::read::utils::Pushable}; +use crate::array::Offset; + +use super::super::utils::Pushable; /// [`Pushable`] for variable length binary data. 
#[derive(Debug)] diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs similarity index 100% rename from src/io/parquet/read/boolean/basic.rs rename to src/io/parquet/read/deserialize/boolean/basic.rs diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs similarity index 100% rename from src/io/parquet/read/boolean/mod.rs rename to src/io/parquet/read/deserialize/boolean/mod.rs diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs similarity index 100% rename from src/io/parquet/read/boolean/nested.rs rename to src/io/parquet/read/deserialize/boolean/nested.rs diff --git a/src/io/parquet/read/dictionary.rs b/src/io/parquet/read/deserialize/dictionary.rs similarity index 100% rename from src/io/parquet/read/dictionary.rs rename to src/io/parquet/read/deserialize/dictionary.rs diff --git a/src/io/parquet/read/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs similarity index 95% rename from src/io/parquet/read/fixed_size_binary/basic.rs rename to src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index e3e8e593b7f..0aaa08bdf99 100644 --- a/src/io/parquet/read/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -7,19 +7,14 @@ use parquet2::{ }; use crate::{ - array::FixedSizeBinaryArray, - bitmap::MutableBitmap, - datatypes::DataType, - error::Result, - io::parquet::read::{ - utils::{ - dict_indices_decoder, extend_from_decoder, next, not_implemented, split_buffer, - Decoder, MaybeNext, OptionalPageValidity, PageState, - }, - DataPages, - }, + array::FixedSizeBinaryArray, bitmap::MutableBitmap, datatypes::DataType, error::Result, }; +use super::super::utils::{ + dict_indices_decoder, extend_from_decoder, next, not_implemented, split_buffer, Decoder, + MaybeNext, OptionalPageValidity, PageState, +}; +use super::super::DataPages; use super::utils::FixedSizeBinary; struct Optional<'a> { diff --git a/src/io/parquet/read/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs similarity index 100% rename from src/io/parquet/read/fixed_size_binary/dictionary.rs rename to src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs diff --git a/src/io/parquet/read/fixed_size_binary/mod.rs b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs similarity index 100% rename from src/io/parquet/read/fixed_size_binary/mod.rs rename to src/io/parquet/read/deserialize/fixed_size_binary/mod.rs diff --git a/src/io/parquet/read/fixed_size_binary/utils.rs b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs similarity index 96% rename from src/io/parquet/read/fixed_size_binary/utils.rs rename to src/io/parquet/read/deserialize/fixed_size_binary/utils.rs index dcf20b0110f..a4b7d047606 100644 --- a/src/io/parquet/read/fixed_size_binary/utils.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs @@ -1,4 +1,4 @@ -use crate::io::parquet::read::utils::Pushable; +use super::super::utils::Pushable; /// A [`Pushable`] for fixed sized binary data #[derive(Debug)] diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 4e8a6de31c3..a6a2b48dc15 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -1,494 +1,271 @@ //! APIs to read from Parquet format. 
-use std::sync::Arc; - -use parquet2::{ - schema::types::{ - LogicalType, ParquetType, PhysicalType, TimeUnit as ParquetTimeUnit, TimestampType, - }, - types::int96_to_i64_ns, -}; +mod binary; +mod boolean; +mod dictionary; +mod fixed_size_binary; +mod nested_utils; +mod null; +mod primitive; +mod simple; +mod utils; use crate::{ - array::{Array, BinaryArray, DictionaryKey, PrimitiveArray, Utf8Array}, - datatypes::{DataType, IntervalUnit, TimeUnit}, + array::{Array, BinaryArray, ListArray, StructArray, Utf8Array}, + datatypes::{DataType, Field}, error::{ArrowError, Result}, }; -use super::binary; -use super::boolean; -use super::fixed_size_binary; -use super::null; -use super::primitive; -use super::{dyn_iter, iden, op, ArrayIter, DataPages}; +use self::nested_utils::{InitNested, NestedArrayIter, NestedState}; +use simple::page_iter_to_arrays; + +use super::*; + +/// Creates a new iterator of compressed pages. +pub fn get_page_iterator( + column_metadata: &ColumnChunkMetaData, + reader: R, + pages_filter: Option, + buffer: Vec, +) -> Result> { + Ok(_get_page_iterator( + column_metadata, + reader, + pages_filter, + buffer, + )?) +} -pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( - pages: I, - type_: &ParquetType, +fn create_list( data_type: DataType, - chunk_size: usize, -) -> Result> { - use DataType::*; - - let (physical_type, logical_type) = if let ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } = type_ - { - (physical_type, logical_type) - } else { - return Err(ArrowError::InvalidArgumentError( - "page_iter_to_arrays can only be called with a parquet primitive type".into(), - )); - }; - - Ok(match data_type.to_logical_type() { - Null => null::iter_to_arrays(pages, data_type, chunk_size), - Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size)), - UInt8 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i32| x as u8, - ))), - UInt16 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i32| x as u16, - ))), - UInt32 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i32| x as u32, - ))), - Int8 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i32| x as i8, - ))), - Int16 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i32| x as i16, - ))), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden( - primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32), - )), - - Timestamp(time_unit, _) => { - let time_unit = *time_unit; - return timestamp( - pages, - physical_type, - logical_type, + nested: &mut NestedState, + values: Arc, +) -> Result> { + Ok(match data_type { + DataType::List(_) => { + let (offsets, validity) = nested.nested.pop().unwrap().inner(); + + let offsets = offsets.iter().map(|x| *x as i32).collect::>(); + Arc::new(ListArray::::from_data( data_type, - chunk_size, - time_unit, - ); + offsets.into(), + values, + validity, + )) } + DataType::LargeList(_) => { + let (offsets, validity) = nested.nested.pop().unwrap().inner(); - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)), - - Decimal(_, _) => match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i32| x as i128, - ))), - PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i64| x as i128, - ))), - &PhysicalType::FixedLenByteArray(n) if n > 
16 => { - return Err(ArrowError::NotYetImplemented(format!( - "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", - n - ))) - } - &PhysicalType::FixedLenByteArray(n) => { - let n = n as usize; - - let pages = - fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size); - - let pages = pages.map(move |maybe_array| { - let array = maybe_array?; - let values = array - .values() - .chunks_exact(n) - .map(|value: &[u8]| { - // Copy the fixed-size byte value to the start of a 16 byte stack - // allocated buffer, then use an arithmetic right shift to fill in - // MSBs, which accounts for leading 1's in negative (two's complement) - // values. - let mut bytes = [0u8; 16]; - bytes[..n].copy_from_slice(value); - i128::from_be_bytes(bytes) >> (8 * (16 - n)) - }) - .collect::>(); - let validity = array.validity().cloned(); - - Ok(PrimitiveArray::::from_data( - data_type.clone(), - values.into(), - validity, - )) - }); - - let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc)); - - Box::new(arrays) as _ - } - _ => unreachable!(), - }, - - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i64| x as i64, - ))), - UInt64 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: i64| x as u64, - ))), - - Float32 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: f32| x, - ))), - Float64 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - |x: f64| x, - ))), - - Binary => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, - )), - LargeBinary => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, - )), - Utf8 => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, - )), - LargeUtf8 => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, - )), - - Dictionary(key_type, _, _) => { - return match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(pages, physical_type, logical_type, data_type, chunk_size) - }) + Arc::new(ListArray::::from_data( + data_type, offsets, values, validity, + )) } - - other => { + _ => { return Err(ArrowError::NotYetImplemented(format!( - "Reading {:?} from parquet still not implemented", - other + "Read nested datatype {:?}", + data_type ))) } }) } -fn timestamp<'a, I: 'a + DataPages>( - pages: I, - physical_type: &PhysicalType, - logical_type: &Option, - data_type: DataType, - chunk_size: usize, - time_unit: TimeUnit, -) -> Result> { - if physical_type == &PhysicalType::Int96 { - if time_unit == TimeUnit::Nanosecond { - return Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - int96_to_i64_ns, - )))); - } else { - return Err(ArrowError::nyi( - "Can't decode int96 to timestamp other than ns", - )); - } - }; - if physical_type != &PhysicalType::Int64 { - return Err(ArrowError::nyi( - "Can't decode a timestamp from a non-int64 parquet type", - )); - } - - let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x); - - let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. 
})) = logical_type { - unit - } else { - return Ok(dyn_iter(iden(iter))); - }; +struct StructIterator<'a> { + iters: Vec>, + fields: Vec, +} - Ok(match (unit, time_unit) { - (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x * 1_000_000_000)), +impl<'a> StructIterator<'a> { + pub fn new(iters: Vec>, fields: Vec) -> Self { + assert_eq!(iters.len(), fields.len()); + Self { iters, fields } + } +} - (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000_000)), +impl<'a> Iterator for StructIterator<'a> { + type Item = Result<(NestedState, Arc)>; - (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => dyn_iter(iden(iter)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)), + fn next(&mut self) -> Option { + let values = self + .iters + .iter_mut() + .map(|iter| iter.next()) + .collect::>(); - (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000_000)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => dyn_iter(iden(iter)), - }) + if values.iter().any(|x| x.is_none()) { + return None; + } + let values = values + .into_iter() + .map(|x| x.unwrap().map(|x| x.1)) + .collect::>>(); + + match values { + Ok(values) => Some(Ok(( + NestedState::new(vec![]), // todo + Arc::new(StructArray::from_data( + DataType::Struct(self.fields.clone()), + values, + None, + )), + ))), + Err(e) => Some(Err(e)), + } + } } -fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( - pages: I, - physical_type: &PhysicalType, - logical_type: &Option, - data_type: DataType, +fn columns_to_iter_recursive<'a, I: 'a>( + mut columns: Vec, + mut types: Vec<&ParquetType>, + field: Field, + mut init: Vec, chunk_size: usize, - time_unit: TimeUnit, -) -> Result> { - if physical_type == &PhysicalType::Int96 { - if time_unit == TimeUnit::Nanosecond { - return Ok(dyn_iter(primitive::DictIter::::new( - pages, - DataType::Timestamp(TimeUnit::Nanosecond, None), +) -> Result> +where + I: DataPages, +{ + use DataType::*; + if init.len() == 1 && init[0].is_primitive() { + return Ok(Box::new( + page_iter_to_arrays( + columns.pop().unwrap(), + types.pop().unwrap(), + field.data_type, chunk_size, - int96_to_i64_ns, - ))); - } else { - return Err(ArrowError::nyi( - "Can't decode int96 to timestamp other than ns", - )); - } - }; - - let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { - unit - } else { - return Ok(dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x, - ))); - }; + )? 
+ .map(|x| Ok((NestedState::new(vec![]), x?))), + )); + } - Ok(match (unit, time_unit) { - (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000, - )) + Ok(match field.data_type().to_logical_type() { + Boolean => { + types.pop(); + boolean::iter_to_arrays_nested(columns.pop().unwrap(), init.pop().unwrap(), chunk_size) } - (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000_000, - )) - } - (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, + Int16 => { + types.pop(); + primitive::iter_to_arrays_nested( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), chunk_size, - |x: i64| x * 1_000_000_000, - )) + |x: i32| x as i16, + ) } - - (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, + Int64 => { + types.pop(); + primitive::iter_to_arrays_nested( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), chunk_size, |x: i64| x, - )) + ) } - (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, + Utf8 => { + types.pop(); + binary::iter_to_arrays_nested::, _>( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), chunk_size, - |x: i64| x / 1_000, - )) + ) } - (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, + LargeBinary => { + types.pop(); + binary::iter_to_arrays_nested::, _>( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), chunk_size, - |x: i64| x / 1_000_000, - )) + ) } - - (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, + List(inner) => { + let iter = columns_to_iter_recursive( + vec![columns.pop().unwrap()], + types, + inner.as_ref().clone(), + init, chunk_size, - |x: i64| x * 1_000, - )) + )?; + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_list(field.data_type().clone(), &mut nested, array)?; + Ok((nested, array)) + }); + Box::new(iter) as _ } - (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x, - )) - } - (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x / 1_000, - )) + Struct(fields) => { + let columns = fields + .iter() + .rev() + .map(|f| { + columns_to_iter_recursive( + vec![columns.pop().unwrap()], + vec![types.pop().unwrap()], + f.clone(), + vec![init.pop().unwrap()], + chunk_size, + ) + }) + .collect::>>()?; + let columns = columns.into_iter().rev().collect(); + Box::new(StructIterator::new(columns, fields.clone())) } + _ => todo!(), + }) +} - (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x * 1_000_000, - )) +fn field_to_init(field: &Field) -> Vec { + use crate::datatypes::PhysicalType::*; + match field.data_type.to_physical_type() { + Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 + | Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)], + List | FixedSizeList | LargeList => { + let a = 
field.data_type().to_logical_type(); + let inner = if let DataType::List(inner) = a { + field_to_init(inner) + } else if let DataType::LargeList(inner) = a { + field_to_init(inner) + } else if let DataType::FixedSizeList(inner, _) = a { + field_to_init(inner) + } else { + unreachable!() + }; + inner + .into_iter() + .map(|x| InitNested::List(Box::new(x), field.is_nullable)) + .collect() } - (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x * 1_000, - )) + Struct => { + let inner = if let DataType::Struct(fields) = field.data_type.to_logical_type() { + fields.iter().rev().map(field_to_init).collect::>() + } else { + unreachable!() + }; + inner + .into_iter() + .flatten() + .map(|x| InitNested::Struct(Box::new(x), field.is_nullable)) + .collect() } - (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => { - dyn_iter(primitive::DictIter::::new( - pages, - data_type, - chunk_size, - |x: i64| x, - )) - } - }) + _ => todo!(), + } } -fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( - iter: I, - physical_type: &PhysicalType, - logical_type: &Option, - data_type: DataType, +/// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s. +/// +/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`. +pub fn column_iter_to_arrays<'a, I: 'a>( + columns: Vec, + types: Vec<&ParquetType>, + field: Field, chunk_size: usize, -) -> Result> { - use DataType::*; - let values_data_type = if let Dictionary(_, v, _) = &data_type { - v.as_ref() - } else { - panic!() - }; - - Ok(match values_data_type.to_logical_type() { - UInt8 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i32| x as u8, - )), - UInt16 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i32| x as u16, - )), - UInt32 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i32| x as u32, - )), - Int8 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i32| x as i8, - )), - Int16 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i32| x as i16, - )), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { - x as i32 - }), - ), - - Timestamp(time_unit, _) => { - let time_unit = *time_unit; - return timestamp_dict::( - iter, - physical_type, - logical_type, - data_type, - chunk_size, - time_unit, - ); - } - - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), - ), - Float32 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: f32| x, - )), - Float64 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: f64| x, - )), - - Utf8 | Binary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, - )), - LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, - )), - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( - iter, data_type, chunk_size, - )), - other => { - return Err(ArrowError::nyi(format!( - "Reading dictionaries of type {:?}", - other - ))) - } - }) +) -> Result> +where + I: DataPages, +{ + let init = field_to_init(&field); + + Ok(Box::new( + columns_to_iter_recursive(columns, types, field, init, chunk_size)?.map(|x| x.map(|x| x.1)), + )) } diff 
--git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs similarity index 99% rename from src/io/parquet/read/nested_utils.rs rename to src/io/parquet/read/deserialize/nested_utils.rs index 8f9e2d32bf2..178ababd464 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -11,10 +11,8 @@ use crate::{ error::Result, }; -use super::{ - utils::{split_buffer, Decoder, MaybeNext, Pushable}, - DataPages, -}; +use super::super::DataPages; +use super::utils::{split_buffer, Decoder, MaybeNext, Pushable}; /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug + Send + Sync { diff --git a/src/io/parquet/read/null.rs b/src/io/parquet/read/deserialize/null.rs similarity index 95% rename from src/io/parquet/read/null.rs rename to src/io/parquet/read/deserialize/null.rs index 5bfb11d135b..08382ca6be1 100644 --- a/src/io/parquet/read/null.rs +++ b/src/io/parquet/read/deserialize/null.rs @@ -5,8 +5,7 @@ use crate::{ datatypes::DataType, }; -use super::ArrayIter; -use super::DataPages; +use super::super::{ArrayIter, DataPages}; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays<'a, I>(mut iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs similarity index 100% rename from src/io/parquet/read/primitive/basic.rs rename to src/io/parquet/read/deserialize/primitive/basic.rs diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs similarity index 98% rename from src/io/parquet/read/primitive/dictionary.rs rename to src/io/parquet/read/deserialize/primitive/dictionary.rs index 7022d2b121c..bf6e4d4b9b3 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -10,11 +10,11 @@ use crate::{ bitmap::MutableBitmap, datatypes::DataType, error::Result, - io::parquet::read::utils::MaybeNext, types::NativeType, }; use super::super::dictionary::*; +use super::super::utils::MaybeNext; use super::super::DataPages; #[inline] diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs similarity index 100% rename from src/io/parquet/read/primitive/mod.rs rename to src/io/parquet/read/deserialize/primitive/mod.rs diff --git a/src/io/parquet/read/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs similarity index 95% rename from src/io/parquet/read/primitive/nested.rs rename to src/io/parquet/read/deserialize/primitive/nested.rs index 62b00119315..cbfc2ae778c 100644 --- a/src/io/parquet/read/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -7,7 +7,7 @@ use parquet2::{ use crate::{ array::PrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result, - io::parquet::read::utils::MaybeNext, types::NativeType, + types::NativeType, }; use super::super::nested_utils::*; @@ -207,12 +207,12 @@ where &self.decoder, ); match maybe_state { - MaybeNext::Some(Ok((nested, values, validity))) => { + utils::MaybeNext::Some(Ok((nested, values, validity))) => { Some(Ok((nested, finish(&self.data_type, values, validity)))) } - MaybeNext::Some(Err(e)) => Some(Err(e)), - MaybeNext::None => None, - MaybeNext::More => self.next(), + utils::MaybeNext::Some(Err(e)) => Some(Err(e)), + utils::MaybeNext::None => None, + utils::MaybeNext::More => 
self.next(),
         }
     }
 }
diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs
new file mode 100644
index 00000000000..58cc7dbca6f
--- /dev/null
+++ b/src/io/parquet/read/deserialize/simple.rs
@@ -0,0 +1,531 @@
+use std::sync::Arc;
+
+use parquet2::{
+    schema::types::{
+        LogicalType, ParquetType, PhysicalType, TimeUnit as ParquetTimeUnit, TimestampType,
+    },
+    types::int96_to_i64_ns,
+};
+
+use crate::{
+    array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, Utf8Array},
+    datatypes::{DataType, IntervalUnit, TimeUnit},
+    error::{ArrowError, Result},
+    types::NativeType,
+};
+
+use super::super::{ArrayIter, DataPages};
+use super::binary;
+use super::boolean;
+use super::fixed_size_binary;
+use super::null;
+use super::primitive;
+
+/// Converts an iterator of arrays to a trait object returning trait objects
+#[inline]
+fn dyn_iter<'a, A, I>(iter: I) -> ArrayIter<'a>
+where
+    A: Array + 'static,
+    I: Iterator<Item = Result<A>> + Send + Sync + 'a,
+{
+    Box::new(iter.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)))
+}
+
+/// Converts an iterator of [MutablePrimitiveArray] into an iterator of [PrimitiveArray]
+#[inline]
+fn iden<T, I>(iter: I) -> impl Iterator<Item = Result<PrimitiveArray<T>>>
+where
+    T: NativeType,
+    I: Iterator<Item = Result<MutablePrimitiveArray<T>>>,
+{
+    iter.map(|x| x.map(|x| x.into()))
+}
+
+#[inline]
+fn op<T, I, F>(iter: I, op: F) -> impl Iterator<Item = Result<PrimitiveArray<T>>>
+where
+    T: NativeType,
+    I: Iterator<Item = Result<MutablePrimitiveArray<T>>>,
+    F: Fn(T) -> T + Copy,
+{
+    iter.map(move |x| {
+        x.map(move |mut x| {
+            x.values_mut_slice().iter_mut().for_each(|x| *x = op(*x));
+            x.into()
+        })
+    })
+}
+
+/// An iterator adapter that maps an iterator of [`DataPages`] into an iterator of [`Array`]s
+/// of [`DataType`] `data_type`, in chunks of up to `chunk_size` elements.
+pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
+    pages: I,
+    type_: &ParquetType,
+    data_type: DataType,
+    chunk_size: usize,
+) -> Result<ArrayIter<'a>> {
+    use DataType::*;
+
+    let (physical_type, logical_type) = if let ParquetType::PrimitiveType {
+        physical_type,
+        logical_type,
+        ..
+    } = type_
+    {
+        (physical_type, logical_type)
+    } else {
+        return Err(ArrowError::InvalidArgumentError(
+            "page_iter_to_arrays can only be called with a parquet primitive type".into(),
+        ));
+    };
+
+    Ok(match data_type.to_logical_type() {
+        Null => null::iter_to_arrays(pages, data_type, chunk_size),
+        Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size)),
+        UInt8 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i32| x as u8,
+        ))),
+        UInt16 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i32| x as u16,
+        ))),
+        UInt32 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i32| x as u32,
+        ))),
+        Int8 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i32| x as i8,
+        ))),
+        Int16 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i32| x as i16,
+        ))),
+        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden(
+            primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32),
+        )),
+
+        Timestamp(time_unit, _) => {
+            let time_unit = *time_unit;
+            return timestamp(
+                pages,
+                physical_type,
+                logical_type,
+                data_type,
+                chunk_size,
+                time_unit,
+            );
+        }
+
+        FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)),
+
+        Decimal(_, _) => match physical_type {
+            PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i32| x as i128,
+            ))),
+            PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x as i128,
+            ))),
+            &PhysicalType::FixedLenByteArray(n) if n > 16 => {
+                return Err(ArrowError::NotYetImplemented(format!(
+                    "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
+                    n
+                )))
+            }
+            &PhysicalType::FixedLenByteArray(n) => {
+                let n = n as usize;
+
+                let pages =
+                    fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
+
+                let pages = pages.map(move |maybe_array| {
+                    let array = maybe_array?;
+                    let values = array
+                        .values()
+                        .chunks_exact(n)
+                        .map(|value: &[u8]| {
+                            // Copy the fixed-size byte value to the start of a 16 byte stack
+                            // allocated buffer, then use an arithmetic right shift to fill in
+                            // MSBs, which accounts for leading 1's in negative (two's complement)
+                            // values.
+                            let mut bytes = [0u8; 16];
+                            bytes[..n].copy_from_slice(value);
+                            i128::from_be_bytes(bytes) >> (8 * (16 - n))
+                        })
+                        .collect::<Vec<_>>();
+                    let validity = array.validity().cloned();
+
+                    Ok(PrimitiveArray::<i128>::from_data(
+                        data_type.clone(),
+                        values.into(),
+                        validity,
+                    ))
+                });
+
+                let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>));
+
+                Box::new(arrays) as _
+            }
+            _ => unreachable!(),
+        },
+
+        // INT64
+        Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i64| x as i64,
+        ))),
+        UInt64 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i64| x as u64,
+        ))),
+
+        Float32 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: f32| x,
+        ))),
+        Float64 => dyn_iter(iden(primitive::Iter::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: f64| x,
+        ))),
+
+        Binary => dyn_iter(binary::Iter::<i32, BinaryArray<i32>, _>::new(
+            pages, data_type, chunk_size,
+        )),
+        LargeBinary => dyn_iter(binary::Iter::<i64, BinaryArray<i64>, _>::new(
+            pages, data_type, chunk_size,
+        )),
+        Utf8 => dyn_iter(binary::Iter::<i32, Utf8Array<i32>, _>::new(
+            pages, data_type, chunk_size,
+        )),
+        LargeUtf8 => dyn_iter(binary::Iter::<i64, Utf8Array<i64>, _>::new(
+            pages, data_type, chunk_size,
+        )),
+
+        Dictionary(key_type, _, _) => {
+            return match_integer_type!(key_type, |$K| {
+                dict_read::<$K, _>(pages, physical_type, logical_type, data_type, chunk_size)
+            })
+        }
+
+        other => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "Reading {:?} from parquet still not implemented",
+                other
+            )))
+        }
+    })
+}
+
+fn timestamp<'a, I: 'a + DataPages>(
+    pages: I,
+    physical_type: &PhysicalType,
+    logical_type: &Option<LogicalType>,
+    data_type: DataType,
+    chunk_size: usize,
+    time_unit: TimeUnit,
+) -> Result<ArrayIter<'a>> {
+    if physical_type == &PhysicalType::Int96 {
+        if time_unit == TimeUnit::Nanosecond {
+            return Ok(dyn_iter(iden(primitive::Iter::new(
+                pages,
+                data_type,
+                chunk_size,
+                int96_to_i64_ns,
+            ))));
+        } else {
+            return Err(ArrowError::nyi(
+                "Can't decode int96 to timestamp other than ns",
+            ));
+        }
+    };
+    if physical_type != &PhysicalType::Int64 {
+        return Err(ArrowError::nyi(
+            "Can't decode a timestamp from a non-int64 parquet type",
+        ));
+    }
+
+    let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x);
+
+    let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, ..
})) = logical_type {
+        unit
+    } else {
+        return Ok(dyn_iter(iden(iter)));
+    };
+
+    Ok(match (unit, time_unit) {
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)),
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)),
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000_000)),
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)),
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)),
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000_000)),
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)),
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => dyn_iter(iden(iter)),
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)),
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000_000)),
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)),
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => dyn_iter(iden(iter)),
+    })
+}
+
+fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
+    pages: I,
+    physical_type: &PhysicalType,
+    logical_type: &Option<LogicalType>,
+    data_type: DataType,
+    chunk_size: usize,
+    time_unit: TimeUnit,
+) -> Result<ArrayIter<'a>> {
+    if physical_type == &PhysicalType::Int96 {
+        if time_unit == TimeUnit::Nanosecond {
+            return Ok(dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                chunk_size,
+                int96_to_i64_ns,
+            )));
+        } else {
+            return Err(ArrowError::nyi(
+                "Can't decode int96 to timestamp other than ns",
+            ));
+        }
+    };
+
+    let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, ..
})) = logical_type {
+        unit
+    } else {
+        return Ok(dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+            pages,
+            data_type,
+            chunk_size,
+            |x: i64| x,
+        )));
+    };
+
+    Ok(match (unit, time_unit) {
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x / 1_000,
+            ))
+        }
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x / 1_000_000,
+            ))
+        }
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x / 1_000_000_000,
+            ))
+        }
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x,
+            ))
+        }
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x / 1_000,
+            ))
+        }
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x / 1_000_000,
+            ))
+        }
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x * 1_000,
+            ))
+        }
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x,
+            ))
+        }
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x / 1_000,
+            ))
+        }
+
+        (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x * 1_000_000,
+            ))
+        }
+        (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x * 1_000,
+            ))
+        }
+        (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => {
+            dyn_iter(primitive::DictIter::<K, i64, _, _>::new(
+                pages,
+                data_type,
+                chunk_size,
+                |x: i64| x,
+            ))
+        }
+    })
+}
+
+fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
+    iter: I,
+    physical_type: &PhysicalType,
+    logical_type: &Option<LogicalType>,
+    data_type: DataType,
+    chunk_size: usize,
+) -> Result<ArrayIter<'a>> {
+    use DataType::*;
+    let values_data_type = if let Dictionary(_, v, _) = &data_type {
+        v.as_ref()
+    } else {
+        panic!()
+    };
+
+    Ok(match values_data_type.to_logical_type() {
+        UInt8 => dyn_iter(primitive::DictIter::<K, u8, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: i32| x as u8,
+        )),
+        UInt16 => dyn_iter(primitive::DictIter::<K, u16, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: i32| x as u16,
+        )),
+        UInt32 => dyn_iter(primitive::DictIter::<K, u32, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: i32| x as u32,
+        )),
+        Int8 => dyn_iter(primitive::DictIter::<K, i8, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: i32| x as i8,
+        )),
+        Int16 => dyn_iter(primitive::DictIter::<K, i16, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: i32| x as i16,
+        )),
+        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(
+            primitive::DictIter::<K, i32, _, _>::new(iter, data_type, chunk_size, |x: i32| {
+                x as i32
+            }),
+        ),
+
+        Timestamp(time_unit, _) => {
+            let time_unit = *time_unit;
+            return timestamp_dict::<K, _>(
+                iter,
+                physical_type,
+                logical_type,
+                data_type,
+                chunk_size,
+                time_unit,
+            );
+        }
+
+        Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(
+            primitive::DictIter::<K, i64, _, _>::new(iter, data_type, chunk_size, |x: i64| x),
+        ),
+        Float32
=> dyn_iter(primitive::DictIter::<K, f32, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: f32| x,
+        )),
+        Float64 => dyn_iter(primitive::DictIter::<K, f64, _, _>::new(
+            iter,
+            data_type,
+            chunk_size,
+            |x: f64| x,
+        )),
+
+        Utf8 | Binary => dyn_iter(binary::DictIter::<K, i32, _>::new(
+            iter, data_type, chunk_size,
+        )),
+        LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::<K, i64, _>::new(
+            iter, data_type, chunk_size,
+        )),
+        FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::<K, _>::new(
+            iter, data_type, chunk_size,
+        )),
+        other => {
+            return Err(ArrowError::nyi(format!(
+                "Reading dictionaries of type {:?}",
+                other
+            )))
+        }
+    })
+}
diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/deserialize/utils.rs
similarity index 99%
rename from src/io/parquet/read/utils.rs
rename to src/io/parquet/read/deserialize/utils.rs
index b86c8da58f3..c9a9604992c 100644
--- a/src/io/parquet/read/utils.rs
+++ b/src/io/parquet/read/deserialize/utils.rs
@@ -9,7 +9,7 @@ use crate::bitmap::utils::BitmapIter;
 use crate::bitmap::MutableBitmap;
 use crate::error::ArrowError;
 
-use super::DataPages;
+use super::super::DataPages;
 
 #[derive(Debug)]
 pub struct BinaryIter<'a> {
diff --git a/src/io/parquet/read/file.rs b/src/io/parquet/read/file.rs
index 126253e6b2e..8251392c6c0 100644
--- a/src/io/parquet/read/file.rs
+++ b/src/io/parquet/read/file.rs
@@ -14,14 +14,13 @@ use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, Row
 
 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
 
-/// An iterator of [`Chunk`] coming from row groups of a paquet file.
+/// An iterator of [`Chunk`]s coming from row groups of a parquet file.
 ///
-/// This can be thought of flatten chain of [`Iterator`] - each row group is sequentially
+/// This can be thought of as a flattened chain of [`Iterator`]s - each row group is sequentially
 /// mapped to an [`Iterator`] and each iterator is iterated upon until either the limit
 /// or the last iterator ends.
-///
 /// # Implementation
-/// Note that because
+/// This iterator mixes IO-bounded and CPU-bounded operations.
 pub struct FileReader<R: Read + Seek> {
     row_groups: RowGroupReader<R>,
     metadata: FileMetaData,
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 9539652a7de..14bcbef3c02 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -1,12 +1,15 @@
 //! APIs to read from Parquet format.
 #![allow(clippy::type_complexity)]
 
-use std::{
-    io::{Read, Seek},
-    sync::Arc,
-};
+mod deserialize;
+mod file;
+mod row_group;
+pub mod schema;
+pub mod statistics;
 
 use futures::{AsyncRead, AsyncSeek};
+
+// re-exports of parquet2's relevant APIs
 pub use parquet2::{
     error::ParquetError,
     fallible_streaming_iterator,
@@ -26,97 +29,32 @@ pub use parquet2::{
     FallibleStreamingIterator,
 };
 
-use crate::{
-    array::{
-        Array, BinaryArray, ListArray, MutablePrimitiveArray, PrimitiveArray, StructArray,
-        Utf8Array,
-    },
-    datatypes::{DataType, Field},
-    error::{ArrowError, Result},
-    types::NativeType,
-};
-
-mod binary;
-mod boolean;
-mod deserialize;
-mod dictionary;
-mod file;
-mod fixed_size_binary;
-mod nested_utils;
-mod null;
-mod primitive;
-mod row_group;
-pub mod schema;
-pub mod statistics;
-mod utils;
-
+pub use deserialize::{column_iter_to_arrays, get_page_iterator};
 pub use file::{FileReader, RowGroupReader};
 pub use row_group::*;
 pub(crate) use schema::is_type_nullable;
 pub use schema::{infer_schema, FileMetaData};
 
-use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
-use deserialize::page_iter_to_arrays;
+use std::{
+    io::{Read, Seek},
+    sync::Arc,
+};
+
+use crate::{array::Array, error::Result};
 
 /// Trait describing a [`FallibleStreamingIterator`] of [`DataPage`]
 pub trait DataPages:
     FallibleStreamingIterator<Item = DataPage, Error = ParquetError> + Send + Sync
 {
 }
+
 impl<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError> + Send + Sync> DataPages
     for I
 {
 }
 
-/// Converts an iterator of arrays to a trait object returning trait objects
-#[inline]
-fn dyn_iter<'a, A, I>(iter: I) -> ArrayIter<'a>
-where
-    A: Array + 'static,
-    I: Iterator<Item = Result<A>> + Send + Sync + 'a,
-{
-    Box::new(iter.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)))
-}
-
-/// Converts an iterator of [MutablePrimitiveArray] into an iterator of [PrimitiveArray]
-#[inline]
-fn iden<T, I>(iter: I) -> impl Iterator<Item = Result<PrimitiveArray<T>>>
-where
-    T: NativeType,
-    I: Iterator<Item = Result<MutablePrimitiveArray<T>>>,
-{
-    iter.map(|x| x.map(|x| x.into()))
-}
-
-#[inline]
-fn op<T, I, F>(iter: I, op: F) -> impl Iterator<Item = Result<PrimitiveArray<T>>>
-where
-    T: NativeType,
-    I: Iterator<Item = Result<MutablePrimitiveArray<T>>>,
-    F: Fn(T) -> T + Copy,
-{
-    iter.map(move |x| {
-        x.map(move |mut x| {
-            x.values_mut_slice().iter_mut().for_each(|x| *x = op(*x));
-            x.into()
-        })
-    })
-}
-
-/// Creates a new iterator of compressed pages.
-pub fn get_page_iterator<R: Read + Seek>(
-    column_metadata: &ColumnChunkMetaData,
-    reader: R,
-    pages_filter: Option<PageFilter>,
-    buffer: Vec<u8>,
-) -> Result<PageReader<R>> {
-    Ok(_get_page_iterator(
-        column_metadata,
-        reader,
-        pages_filter,
-        buffer,
-    )?)
-}
+/// Type def for a sharable, boxed dyn [`Iterator`] of arrays
+pub type ArrayIter<'a> = Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + Send + Sync + 'a>;
 
 /// Reads parquet's metadata synchronously.
 pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
@@ -129,239 +67,3 @@ pub async fn read_metadata_async(
 ) -> Result<FileMetaData> {
     Ok(_read_metadata_async(reader).await?)
} - -fn create_list( - data_type: DataType, - nested: &mut NestedState, - values: Arc, -) -> Result> { - Ok(match data_type { - DataType::List(_) => { - let (offsets, validity) = nested.nested.pop().unwrap().inner(); - - let offsets = offsets.iter().map(|x| *x as i32).collect::>(); - Arc::new(ListArray::::from_data( - data_type, - offsets.into(), - values, - validity, - )) - } - DataType::LargeList(_) => { - let (offsets, validity) = nested.nested.pop().unwrap().inner(); - - Arc::new(ListArray::::from_data( - data_type, offsets, values, validity, - )) - } - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }) -} - -struct StructIterator<'a> { - iters: Vec>, - fields: Vec, -} - -impl<'a> StructIterator<'a> { - pub fn new(iters: Vec>, fields: Vec) -> Self { - assert_eq!(iters.len(), fields.len()); - Self { iters, fields } - } -} - -impl<'a> Iterator for StructIterator<'a> { - type Item = Result<(NestedState, Arc)>; - - fn next(&mut self) -> Option { - let values = self - .iters - .iter_mut() - .map(|iter| iter.next()) - .collect::>(); - - if values.iter().any(|x| x.is_none()) { - return None; - } - let values = values - .into_iter() - .map(|x| x.unwrap().map(|x| x.1)) - .collect::>>(); - - match values { - Ok(values) => Some(Ok(( - NestedState::new(vec![]), // todo - Arc::new(StructArray::from_data( - DataType::Struct(self.fields.clone()), - values, - None, - )), - ))), - Err(e) => Some(Err(e)), - } - } -} - -fn columns_to_iter_recursive<'a, I: 'a>( - mut columns: Vec, - mut types: Vec<&ParquetType>, - field: Field, - mut init: Vec, - chunk_size: usize, -) -> Result> -where - I: DataPages, -{ - use DataType::*; - if init.len() == 1 && init[0].is_primitive() { - return Ok(Box::new( - page_iter_to_arrays( - columns.pop().unwrap(), - types.pop().unwrap(), - field.data_type, - chunk_size, - )? 
- .map(|x| Ok((NestedState::new(vec![]), x?))), - )); - } - - Ok(match field.data_type().to_logical_type() { - Boolean => { - types.pop(); - boolean::iter_to_arrays_nested(columns.pop().unwrap(), init.pop().unwrap(), chunk_size) - } - Int16 => { - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init.pop().unwrap(), - field.data_type().clone(), - chunk_size, - |x: i32| x as i16, - ) - } - Int64 => { - types.pop(); - primitive::iter_to_arrays_nested( - columns.pop().unwrap(), - init.pop().unwrap(), - field.data_type().clone(), - chunk_size, - |x: i64| x, - ) - } - Utf8 => { - types.pop(); - binary::iter_to_arrays_nested::, _>( - columns.pop().unwrap(), - init.pop().unwrap(), - field.data_type().clone(), - chunk_size, - ) - } - LargeBinary => { - types.pop(); - binary::iter_to_arrays_nested::, _>( - columns.pop().unwrap(), - init.pop().unwrap(), - field.data_type().clone(), - chunk_size, - ) - } - List(inner) => { - let iter = columns_to_iter_recursive( - vec![columns.pop().unwrap()], - types, - inner.as_ref().clone(), - init, - chunk_size, - )?; - let iter = iter.map(move |x| { - let (mut nested, array) = x?; - let array = create_list(field.data_type().clone(), &mut nested, array)?; - Ok((nested, array)) - }); - Box::new(iter) as _ - } - Struct(fields) => { - let columns = fields - .iter() - .rev() - .map(|f| { - columns_to_iter_recursive( - vec![columns.pop().unwrap()], - vec![types.pop().unwrap()], - f.clone(), - vec![init.pop().unwrap()], - chunk_size, - ) - }) - .collect::>>()?; - let columns = columns.into_iter().rev().collect(); - Box::new(StructIterator::new(columns, fields.clone())) - } - _ => todo!(), - }) -} - -fn field_to_init(field: &Field) -> Vec { - use crate::datatypes::PhysicalType::*; - match field.data_type.to_physical_type() { - Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)], - List | FixedSizeList | LargeList => { - let a = field.data_type().to_logical_type(); - let inner = if let DataType::List(inner) = a { - field_to_init(inner) - } else if let DataType::LargeList(inner) = a { - field_to_init(inner) - } else if let DataType::FixedSizeList(inner, _) = a { - field_to_init(inner) - } else { - unreachable!() - }; - inner - .into_iter() - .map(|x| InitNested::List(Box::new(x), field.is_nullable)) - .collect() - } - Struct => { - let inner = if let DataType::Struct(fields) = field.data_type.to_logical_type() { - fields.iter().rev().map(field_to_init).collect::>() - } else { - unreachable!() - }; - inner - .into_iter() - .flatten() - .map(|x| InitNested::Struct(Box::new(x), field.is_nullable)) - .collect() - } - _ => todo!(), - } -} - -/// Returns an iterator of [`Array`] built from an iterator of column chunks. 
-pub fn column_iter_to_arrays<'a, I: 'static>(
-    columns: Vec<I>,
-    types: Vec<&ParquetType>,
-    field: Field,
-    chunk_size: usize,
-) -> Result<ArrayIter<'a>>
-where
-    I: DataPages,
-{
-    let init = field_to_init(&field);
-
-    Ok(Box::new(
-        columns_to_iter_recursive(columns, types, field, init, chunk_size)?.map(|x| x.map(|x| x.1)),
-    ))
-}
-
-/// Type def for a sharable, boxed dyn [`Iterator`] of arrays
-pub type ArrayIter<'a> = Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + Send + Sync + 'a>;
diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs
index 587544b6889..f83a65eadbe 100644
--- a/src/io/parquet/read/row_group.rs
+++ b/src/io/parquet/read/row_group.rs
@@ -87,7 +87,7 @@ impl Iterator for RowGroupDeserializer {
     }
 }
 
-/// Returns all the parquet columns associated to `field_name`.
+/// Returns all [`ColumnChunkMetaData`] associated with `field_name`.
 /// For non-nested parquet types, this returns a single column
 pub(super) fn get_field_columns<'a>(
     columns: &'a [ColumnChunkMetaData],
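
Note on usage: after this refactor the public surface of `io::parquet::read` is `read_metadata`, `infer_schema`, `get_page_iterator` and `column_iter_to_arrays`, plus the parquet2 re-exports. Below is a minimal sketch of how they compose for a single column of a single row group. The `BasicDecompressor` wrapper and the `descriptor().type_()` accessor are assumptions about the parquet2 re-exports at this version, not something this diff pins down:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;
use arrow2::io::parquet::read::{
    column_iter_to_arrays, get_page_iterator, infer_schema, read_metadata, BasicDecompressor,
};

fn main() -> Result<()> {
    let mut file = File::open("data.parquet")?;

    // IO-bounded: fetch the file metadata and derive the arrow schema
    let metadata = read_metadata(&mut file)?;
    let schema = infer_schema(&metadata)?;

    // first field of the first row group, assuming a flat (non-nested) column
    let field = schema.fields[0].clone();
    let column = &metadata.row_groups[0].columns()[0];

    // IO-bounded: an iterator of compressed pages for this column chunk
    let pages = get_page_iterator(column, &mut file, None, vec![])?;
    // CPU-bounded: decompress each page as it is advanced (assumed re-export)
    let pages = BasicDecompressor::new(pages, vec![]);

    // assumption: accessor returning this column's `&ParquetType`; the exact
    // parquet2 method may differ
    let type_ = column.descriptor().type_();

    // one page iterator and one parquet type per primitive column backing `field`
    let chunks: Vec<Arc<dyn Array>> =
        column_iter_to_arrays(vec![pages], vec![type_], field, 1024)?.collect::<Result<_>>()?;
    println!("decoded {} chunk(s) of at most 1024 rows each", chunks.len());
    Ok(())
}
```

The `vec![...]` arguments mirror the nested-aware signature: `columns_to_iter_recursive` pops one page iterator and one `ParquetType` per primitive leaf, so a struct or list field passes one entry per leaf column.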