From 7d7d8e31b4a0c9e0dbba9f0b35b52696aea526c7 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 22 Oct 2021 22:38:36 +0000
Subject: [PATCH] Prepared for struct array.

---
 Cargo.toml                          |  4 +-
 src/io/parquet/read/mod.rs          | 60 +++++++++++++++++++++++++----
 src/io/parquet/read/record_batch.rs | 52 +++++++++----------
 3 files changed, 75 insertions(+), 41 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 73642d0f39c..bacf6ba3a64 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,7 +61,9 @@ futures = { version = "0.3", optional = true }
 # for faster hashing
 ahash = { version = "0.7", optional = true }
 
-parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
+#parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "struct", optional = true, default_features = false, features = ["stream"] }
+#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }
 
 avro-rs = { version = "0.13", optional = true, default_features = false }
 
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 1f6198c1e69..3bde88a9679 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -6,15 +6,17 @@ use std::{
 };
 
 use futures::{AsyncRead, AsyncSeek, Stream};
+use parquet2::read::MutStreamingIterator;
 pub use parquet2::{
     error::ParquetError,
     fallible_streaming_iterator,
     metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
-        decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
-        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
-        BasicDecompressor, Decompressor, PageFilter, PageIterator,
+        decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
+        get_page_stream as _get_page_stream, read_metadata as _read_metadata,
+        read_metadata_async as _read_metadata_async, BasicDecompressor, Decompressor, PageFilter,
+        PageIterator, State,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
@@ -45,12 +47,12 @@ pub(crate) use schema::is_type_nullable;
 pub use schema::{get_schema, FileMetaData};
 
 /// Creates a new iterator of compressed pages.
-pub fn get_page_iterator<'b, RR: Read + Seek>(
+pub fn get_page_iterator<R: Read + Seek>(
     column_metadata: &ColumnChunkMetaData,
-    reader: &'b mut RR,
+    reader: R,
     pages_filter: Option<PageFilter>,
     buffer: Vec<u8>,
-) -> Result<PageIterator<'b, RR>> {
+) -> Result<PageIterator<R>> {
     Ok(_get_page_iterator(
         column_metadata,
         reader,
@@ -165,8 +167,52 @@ fn dict_read<
     }
 }
 
+/// Returns an Array built from an iterator of column chunks
+pub fn column_iter_to_array<II, I>(
+    mut columns: I,
+    data_type: DataType,
+    mut buffer: Vec<u8>,
+) -> Result<(Box<dyn Array>, Vec<u8>, Vec<u8>)>
+where
+    II: Iterator<Item = std::result::Result<CompressedDataPage, ParquetError>>,
+    I: MutStreamingIterator<Item = (II, ColumnChunkMetaData), Error = ParquetError>,
+{
+    let mut arrays = vec![];
+    let page_buffer;
+    loop {
+        match columns.advance()? {
+            State::Some(mut new_iter) => {
+                if let Some((pages, metadata)) = new_iter.get() {
+                    let data_type = schema::to_data_type(metadata.descriptor().type_())?.unwrap();
+                    let mut iterator = BasicDecompressor::new(pages, buffer);
+                    let array = page_iter_to_array(&mut iterator, metadata, data_type)?;
+                    buffer = iterator.into_inner();
+                    arrays.push(array)
+                }
+                columns = new_iter;
+            }
+            State::Finished(b) => {
+                page_buffer = b;
+                break;
+            }
+        }
+    }
+
+    use crate::datatypes::PhysicalType::*;
+    Ok(match data_type.to_physical_type() {
+        Null => todo!(),
+        Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 | LargeUtf8
+        | List | LargeList | FixedSizeList | Dictionary(_) => {
+            (arrays.pop().unwrap(), page_buffer, buffer)
+        }
+        Struct => todo!(),
+        Union => todo!(),
+        Map => todo!(),
+    })
+}
+
 /// Converts an iterator of [`DataPage`] into a single [`Array`].
-pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
+fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
     iter: &mut I,
     metadata: &ColumnChunkMetaData,
     data_type: DataType,
diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs
index a49195fd8bd..fd73fde839f 100644
--- a/src/io/parquet/read/record_batch.rs
+++ b/src/io/parquet/read/record_batch.rs
@@ -1,18 +1,17 @@
 use std::{
     io::{Read, Seek},
-    rc::Rc,
     sync::Arc,
 };
 
 use crate::{
     datatypes::{Field, Schema},
-    error::{ArrowError, Result},
+    error::Result,
     record_batch::RecordBatch,
 };
 
 use super::{
-    get_page_iterator, get_schema, page_iter_to_array, read_metadata, Decompressor, FileMetaData,
-    PageFilter, RowGroupMetaData,
+    column_iter_to_array, get_column_iterator, get_schema, read_metadata, FileMetaData, PageFilter,
+    RowGroupMetaData,
 };
 
 type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
@@ -21,12 +20,11 @@ type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
 pub struct RecordReader<R: Read + Seek> {
     reader: R,
     schema: Arc<Schema>,
-    indices: Rc<Vec<usize>>,
     buffer: Vec<u8>,
     decompress_buffer: Vec<u8>,
     groups_filter: Option<GroupFilter>,
     pages_filter: Option<PageFilter>,
-    metadata: Rc<FileMetaData>,
+    metadata: FileMetaData,
     current_group: usize,
     remaining_rows: usize,
 }
@@ -46,29 +44,23 @@ impl<R: Read + Seek> RecordReader<R> {
         let schema = get_schema(&metadata)?;
         let schema_metadata = schema.metadata;
 
-        let (indices, fields): (Vec<usize>, Vec<Field>) = if let Some(projection) = &projection {
+        let fields: Vec<Field> = if let Some(projection) = &projection {
             schema
                 .fields
                 .into_iter()
                 .enumerate()
                 .filter_map(|(index, f)| {
                     if projection.iter().any(|&i| i == index) {
-                        Some((index, f))
+                        Some(f)
                     } else {
                         None
                     }
                 })
-                .unzip()
+                .collect()
         } else {
-            schema.fields.into_iter().enumerate().unzip()
+            schema.fields.into_iter().collect()
         };
 
-        if let Some(projection) = &projection {
-            if indices.len() != projection.len() {
-                return Err(ArrowError::InvalidArgumentError("While reading parquet, some columns in the projection do not exist in the file".to_string()));
-            }
-        }
-
         let schema = Arc::new(Schema {
             fields,
             metadata: schema_metadata,
@@ -77,10 +69,9 @@ impl<R: Read + Seek> RecordReader<R> {
         Ok(Self {
             reader,
             schema,
-            indices: Rc::new(indices),
             groups_filter,
             pages_filter,
-            metadata: Rc::new(metadata),
+            metadata,
             current_group: 0,
             buffer: vec![],
             decompress_buffer: vec![],
@@ -95,7 +86,7 @@ impl<R: Read + Seek> RecordReader<R> {
 
     /// Returns parquet's [`FileMetaData`].
     pub fn metadata(&self) -> &FileMetaData {
-        self.metadata.as_ref()
+        &self.metadata
     }
 
     /// Sets the groups filter
@@ -120,7 +111,7 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
         }
 
         let row_group = self.current_group;
-        let metadata = self.metadata.clone();
+        let metadata = &self.metadata;
         let group = &metadata.row_groups[row_group];
         if let Some(groups_filter) = self.groups_filter.as_ref() {
             if !(groups_filter)(row_group, group) {
@@ -128,7 +119,6 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
                 return self.next();
             }
         }
-        let columns_meta = group.columns();
 
         // todo: avoid these clones.
         let schema = self.schema().clone();
@@ -138,21 +128,18 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
 
         let a = schema.fields().iter().enumerate().try_fold(
             (b1, b2, Vec::with_capacity(schema.fields().len())),
-            |(b1, b2, mut columns), (column, field)| {
-                // column according to the file's indexing
-                let column = self.indices[column];
-                let column_metadata = &columns_meta[column];
-                let pages = get_page_iterator(
-                    column_metadata,
+            |(b1, b2, mut columns), (field_index, field)| {
+                let column_iter = get_column_iterator(
                     &mut self.reader,
+                    &self.metadata,
+                    row_group,
+                    field_index,
                     self.pages_filter.clone(),
                     b1,
-                )?;
-
-                let mut pages = Decompressor::new(pages, b2);
+                );
 
-                let array =
-                    page_iter_to_array(&mut pages, column_metadata, field.data_type().clone())?;
+                let (array, b1, b2) =
+                    column_iter_to_array(column_iter, field.data_type().clone(), b2)?;
 
                 let array = if array.len() > remaining_rows {
                     array.slice(0, remaining_rows)
@@ -161,7 +148,6 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
                 };
 
                 columns.push(array.into());
-                let (b1, b2) = pages.into_buffers();
 
                 Result::Ok((b1, b2, columns))
             },
         );
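
Note on usage (not part of the patch): a minimal sketch of how the new column-chunk API could be driven directly, assuming the signatures exactly as they appear in the diff above (get_column_iterator and column_iter_to_array against the parquet2 "struct" branch). The function name read_first_column, the hard-coded row-group/column indices, and the minimal error handling are illustrative only.

    use std::fs::File;

    use arrow2::array::Array;
    use arrow2::error::Result;
    use arrow2::io::parquet::read::{
        column_iter_to_array, get_column_iterator, get_schema, read_metadata,
    };

    // Reads the first column chunk of the first row group into a single Array.
    fn read_first_column(path: &str) -> Result<()> {
        let mut reader = File::open(path)?;
        let metadata = read_metadata(&mut reader)?;
        let schema = get_schema(&metadata)?;
        let data_type = schema.fields()[0].data_type().clone();

        // One iterator per (row group, column); pages are decompressed inside
        // `column_iter_to_array`, which also hands the scratch buffers back for re-use.
        let column_iter = get_column_iterator(&mut reader, &metadata, 0, 0, None, vec![]);
        let (array, _page_buffer, _decompress_buffer) =
            column_iter_to_array(column_iter, data_type, vec![])?;

        println!("read {} rows", array.len());
        Ok(())
    }

The returned buffers mirror the (b1, b2) pair threaded through try_fold in record_batch.rs: they are passed back in on the next column so allocations are reused across columns and row groups.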