From 485ce5668808d0d22b4cf7055d21cb0f5957c76d Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 4 Feb 2022 21:03:30 +0000 Subject: [PATCH 1/2] Simpler --- src/io/parquet/read/binary/basic.rs | 6 +- src/io/parquet/read/binary/dictionary.rs | 26 +- src/io/parquet/read/binary/mod.rs | 18 +- src/io/parquet/read/boolean/basic.rs | 6 +- src/io/parquet/read/boolean/mod.rs | 15 +- src/io/parquet/read/deserialize/mod.rs | 397 ++++++++++++++++++ .../parquet/read/fixed_size_binary/basic.rs | 6 +- .../read/fixed_size_binary/dictionary.rs | 23 +- src/io/parquet/read/fixed_size_binary/mod.rs | 4 +- src/io/parquet/read/mod.rs | 370 +--------------- src/io/parquet/read/primitive/dictionary.rs | 31 +- src/io/parquet/read/primitive/mod.rs | 2 +- 12 files changed, 433 insertions(+), 471 deletions(-) create mode 100644 src/io/parquet/read/deserialize/mod.rs diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index ce6be424cc7..e53dab3b07d 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -277,7 +277,7 @@ pub(super) fn finish>( ) } -pub struct BinaryArrayIterator, I: DataPages> { +pub struct Iter, I: DataPages> { iter: I, data_type: DataType, items: VecDeque<(Binary, MutableBitmap)>, @@ -285,7 +285,7 @@ pub struct BinaryArrayIterator, I: DataPages> phantom_a: std::marker::PhantomData, } -impl, I: DataPages> BinaryArrayIterator { +impl, I: DataPages> Iter { pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { Self { iter, @@ -297,7 +297,7 @@ impl, I: DataPages> BinaryArrayIterator, I: DataPages> Iterator for BinaryArrayIterator { +impl, I: DataPages> Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index c47ed4c4b73..71e7d5a4b47 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -3,9 +3,7 @@ use std::{collections::VecDeque, sync::Arc}; use parquet2::page::BinaryPageDict; use crate::{ - array::{ - Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array, - }, + array::{BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array}, bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::{ArrowError, Result}, @@ -14,12 +12,11 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; use super::super::utils::Decoder; -use super::super::ArrayIter; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation #[derive(Debug)] -pub struct ArrayIterator +pub struct DictIter where I: DataPages, O: Offset, @@ -33,13 +30,13 @@ where phantom: std::marker::PhantomData, } -impl ArrayIterator +impl DictIter where K: DictionaryKey, O: Offset, I: DataPages, { - fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { + pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { let data_type = match data_type { DataType::Dictionary(_, values, _) => values.as_ref().clone(), _ => unreachable!(), @@ -55,7 +52,7 @@ where } } -impl Iterator for ArrayIterator +impl Iterator for DictIter where I: DataPages, O: Offset, @@ -162,16 +159,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, K, O, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> -where - I: 'a + DataPages, - O: Offset, - K: DictionaryKey, -{ - Box::new( - ArrayIterator::::new(iter, data_type, chunk_size) - .map(|x| x.map(|x| Arc::new(x) as Arc)), - ) -} diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 421328d66b2..7c43154073c 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -3,8 +3,6 @@ mod dictionary; mod nested; mod utils; -pub use dictionary::iter_to_arrays as iter_to_dict_arrays; - use std::sync::Arc; use crate::{ @@ -14,25 +12,13 @@ use crate::{ use self::basic::TraitBinaryArray; use self::nested::ArrayIterator; -use super::ArrayIter; use super::{ nested_utils::{InitNested, NestedArrayIter}, DataPages, }; -use basic::BinaryArrayIterator; -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, O, A, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> -where - I: 'a + DataPages, - A: TraitBinaryArray, - O: Offset, -{ - Box::new( - BinaryArrayIterator::::new(iter, data_type, chunk_size) - .map(|x| x.map(|x| Arc::new(x) as Arc)), - ) -} +pub use basic::Iter; +pub use dictionary::DictIter; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, O, A, I>( diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 6fee13c3a0c..8172356411b 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -140,14 +140,14 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct BooleanArrayIterator { +pub struct Iter { iter: I, data_type: DataType, items: VecDeque<(MutableBitmap, MutableBitmap)>, chunk_size: usize, } -impl BooleanArrayIterator { +impl Iter { pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { Self { iter, @@ -158,7 +158,7 @@ impl BooleanArrayIterator { } } -impl Iterator for BooleanArrayIterator { +impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index 42bd4a8d2b9..61aeeb97757 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -3,26 +3,15 @@ mod nested; use std::sync::Arc; -use crate::{array::Array, datatypes::DataType}; +use crate::array::Array; -use self::basic::BooleanArrayIterator; use self::nested::ArrayIterator; -use super::ArrayIter; use super::{ nested_utils::{InitNested, NestedArrayIter}, DataPages, }; -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, I: 'a>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> -where - I: DataPages, -{ - Box::new( - BooleanArrayIterator::new(iter, data_type, chunk_size) - .map(|x| x.map(|x| Arc::new(x) as Arc)), - ) -} +pub use self::basic::Iter; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I: 'a>( diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs new file mode 100644 index 00000000000..a310980e617 --- /dev/null +++ b/src/io/parquet/read/deserialize/mod.rs @@ -0,0 +1,397 @@ +//! APIs to read from Parquet format. +use std::sync::Arc; + +use parquet2::{ + schema::types::{ + LogicalType, ParquetType, PhysicalType, TimeUnit as ParquetTimeUnit, TimestampType, + }, + types::int96_to_i64_ns, +}; + +use crate::{ + array::{Array, BinaryArray, DictionaryKey, PrimitiveArray, Utf8Array}, + datatypes::{DataType, IntervalUnit, TimeUnit}, + error::{ArrowError, Result}, + io::parquet::read::primitive::read_item, +}; + +use super::binary; +use super::boolean; +use super::fixed_size_binary; +use super::null; +use super::primitive; +use super::{dyn_iter, iden, op, ArrayIter, DataPages}; + +pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( + pages: I, + type_: &ParquetType, + data_type: DataType, + chunk_size: usize, +) -> Result> { + use DataType::*; + + Ok(match data_type.to_logical_type() { + Null => null::iter_to_arrays(pages, data_type, chunk_size), + Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size)), + UInt8 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as u8, + ))), + UInt16 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as u16, + ))), + UInt32 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as u32, + ))), + Int8 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as i8, + ))), + Int16 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as i16, + ))), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden( + primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32), + )), + + Timestamp(time_unit, None) => { + let time_unit = *time_unit; + return timestamp(pages, type_, data_type, chunk_size, time_unit); + } + + FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)), + + Decimal(_, _) => match type_ { + ParquetType::PrimitiveType { physical_type, .. } => match physical_type { + PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as i128, + ))), + PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i64| x as i128, + ))), + &PhysicalType::FixedLenByteArray(n) if n > 16 => { + return Err(ArrowError::NotYetImplemented(format!( + "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", + n + ))) + } + &PhysicalType::FixedLenByteArray(n) => { + let n = n as usize; + + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + chunk_size, + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(n) + .map(|value: &[u8]| { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. + let mut bytes = [0u8; 16]; + bytes[..n].copy_from_slice(value); + i128::from_be_bytes(bytes) >> (8 * (16 - n)) + }) + .collect::>(); + let validity = array.validity().cloned(); + + Ok(PrimitiveArray::::from_data( + data_type.clone(), + values.into(), + validity, + )) + }); + + let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc)); + + Box::new(arrays) as _ + } + _ => unreachable!(), + }, + _ => unreachable!(), + }, + + // INT64 + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_iter(iden( + primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64), + )), + UInt64 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i64| x as u64, + ))), + + Float32 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: f32| x, + ))), + Float64 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: f64| x, + ))), + + Binary => dyn_iter(binary::Iter::, _>::new( + pages, data_type, chunk_size, + )), + LargeBinary => dyn_iter(binary::Iter::, _>::new( + pages, data_type, chunk_size, + )), + Utf8 => dyn_iter(binary::Iter::, _>::new( + pages, data_type, chunk_size, + )), + LargeUtf8 => dyn_iter(binary::Iter::, _>::new( + pages, data_type, chunk_size, + )), + + Dictionary(key_type, _, _) => { + return match_integer_type!(key_type, |$K| { + dict_read::<$K, _>(pages, type_, data_type, chunk_size) + }) + } + + other => { + return Err(ArrowError::NotYetImplemented(format!( + "Reading {:?} from parquet still not implemented", + other + ))) + } + }) +} + +fn timestamp<'a, I: 'a + DataPages>( + pages: I, + type_: &ParquetType, + data_type: DataType, + chunk_size: usize, + time_unit: TimeUnit, +) -> Result> { + let (physical_type, logical_type) = if let ParquetType::PrimitiveType { + physical_type, + logical_type, + .. + } = type_ + { + (physical_type, logical_type) + } else { + unreachable!() + }; + + if physical_type == &PhysicalType::Int96 { + if time_unit == TimeUnit::Nanosecond { + return Ok(dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + int96_to_i64_ns, + )))); + } else { + return Err(ArrowError::nyi( + "Can't decode int96 to timestamp other than ns", + )); + } + }; + if physical_type != &PhysicalType::Int64 { + return Err(ArrowError::nyi( + "Can't decode a timestamp from a non-int64 parquet type", + )); + } + + let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x); + + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { + unit + } else { + return Ok(dyn_iter(iden(iter))); + }; + + Ok(match (unit, time_unit) { + (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x * 1_000_000_000)), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000_000)), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => dyn_iter(iden(iter)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)), + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000_000)), + (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)), + (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => dyn_iter(iden(iter)), + }) +} + +fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( + iter: I, + type_: &ParquetType, + data_type: DataType, + chunk_size: usize, +) -> Result> { + use DataType::*; + let values_data_type = if let Dictionary(_, v, _) = &data_type { + v.as_ref() + } else { + panic!() + }; + + Ok(match values_data_type.to_logical_type() { + UInt8 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u8, + )), + UInt16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u16, + )), + UInt32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u32, + )), + Int8 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i8, + )), + Int16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i16, + )), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { + x as i32 + }), + ), + + Timestamp(TimeUnit::Nanosecond, None) => match type_ { + ParquetType::PrimitiveType { + physical_type, + logical_type, + .. + } => match (physical_type, logical_type) { + (PhysicalType::Int96, _) => dyn_iter(primitive::DictIter::::new( + iter, + DataType::Timestamp(TimeUnit::Nanosecond, None), + chunk_size, + int96_to_i64_ns, + )), + (_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => match unit { + ParquetTimeUnit::MILLIS(_) => { + dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i64| x * 1_000_000, + )) + } + ParquetTimeUnit::MICROS(_) => { + dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i64| x * 1_000, + )) + } + ParquetTimeUnit::NANOS(_) => { + dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i64| x, + )) + } + }, + _ => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i64| x, + )), + }, + _ => unreachable!(), + }, + + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), + ), + Float32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f32| x, + )), + Float64 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f64| x, + )), + + Utf8 | Binary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( + iter, data_type, chunk_size, + )), + other => { + return Err(ArrowError::nyi(format!( + "Reading dictionaries of type {:?}", + other + ))) + } + }) +} diff --git a/src/io/parquet/read/fixed_size_binary/basic.rs b/src/io/parquet/read/fixed_size_binary/basic.rs index 90eda3a67a1..0c7801ac5fa 100644 --- a/src/io/parquet/read/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/fixed_size_binary/basic.rs @@ -214,7 +214,7 @@ fn finish( FixedSizeBinaryArray::from_data(data_type.clone(), values.values.into(), validity.into()) } -pub struct BinaryArrayIterator { +pub struct Iter { iter: I, data_type: DataType, size: usize, @@ -222,7 +222,7 @@ pub struct BinaryArrayIterator { chunk_size: usize, } -impl BinaryArrayIterator { +impl Iter { pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { let size = FixedSizeBinaryArray::get_size(&data_type); Self { @@ -235,7 +235,7 @@ impl BinaryArrayIterator { } } -impl Iterator for BinaryArrayIterator { +impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { diff --git a/src/io/parquet/read/fixed_size_binary/dictionary.rs b/src/io/parquet/read/fixed_size_binary/dictionary.rs index b1dd9e4ef44..94dbf71cebc 100644 --- a/src/io/parquet/read/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/fixed_size_binary/dictionary.rs @@ -3,7 +3,7 @@ use std::{collections::VecDeque, sync::Arc}; use parquet2::page::FixedLenByteArrayPageDict; use crate::{ - array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray}, + array::{DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray}, bitmap::MutableBitmap, datatypes::DataType, error::{ArrowError, Result}, @@ -12,12 +12,11 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; use super::super::utils::Decoder; -use super::super::ArrayIter; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation #[derive(Debug)] -pub struct ArrayIterator +pub struct DictIter where I: DataPages, K: DictionaryKey, @@ -29,12 +28,12 @@ where chunk_size: usize, } -impl ArrayIterator +impl DictIter where K: DictionaryKey, I: DataPages, { - fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { + pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self { let data_type = match data_type { DataType::Dictionary(_, values, _) => values.as_ref().clone(), _ => unreachable!(), @@ -49,7 +48,7 @@ where } } -impl Iterator for ArrayIterator +impl Iterator for DictIter where I: DataPages, K: DictionaryKey, @@ -138,15 +137,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, K, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> -where - I: 'a + DataPages, - K: DictionaryKey, -{ - Box::new( - ArrayIterator::::new(iter, data_type, chunk_size) - .map(|x| x.map(|x| Arc::new(x) as Arc)), - ) -} diff --git a/src/io/parquet/read/fixed_size_binary/mod.rs b/src/io/parquet/read/fixed_size_binary/mod.rs index a0f2b3121cc..8173065d37c 100644 --- a/src/io/parquet/read/fixed_size_binary/mod.rs +++ b/src/io/parquet/read/fixed_size_binary/mod.rs @@ -2,5 +2,5 @@ mod basic; mod dictionary; mod utils; -pub use basic::BinaryArrayIterator; -pub use dictionary::iter_to_arrays as iter_to_dict_arrays; +pub use basic::Iter; +pub use dictionary::DictIter; diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index d00976bc989..24c86444446 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -28,10 +28,10 @@ pub use parquet2::{ use crate::{ array::{ - Array, BinaryArray, DictionaryKey, ListArray, MutablePrimitiveArray, PrimitiveArray, - StructArray, Utf8Array, + Array, BinaryArray, ListArray, MutablePrimitiveArray, PrimitiveArray, StructArray, + Utf8Array, }, - datatypes::{DataType, Field, IntervalUnit, TimeUnit}, + datatypes::{DataType, Field}, error::{ArrowError, Result}, io::parquet::read::primitive::read_item, types::NativeType, @@ -39,6 +39,7 @@ use crate::{ mod binary; mod boolean; +mod deserialize; mod dictionary; mod file; mod fixed_size_binary; @@ -56,6 +57,7 @@ pub(crate) use schema::is_type_nullable; pub use schema::{get_schema, FileMetaData}; use self::nested_utils::{InitNested, NestedArrayIter, NestedState}; +use deserialize::page_iter_to_arrays; pub trait DataPages: FallibleStreamingIterator + Send + Sync @@ -128,368 +130,6 @@ pub async fn read_metadata_async( Ok(_read_metadata_async(reader).await?) } -fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( - iter: I, - type_: &ParquetType, - data_type: DataType, - chunk_size: usize, -) -> Result> { - use DataType::*; - let values_data_type = if let Dictionary(_, v, _) = &data_type { - v.as_ref() - } else { - panic!() - }; - - Ok(match values_data_type.to_logical_type() { - UInt8 => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i32| x as u8, - ), - UInt16 => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i32| x as u16, - ), - UInt32 => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i32| x as u32, - ), - Int8 => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i32| x as i8, - ), - Int16 => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i32| x as i16, - ), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { - primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i32| x as i32, - ) - } - - Timestamp(TimeUnit::Nanosecond, None) => match type_ { - ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } => match (physical_type, logical_type) { - (PhysicalType::Int96, _) => primitive::iter_to_dict_arrays::( - iter, - DataType::Timestamp(TimeUnit::Nanosecond, None), - chunk_size, - int96_to_i64_ns, - ), - (_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => match unit { - ParquetTimeUnit::MILLIS(_) => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i64| x * 1_000_000, - ), - ParquetTimeUnit::MICROS(_) => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i64| x * 1_000, - ), - ParquetTimeUnit::NANOS(_) => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i64| x, - ), - }, - _ => primitive::iter_to_dict_arrays::( - iter, - data_type, - chunk_size, - |x: i64| x, - ), - }, - _ => unreachable!(), - }, - - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { - primitive::iter_to_dict_arrays::(iter, data_type, chunk_size, |x: i64| x) - } - Float32 => { - primitive::iter_to_dict_arrays::(iter, data_type, chunk_size, |x: f32| x) - } - Float64 => { - primitive::iter_to_dict_arrays::(iter, data_type, chunk_size, |x: f64| x) - } - - Utf8 | Binary => binary::iter_to_dict_arrays::(iter, data_type, chunk_size), - LargeUtf8 | LargeBinary => { - binary::iter_to_dict_arrays::(iter, data_type, chunk_size) - } - FixedSizeBinary(_) => { - fixed_size_binary::iter_to_dict_arrays::(iter, data_type, chunk_size) - } - other => { - return Err(ArrowError::nyi(format!( - "Reading dictionaries of type {:?}", - other - ))) - } - }) -} - -fn timestamp<'a, I: 'a + DataPages>( - pages: I, - type_: &ParquetType, - data_type: DataType, - chunk_size: usize, - time_unit: TimeUnit, -) -> Result> { - let (physical_type, logical_type) = if let ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } = type_ - { - (physical_type, logical_type) - } else { - unreachable!() - }; - - if physical_type == &PhysicalType::Int96 { - if time_unit == TimeUnit::Nanosecond { - return Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - int96_to_i64_ns, - )))); - } else { - return Err(ArrowError::nyi( - "Can't decode int96 to timestamp other than ns", - )); - } - }; - if physical_type != &PhysicalType::Int64 { - return Err(ArrowError::nyi( - "Can't decode a timestamp from a non-int64 parquet type", - )); - } - - let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x); - - let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { - unit - } else { - return Ok(dyn_iter(iden(iter))); - }; - - Ok(match (unit, time_unit) { - (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x * 1_000_000_000)), - - (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000_000)), - - (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => dyn_iter(iden(iter)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)), - - (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000_000)), - (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)), - (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => dyn_iter(iden(iter)), - }) -} - -fn page_iter_to_arrays<'a, I: 'a + DataPages>( - pages: I, - type_: &ParquetType, - data_type: DataType, - chunk_size: usize, -) -> Result> { - use DataType::*; - - match data_type.to_logical_type() { - Null => Ok(null::iter_to_arrays(pages, data_type, chunk_size)), - Boolean => Ok(boolean::iter_to_arrays(pages, data_type, chunk_size)), - UInt8 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as u8, - )))), - UInt16 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as u16, - )))), - UInt32 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as u32, - )))), - Int8 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as i8, - )))), - Int16 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as i16, - )))), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok(dyn_iter(iden( - primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32), - ))), - - Timestamp(time_unit, None) => { - let time_unit = *time_unit; - timestamp(pages, type_, data_type, chunk_size, time_unit) - } - - FixedSizeBinary(_) => Ok(Box::new( - fixed_size_binary::BinaryArrayIterator::new(pages, data_type, chunk_size) - .map(|x| x.map(|x| Arc::new(x) as _)), - )), - - Decimal(_, _) => match type_ { - ParquetType::PrimitiveType { physical_type, .. } => Ok(match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as i128, - ))), - PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i64| x as i128, - ))), - &PhysicalType::FixedLenByteArray(n) if n > 16 => { - return Err(ArrowError::NotYetImplemented(format!( - "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", - n - ))) - } - &PhysicalType::FixedLenByteArray(n) => { - let n = n as usize; - - let pages = fixed_size_binary::BinaryArrayIterator::new( - pages, - DataType::FixedSizeBinary(n), - chunk_size, - ); - - let pages = pages.map(move |maybe_array| { - let array = maybe_array?; - let values = array - .values() - .chunks_exact(n) - .map(|value: &[u8]| { - // Copy the fixed-size byte value to the start of a 16 byte stack - // allocated buffer, then use an arithmetic right shift to fill in - // MSBs, which accounts for leading 1's in negative (two's complement) - // values. - let mut bytes = [0u8; 16]; - bytes[..n].copy_from_slice(value); - i128::from_be_bytes(bytes) >> (8 * (16 - n)) - }) - .collect::>(); - let validity = array.validity().cloned(); - - Ok(PrimitiveArray::::from_data( - data_type.clone(), - values.into(), - validity, - )) - }); - - let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc)); - - Box::new(arrays) as _ - } - _ => unreachable!(), - }), - _ => unreachable!(), - }, - - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok(dyn_iter(iden( - primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64), - ))), - UInt64 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i64| x as u64, - )))), - - Float32 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: f32| x, - )))), - Float64 => Ok(dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: f64| x, - )))), - - Binary => Ok(binary::iter_to_arrays::, _>( - pages, data_type, chunk_size, - )), - LargeBinary => Ok(binary::iter_to_arrays::, _>( - pages, data_type, chunk_size, - )), - Utf8 => Ok(binary::iter_to_arrays::, _>( - pages, data_type, chunk_size, - )), - LargeUtf8 => Ok(binary::iter_to_arrays::, _>( - pages, data_type, chunk_size, - )), - - Dictionary(key_type, _, _) => match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(pages, type_, data_type, chunk_size) - }), - - other => Err(ArrowError::NotYetImplemented(format!( - "Reading {:?} from parquet still not implemented", - other - ))), - } -} - fn create_list( data_type: DataType, nested: &mut NestedState, diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index f2fc336139c..84c1379f697 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -3,7 +3,7 @@ use std::{collections::VecDeque, sync::Arc}; use parquet2::{page::PrimitivePageDict, types::NativeType as ParquetNativeType}; use crate::{ - array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, + array::{DictionaryArray, DictionaryKey, PrimitiveArray}, bitmap::MutableBitmap, datatypes::DataType, error::{ArrowError, Result}, @@ -13,12 +13,11 @@ use crate::{ use super::super::dictionary::*; use super::super::utils; use super::super::utils::Decoder; -use super::super::ArrayIter; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct ArrayIterator +pub struct DictIter where I: DataPages, T: NativeType, @@ -35,7 +34,7 @@ where phantom: std::marker::PhantomData

, } -impl ArrayIterator +impl DictIter where K: DictionaryKey, I: DataPages, @@ -44,7 +43,7 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self { + pub fn new(iter: I, data_type: DataType, chunk_size: usize, op: F) -> Self { let data_type = match data_type { DataType::Dictionary(_, values, _) => *values, _ => data_type, @@ -61,7 +60,7 @@ where } } -impl Iterator for ArrayIterator +impl Iterator for DictIter where I: DataPages, T: NativeType, @@ -154,23 +153,3 @@ where } } } - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays<'a, K, I, T, P, F>( - iter: I, - data_type: DataType, - chunk_size: usize, - op: F, -) -> ArrayIter<'a> -where - I: 'a + DataPages, - K: DictionaryKey, - T: NativeType, - P: ParquetNativeType, - F: 'a + Copy + Send + Sync + Fn(P) -> T, -{ - Box::new( - ArrayIterator::::new(iter, data_type, chunk_size, op) - .map(|x| x.map(|x| Arc::new(x) as Arc)), - ) -} diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 29e9f4bb27a..a6cf2ad21c5 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -3,7 +3,7 @@ mod dictionary; mod nested; mod utils; -pub use dictionary::iter_to_arrays as iter_to_dict_arrays; +pub use dictionary::DictIter; pub use utils::read_item; use std::sync::Arc; From 6c29f698624bbacc7a37421280835776de3b689a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 4 Feb 2022 21:57:51 +0000 Subject: [PATCH 2/2] Simplify code --- src/io/parquet/read/binary/dictionary.rs | 142 ++++++------------ src/io/parquet/read/dictionary.rs | 91 ++++++++++- .../read/fixed_size_binary/dictionary.rs | 113 ++++---------- src/io/parquet/read/primitive/dictionary.rs | 119 +++++---------- 4 files changed, 192 insertions(+), 273 deletions(-) diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index 71e7d5a4b47..1a95b8a1dcc 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -1,17 +1,16 @@ use std::{collections::VecDeque, sync::Arc}; -use parquet2::page::BinaryPageDict; +use parquet2::page::{BinaryPageDict, DictPage}; use crate::{ - array::{BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array}, + array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array}, bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, - error::{ArrowError, Result}, + error::Result, + io::parquet::read::utils::MaybeNext, }; use super::super::dictionary::*; -use super::super::utils; -use super::super::utils::Decoder; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation @@ -52,6 +51,32 @@ where } } +fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Arc { + let dict = dict.as_any().downcast_ref::().unwrap(); + let offsets = dict + .offsets() + .iter() + .map(|x| O::from_usize(*x as usize).unwrap()) + .collect::>(); + let values = dict.values().to_vec(); + + match data_type.to_physical_type() { + PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Arc::new(Utf8Array::::from_data( + data_type, + offsets.into(), + values.into(), + None, + )) as _, + PhysicalType::Binary | PhysicalType::LargeBinary => Arc::new(BinaryArray::::from_data( + data_type, + offsets.into(), + values.into(), + None, + )) as _, + _ => unreachable!(), + } +} + impl Iterator for DictIter where I: DataPages, @@ -61,101 +86,18 @@ where type Item = Result>; fn next(&mut self) -> Option { - // back[a1, a2, a3, ...]front - if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - let keys = finish_key(values, validity); - let values = self.values.unwrap(); - Ok(DictionaryArray::from_data(keys, values)) - }); - } - match (self.items.pop_back(), self.iter.next()) { - (_, Err(e)) => Some(Err(e.into())), - (None, Ok(None)) => None, - (state, Ok(Some(page))) => { - // consume the dictionary page - if let Some(dict) = page.dictionary_page() { - let dict = dict.as_any().downcast_ref::().unwrap(); - self.values = match &mut self.values { - Dict::Empty => { - let offsets = dict - .offsets() - .iter() - .map(|x| O::from_usize(*x as usize).unwrap()) - .collect::>(); - let values = dict.values().to_vec(); - - let array = match self.data_type.to_physical_type() { - PhysicalType::Utf8 | PhysicalType::LargeUtf8 => { - Arc::new(Utf8Array::::from_data( - self.data_type.clone(), - offsets.into(), - values.into(), - None, - )) as _ - } - PhysicalType::Binary | PhysicalType::LargeBinary => { - Arc::new(BinaryArray::::from_data( - self.data_type.clone(), - offsets.into(), - values.into(), - None, - )) as _ - } - _ => unreachable!(), - }; - - Dict::Complete(array) - } - _ => unreachable!(), - }; - } else { - return Some(Err(ArrowError::nyi( - "dictionary arrays from non-dict-encoded pages", - ))); - } - - let maybe_array = { - // there is a new page => consume the page from the start - let maybe_page = PrimitiveDecoder::default().build_state(page); - let page = match maybe_page { - Ok(page) => page, - Err(e) => return Some(Err(e)), - }; - - utils::extend_from_new_page::, _, _>( - page, - state, - self.chunk_size, - &mut self.items, - &PrimitiveDecoder::default(), - ) - }; - match maybe_array { - Ok(Some((values, validity))) => { - let keys = PrimitiveArray::from_data( - K::PRIMITIVE.into(), - values.into(), - validity.into(), - ); - - let values = self.values.unwrap(); - Some(Ok(DictionaryArray::from_data(keys, values))) - } - Ok(None) => self.next(), - Err(e) => Some(Err(e)), - } - } - (Some((values, validity)), Ok(None)) => { - // we have a populated item and no more pages - // the only case where an item's length may be smaller than chunk_size - debug_assert!(values.len() <= self.chunk_size); - - let keys = finish_key(values, validity); - - let values = self.values.unwrap(); - Some(Ok(DictionaryArray::from_data(keys, values))) - } + let maybe_state = next_dict( + &mut self.iter, + &mut self.items, + &mut self.values, + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), } } } diff --git a/src/io/parquet/read/dictionary.rs b/src/io/parquet/read/dictionary.rs index ab3af462a4c..b9d1e9f21a2 100644 --- a/src/io/parquet/read/dictionary.rs +++ b/src/io/parquet/read/dictionary.rs @@ -1,17 +1,20 @@ -use std::sync::Arc; +use std::{collections::VecDeque, sync::Arc}; use parquet2::{ encoding::{hybrid_rle::HybridRleDecoder, Encoding}, - page::DataPage, + page::{DataPage, DictPage}, schema::Repetition, }; -use super::utils; use crate::{ - array::{Array, DictionaryKey, PrimitiveArray}, + array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, bitmap::MutableBitmap, - error::Result, - io::parquet::read::utils::{extend_from_decoder, OptionalPageValidity}, + error::{ArrowError, Result}, +}; + +use super::{ + utils::{self, extend_from_decoder, Decoder, MaybeNext, OptionalPageValidity}, + DataPages, }; // The state of a `DataPage` of `Primitive` parquet primitive type @@ -170,3 +173,79 @@ impl Dict { pub fn finish_key(values: Vec, validity: MutableBitmap) -> PrimitiveArray { PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()) } + +#[inline] +pub(super) fn next_dict< + 'a, + K: DictionaryKey, + I: DataPages, + F: Fn(&dyn DictPage) -> Arc, +>( + iter: &'a mut I, + items: &mut VecDeque<(Vec, MutableBitmap)>, + dict: &mut Dict, + chunk_size: usize, + read_dict: F, +) -> MaybeNext>> { + if items.len() > 1 { + let (values, validity) = items.pop_back().unwrap(); + let keys = finish_key(values, validity); + return MaybeNext::Some(Ok(DictionaryArray::from_data(keys, dict.unwrap()))); + } + match (items.pop_back(), iter.next()) { + (_, Err(e)) => MaybeNext::Some(Err(e.into())), + (None, Ok(None)) => MaybeNext::None, + (state, Ok(Some(page))) => { + // consume the dictionary page + if let Some(dict_page) = page.dictionary_page() { + *dict = match dict { + Dict::Empty => Dict::Complete(read_dict(dict_page.as_ref())), + _ => unreachable!(), + }; + } else { + return MaybeNext::Some(Err(ArrowError::nyi( + "dictionary arrays from non-dict-encoded pages", + ))); + } + + let maybe_array = { + // there is a new page => consume the page from the start + let maybe_page = PrimitiveDecoder::default().build_state(page); + let page = match maybe_page { + Ok(page) => page, + Err(e) => return MaybeNext::Some(Err(e)), + }; + + utils::extend_from_new_page::, _, _>( + page, + state, + chunk_size, + items, + &PrimitiveDecoder::default(), + ) + }; + match maybe_array { + Ok(Some((values, validity))) => { + let keys = PrimitiveArray::from_data( + K::PRIMITIVE.into(), + values.into(), + validity.into(), + ); + + MaybeNext::Some(Ok(DictionaryArray::from_data(keys, dict.unwrap()))) + } + Ok(None) => MaybeNext::More, + Err(e) => MaybeNext::Some(Err(e)), + } + } + (Some((values, validity)), Ok(None)) => { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + debug_assert!(values.len() <= chunk_size); + + let keys = finish_key(values, validity); + + MaybeNext::Some(Ok(DictionaryArray::from_data(keys, dict.unwrap()))) + } + } +} diff --git a/src/io/parquet/read/fixed_size_binary/dictionary.rs b/src/io/parquet/read/fixed_size_binary/dictionary.rs index 94dbf71cebc..4dc4fbcc81c 100644 --- a/src/io/parquet/read/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/fixed_size_binary/dictionary.rs @@ -1,17 +1,16 @@ use std::{collections::VecDeque, sync::Arc}; -use parquet2::page::FixedLenByteArrayPageDict; +use parquet2::page::{DictPage, FixedLenByteArrayPageDict}; use crate::{ - array::{DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray}, + array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray}, bitmap::MutableBitmap, datatypes::DataType, - error::{ArrowError, Result}, + error::Result, }; use super::super::dictionary::*; -use super::super::utils; -use super::super::utils::Decoder; +use super::super::utils::MaybeNext; use super::super::DataPages; /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation @@ -48,6 +47,20 @@ where } } +fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Arc { + let dict = dict + .as_any() + .downcast_ref::() + .unwrap(); + let values = dict.values().to_vec(); + + Arc::new(FixedSizeBinaryArray::from_data( + data_type, + values.into(), + None, + )) +} + impl Iterator for DictIter where I: DataPages, @@ -56,84 +69,18 @@ where type Item = Result>; fn next(&mut self) -> Option { - // back[a1, a2, a3, ...]front - if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - let keys = finish_key(values, validity); - let values = self.values.unwrap(); - Ok(DictionaryArray::from_data(keys, values)) - }); - } - match (self.items.pop_back(), self.iter.next()) { - (_, Err(e)) => Some(Err(e.into())), - (None, Ok(None)) => None, - (state, Ok(Some(page))) => { - // consume the dictionary page - if let Some(dict) = page.dictionary_page() { - let dict = dict - .as_any() - .downcast_ref::() - .unwrap(); - self.values = match &mut self.values { - Dict::Empty => { - let values = dict.values().to_vec(); - - let array = Arc::new(FixedSizeBinaryArray::from_data( - self.data_type.clone(), - values.into(), - None, - )) as _; - Dict::Complete(array) - } - _ => unreachable!(), - }; - } else { - return Some(Err(ArrowError::nyi( - "dictionary arrays from non-dict-encoded pages", - ))); - } - - let maybe_array = { - // there is a new page => consume the page from the start - let maybe_page = PrimitiveDecoder::default().build_state(page); - let page = match maybe_page { - Ok(page) => page, - Err(e) => return Some(Err(e)), - }; - - utils::extend_from_new_page::, _, _>( - page, - state, - self.chunk_size, - &mut self.items, - &PrimitiveDecoder::default(), - ) - }; - match maybe_array { - Ok(Some((values, validity))) => { - let keys = PrimitiveArray::from_data( - K::PRIMITIVE.into(), - values.into(), - validity.into(), - ); - - let values = self.values.unwrap(); - Some(Ok(DictionaryArray::from_data(keys, values))) - } - Ok(None) => self.next(), - Err(e) => Some(Err(e)), - } - } - (Some((values, validity)), Ok(None)) => { - // we have a populated item and no more pages - // the only case where an item's length may be smaller than chunk_size - debug_assert!(values.len() <= self.chunk_size); - - let keys = finish_key(values, validity); - - let values = self.values.unwrap(); - Some(Ok(DictionaryArray::from_data(keys, values))) - } + let maybe_state = next_dict( + &mut self.iter, + &mut self.items, + &mut self.values, + self.chunk_size, + |dict| read_dict(self.data_type.clone(), dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), } } } diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 84c1379f697..7022d2b121c 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -1,20 +1,38 @@ use std::{collections::VecDeque, sync::Arc}; -use parquet2::{page::PrimitivePageDict, types::NativeType as ParquetNativeType}; +use parquet2::{ + page::{DictPage, PrimitivePageDict}, + types::NativeType as ParquetNativeType, +}; use crate::{ - array::{DictionaryArray, DictionaryKey, PrimitiveArray}, + array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, bitmap::MutableBitmap, datatypes::DataType, - error::{ArrowError, Result}, + error::Result, + io::parquet::read::utils::MaybeNext, types::NativeType, }; use super::super::dictionary::*; -use super::super::utils; -use super::super::utils::Decoder; use super::super::DataPages; +#[inline] +fn read_dict(data_type: DataType, op: F, dict: &dyn DictPage) -> Arc +where + T: NativeType, + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + let dict = dict + .as_any() + .downcast_ref::>() + .unwrap(); + let values = dict.values().iter().map(|x| (op)(*x)).collect::>(); + + Arc::new(PrimitiveArray::from_data(data_type, values.into(), None)) +} + /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] pub struct DictIter @@ -71,85 +89,18 @@ where type Item = Result>; fn next(&mut self) -> Option { - // back[a1, a2, a3, ...]front - if self.items.len() > 1 { - return self.items.pop_back().map(|(values, validity)| { - let keys = finish_key(values, validity); - let values = self.values.unwrap(); - Ok(DictionaryArray::from_data(keys, values)) - }); - } - match (self.items.pop_back(), self.iter.next()) { - (_, Err(e)) => Some(Err(e.into())), - (None, Ok(None)) => None, - (state, Ok(Some(page))) => { - // consume the dictionary page - if let Some(dict) = page.dictionary_page() { - let dict = dict - .as_any() - .downcast_ref::>() - .unwrap(); - self.values = match &mut self.values { - Dict::Empty => { - let values = dict - .values() - .iter() - .map(|x| (self.op)(*x)) - .collect::>(); - - Dict::Complete(Arc::new(PrimitiveArray::from_data( - self.data_type.clone(), - values.into(), - None, - )) as _) - } - _ => unreachable!(), - }; - } else { - return Some(Err(ArrowError::nyi( - "dictionary arrays from non-dict-encoded pages", - ))); - } - - let maybe_array = { - // there is a new page => consume the page from the start - let decoder = PrimitiveDecoder::default(); - let maybe_page = decoder.build_state(page); - let page = match maybe_page { - Ok(page) => page, - Err(e) => return Some(Err(e)), - }; - - utils::extend_from_new_page::, _, _>( - page, - state, - self.chunk_size, - &mut self.items, - &PrimitiveDecoder::default(), - ) - }; - match maybe_array { - Ok(Some((values, validity))) => { - let keys = finish_key(values, validity); - - let values = self.values.unwrap(); - Some(Ok(DictionaryArray::from_data(keys, values))) - } - Ok(None) => self.next(), - Err(e) => Some(Err(e)), - } - } - (Some((values, validity)), Ok(None)) => { - // we have a populated item and no more pages - // the only case where an item's length may be smaller than chunk_size - debug_assert!(values.len() <= self.chunk_size); - - let keys = - PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()); - - let values = self.values.unwrap(); - Some(Ok(DictionaryArray::from_data(keys, values))) - } + let maybe_state = next_dict( + &mut self.iter, + &mut self.items, + &mut self.values, + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), self.op, dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), } } }