From e36876bcc4879baaa94c4df48588819b36665d97 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 8 Aug 2021 21:46:19 +0000 Subject: [PATCH] Added support for async parquet. --- src/io/parquet/read/binary/basic.rs | 45 ++++++++++++++ src/io/parquet/read/binary/mod.rs | 1 + src/io/parquet/read/boolean/basic.rs | 26 ++++++++ src/io/parquet/read/boolean/mod.rs | 1 + src/io/parquet/read/mod.rs | 92 ++++++++++++++++++++++------ src/io/parquet/read/primitive/mod.rs | 43 +++++++++++++ 6 files changed, 189 insertions(+), 19 deletions(-) diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 8ceca3ebbb3..a418c00ac59 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -1,3 +1,4 @@ +use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, @@ -353,3 +354,47 @@ where _ => unreachable!(), }) } + +pub async fn stream_to_array( + pages: I, + metadata: &ColumnChunkMetaData, + data_type: &DataType, +) -> Result> +where + ArrowError: From, + O: Offset, + E: Clone, + I: Stream>, +{ + let capacity = metadata.num_values() as usize; + let mut values = MutableBuffer::::with_capacity(0); + let mut offsets = MutableBuffer::::with_capacity(1 + capacity); + offsets.push(O::default()); + let mut validity = MutableBitmap::with_capacity(capacity); + + pin_mut!(pages); // needed for iteration + + while let Some(page) = pages.next().await { + extend_from_page( + page.as_ref().map_err(|x| x.clone())?, + metadata.descriptor(), + &mut offsets, + &mut values, + &mut validity, + )? + } + + Ok(match data_type { + DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( + offsets.into(), + values.into(), + validity.into(), + )), + DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( + offsets.into(), + values.into(), + validity.into(), + )), + _ => unreachable!(), + }) +} diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 13b47154377..912f1bb026b 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -2,4 +2,5 @@ mod basic; mod nested; pub use basic::iter_to_array; +pub use basic::stream_to_array; pub use nested::iter_to_array as iter_to_array_nested; diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index 8e3e155d797..221da32be06 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -5,6 +5,8 @@ use crate::{ }; use super::super::utils; + +use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, @@ -86,6 +88,30 @@ where Ok(BooleanArray::from_data(values.into(), validity.into())) } +pub async fn stream_to_array(pages: I, metadata: &ColumnChunkMetaData) -> Result +where + ArrowError: From, + E: Clone, + I: Stream>, +{ + let capacity = metadata.num_values() as usize; + let mut values = MutableBitmap::with_capacity(capacity); + let mut validity = MutableBitmap::with_capacity(capacity); + + pin_mut!(pages); // needed for iteration + + while let Some(page) = pages.next().await { + extend_from_page( + page.as_ref().map_err(|x| x.clone())?, + metadata.descriptor(), + &mut values, + &mut validity, + )? + } + + Ok(BooleanArray::from_data(values.into(), validity.into())) +} + fn extend_from_page( page: &DataPage, descriptor: &ColumnDescriptor, diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index 13b47154377..912f1bb026b 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -2,4 +2,5 @@ mod basic; mod nested; pub use basic::iter_to_array; +pub use basic::stream_to_array; pub use nested::iter_to_array as iter_to_array_nested; diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 8cddeb220bf..a9e64c81d5f 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -1,24 +1,6 @@ use std::io::{Read, Seek}; -mod binary; -mod boolean; -mod fixed_size_binary; -mod nested_utils; -mod primitive; -mod record_batch; -pub mod schema; -pub mod statistics; -mod utils; - -use crate::{ - array::Array, - datatypes::{DataType, IntervalUnit, TimeUnit}, - error::{ArrowError, Result}, -}; - -pub use record_batch::RecordReader; -pub use schema::{get_schema, is_type_nullable, FileMetaData}; - +use futures::Stream; pub use parquet2::{ error::ParquetError, metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData}, @@ -34,6 +16,25 @@ pub use parquet2::{ types::int96_to_i64_ns, }; +use crate::{ + array::Array, + datatypes::{DataType, IntervalUnit, TimeUnit}, + error::{ArrowError, Result}, +}; + +mod binary; +mod boolean; +mod fixed_size_binary; +mod nested_utils; +mod primitive; +mod record_batch; +pub mod schema; +pub mod statistics; +mod utils; + +pub use record_batch::RecordReader; +pub use schema::{get_schema, is_type_nullable, FileMetaData}; + /// Creates a new iterator of compressed pages. pub fn get_page_iterator<'b, RR: Read + Seek>( metadata: &FileMetaData, @@ -163,6 +164,59 @@ pub fn page_iter_to_array< } } +// Converts an async stream of compressed data pages into an [`Array`]. +pub async fn page_stream_to_array>>( + pages: I, + metadata: &ColumnChunkMetaData, + data_type: DataType, +) -> Result> { + use DataType::*; + match data_type { + // INT32 + UInt8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u8).await, + UInt16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u16).await, + UInt32 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u32).await, + Int8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i8).await, + Int16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i16).await, + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { + primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i32).await + } + + Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { + ParquetType::PrimitiveType { physical_type, .. } => match physical_type { + PhysicalType::Int96 => { + primitive::stream_to_array( + pages, + metadata, + DataType::Timestamp(TimeUnit::Nanosecond, None), + int96_to_i64_ns, + ) + .await + } + _ => primitive::stream_to_array(pages, metadata, data_type, |x: i64| x).await, + }, + _ => unreachable!(), + }, + + // INT64 + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { + primitive::stream_to_array(pages, metadata, data_type, |x: i64| x).await + } + UInt64 => primitive::stream_to_array(pages, metadata, data_type, |x: i64| x as u64).await, + + Float32 => primitive::stream_to_array(pages, metadata, data_type, |x: f32| x).await, + Float64 => primitive::stream_to_array(pages, metadata, data_type, |x: f64| x).await, + + Boolean => Ok(Box::new(boolean::stream_to_array(pages, metadata).await?)), + + Binary | Utf8 => binary::stream_to_array::(pages, metadata, &data_type).await, + LargeBinary | LargeUtf8 => { + binary::stream_to_array::(pages, metadata, &data_type).await + } + _ => todo!(), + } +} + #[cfg(test)] mod tests { use std::fs::File; diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index de4332b8074..3ea354e656f 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -5,6 +5,7 @@ mod utils; use std::sync::Arc; +use futures::{pin_mut, Stream, StreamExt}; use parquet2::{page::DataPage, read::StreamingIterator, types::NativeType}; use super::nested_utils::*; @@ -59,6 +60,48 @@ where ))) } +pub async fn stream_to_array( + pages: I, + metadata: &ColumnChunkMetaData, + data_type: DataType, + op: F, +) -> Result> +where + ArrowError: From, + T: NativeType, + E: Clone, + A: ArrowNativeType, + F: Copy + Fn(T) -> A, + I: Stream>, +{ + let capacity = metadata.num_values() as usize; + let mut values = MutableBuffer::::with_capacity(capacity); + let mut validity = MutableBitmap::with_capacity(capacity); + + pin_mut!(pages); // needed for iteration + + while let Some(page) = pages.next().await { + basic::extend_from_page( + page.as_ref().map_err(|x| x.clone())?, + metadata.descriptor(), + &mut values, + &mut validity, + op, + )? + } + + let data_type = match data_type { + DataType::Dictionary(_, values) => values.as_ref().clone(), + _ => data_type, + }; + + Ok(Box::new(PrimitiveArray::from_data( + data_type, + values.into(), + validity.into(), + ))) +} + pub fn iter_to_array_nested( mut iter: I, metadata: &ColumnChunkMetaData,