From f20d806ecddb7d6827f965c159f4af906b6f5cd2 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sun, 24 Oct 2021 06:38:24 +0000
Subject: [PATCH] Fixed error in indexing.

---
 src/io/parquet/read/mod.rs          | 31 ++++++++++++++++++++++++++-----
 src/io/parquet/read/record_batch.rs | 22 +++++++++++++++++-----
 tests/it/io/parquet/read.rs         |  3 ++-
 3 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index d650b307103..4c9c8b59dac 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -166,7 +166,26 @@ fn dict_read<
     }
 }
 
-/// Returns an Array built from an iterator of column chunks
+fn column_datatype(data_type: &DataType, column: usize) -> DataType {
+    use crate::datatypes::PhysicalType::*;
+    match data_type.to_physical_type() {
+        Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
+        | LargeUtf8 | Dictionary(_) | List | LargeList | FixedSizeList => data_type.clone(),
+        Struct => {
+            // todo: this won't work for nested structs because we need to flatten the column ids
+            if let DataType::Struct(v) = data_type {
+                v[column].data_type().clone()
+            } else {
+                unreachable!()
+            }
+        }
+        Union => todo!(),
+        Map => todo!(),
+    }
+}
+
+/// Returns an [`Array`] built from an iterator of column chunks. It also returns
+/// the two buffers used to decompress and deserialize pages (to be re-used).
 #[allow(clippy::type_complexity)]
 pub fn column_iter_to_array<II, I>(
     mut columns: I,
@@ -179,16 +198,19 @@ where
 {
     let mut arrays = vec![];
     let page_buffer;
+    let mut column = 0;
     loop {
         match columns.advance()? {
             State::Some(mut new_iter) => {
+                let data_type = column_datatype(&data_type, column);
                 if let Some((pages, metadata)) = new_iter.get() {
-                    let data_type = schema::to_data_type(metadata.descriptor().type_())?.unwrap();
+                    println!("{:?}", data_type);
                     let mut iterator = BasicDecompressor::new(pages, buffer);
                     let array = page_iter_to_array(&mut iterator, metadata, data_type)?;
                     buffer = iterator.into_inner();
                     arrays.push(array)
                 }
+                column += 1;
                 columns = new_iter;
             }
             State::Finished(b) => {
@@ -200,9 +222,8 @@
 
     use crate::datatypes::PhysicalType::*;
     Ok(match data_type.to_physical_type() {
-        Null => todo!(),
-        Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 | LargeUtf8
-        | List | LargeList | FixedSizeList | Dictionary(_) => {
+        Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
+        | LargeUtf8 | List | LargeList | FixedSizeList | Dictionary(_) => {
             (arrays.pop().unwrap(), page_buffer, buffer)
         }
         Struct => todo!(),
diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs
index fd73fde839f..d13da3f898f 100644
--- a/src/io/parquet/read/record_batch.rs
+++ b/src/io/parquet/read/record_batch.rs
@@ -5,7 +5,7 @@ use std::{
 
 use crate::{
     datatypes::{Field, Schema},
-    error::Result,
+    error::{ArrowError, Result},
     record_batch::RecordBatch,
 };
 
@@ -20,6 +20,7 @@ type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool>;
 pub struct RecordReader<R: Read + Seek> {
     reader: R,
     schema: Arc<Schema>,
+    indices: Vec<usize>,
     buffer: Vec<u8>,
     decompress_buffer: Vec<u8>,
     groups_filter: Option<GroupFilter>,
@@ -44,23 +45,32 @@ impl<R: Read + Seek> RecordReader<R> {
 
         let schema = get_schema(&metadata)?;
         let schema_metadata = schema.metadata;
-        let fields: Vec<Field> = if let Some(projection) = &projection {
+        let (indices, fields): (Vec<usize>, Vec<Field>) = if let Some(projection) = &projection {
             schema
                 .fields
                 .into_iter()
                 .enumerate()
                 .filter_map(|(index, f)| {
                     if projection.iter().any(|&i| i == index) {
-                        Some(f)
+                        Some((index, f))
                     } else {
                         None
                     }
                 })
-                .collect()
+                .unzip()
         } else {
-            schema.fields.into_iter().collect()
+            schema.fields.into_iter().enumerate().unzip()
         };
 
+        if let Some(projection) = &projection {
+            if indices.len() != projection.len() {
+                return Err(ArrowError::InvalidArgumentError(
+                    "While reading parquet, some columns in the projection do not exist in the file"
+                        .to_string(),
+                ));
+            }
+        }
+
         let schema = Arc::new(Schema {
             fields,
             metadata: schema_metadata,
@@ -69,6 +79,7 @@
         Ok(Self {
             reader,
             schema,
+            indices,
             groups_filter,
             pages_filter,
             metadata,
@@ -129,6 +140,7 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
             let a = schema.fields().iter().enumerate().try_fold(
                 (b1, b2, Vec::with_capacity(schema.fields().len())),
                 |(b1, b2, mut columns), (field_index, field)| {
+                    let field_index = self.indices[field_index]; // project into the original schema
                     let column_iter = get_column_iterator(
                         &mut self.reader,
                         &self.metadata,
diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs
index a76623bea0c..9d7945ca3ec 100644
--- a/tests/it/io/parquet/read.rs
+++ b/tests/it/io/parquet/read.rs
@@ -297,7 +297,8 @@ fn v2_decimal_26_required() -> Result<()> {
     test_pyarrow_integration(8, 2, "basic", false, true)
 }
 
-fn v1_struct() -> Result<()> {
+#[test]
+fn v1_struct_optional() -> Result<()> {
     test_pyarrow_integration(0, 1, "struct", false, false)
 }
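
The mod.rs change above derives each column chunk's Arrow type from the position of the chunk being read (tracked by the `column` counter) rather than re-deriving it from the parquet column descriptor. Below is a minimal, self-contained sketch of the `column_datatype` dispatch; the `DataType` enum here is a simplified stand-in for arrow2's, not the crate's actual definition:

    // Stand-in for arrow2's `DataType`, with just enough variants to show
    // the dispatch: flat types are backed by a single parquet column chunk,
    // while a (non-nested) struct is backed by one chunk per child.
    #[derive(Clone, Debug, PartialEq)]
    enum DataType {
        Int32,
        Utf8,
        Struct(Vec<DataType>),
    }

    // Returns the type of the `column`-th column chunk within `data_type`:
    // flat types map to themselves, a struct indexes into its children.
    fn column_datatype(data_type: &DataType, column: usize) -> DataType {
        match data_type {
            DataType::Struct(children) => children[column].clone(),
            other => other.clone(),
        }
    }

    fn main() {
        let s = DataType::Struct(vec![DataType::Int32, DataType::Utf8]);
        assert_eq!(column_datatype(&s, 1), DataType::Utf8);
        assert_eq!(column_datatype(&DataType::Int32, 0), DataType::Int32);
    }

As the patch's own `todo` comment notes, nested structs would need flattened column ids; the sketch inherits that limitation.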
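The record_batch.rs change is the indexing fix itself: under a projection, the n-th projected field is generally not the n-th column of the file, so the reader now keeps each surviving field's position in the file schema (`indices`) and maps `field_index` through it when fetching column chunks. The sketch below isolates that bookkeeping in a hypothetical `project` helper with a stand-in `Field` type; apart from the error message, the names are illustrative:

    #[derive(Clone, Debug)]
    struct Field {
        name: String, // stand-in for arrow2's `Field`; only the name matters here
    }

    // Splits the file schema into (positions in the file schema, projected
    // fields). `indices` is what the iterator later uses to fetch the right
    // column chunks.
    fn project(
        fields: Vec<Field>,
        projection: &[usize],
    ) -> Result<(Vec<usize>, Vec<Field>), String> {
        let (indices, fields): (Vec<usize>, Vec<Field>) = fields
            .into_iter()
            .enumerate()
            .filter_map(|(index, f)| {
                if projection.iter().any(|&i| i == index) {
                    Some((index, f))
                } else {
                    None
                }
            })
            .unzip();

        // A projected index that matched no field shrinks `indices`; the
        // length check turns that into an error instead of a column shift.
        if indices.len() != projection.len() {
            return Err(
                "While reading parquet, some columns in the projection do not exist in the file"
                    .to_string(),
            );
        }
        Ok((indices, fields))
    }

    fn main() {
        let fields = vec![
            Field { name: "a".into() },
            Field { name: "b".into() },
            Field { name: "c".into() },
        ];

        // Fields come back in file order, regardless of projection order.
        let (indices, projected) = project(fields.clone(), &[2, 0]).unwrap();
        assert_eq!(indices, vec![0, 2]);
        assert_eq!(projected[1].name, "c");

        // A column that does not exist in the file is rejected.
        assert!(project(fields, &[5]).is_err());
    }

Because `filter_map` walks the file schema in order, an out-of-range index simply fails to match anything, which is why comparing lengths is enough to detect an invalid projection.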