diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index d97ff1edc9d..4aef907c614 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -114,6 +114,26 @@ def case_nested(size): [[4, 5], [6]], [], [[7], None, [9]], + [[], [None], None], + [[10]], + ] + items_required_nested = [ + [[0, 1]], + None, + [[2, 3], [3]], + [[4, 5], [6]], + [], + [[7], None, [9]], + None, + [[10]], + ] + items_required_nested_2 = [ + [[0, 1]], + None, + [[2, 3], [3]], + [[4, 5], [6]], + [], + [[7], [8], [9]], None, [[10]], ] @@ -140,6 +160,10 @@ def case_nested(size): pa.field("list_utf8", pa.list_(pa.utf8())), pa.field("list_large_binary", pa.list_(pa.large_binary())), pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))), + pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))), + pa.field( + "list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64())) + ), ] schema = pa.schema(fields) return ( @@ -152,6 +176,8 @@ def case_nested(size): "list_utf8": string * size, "list_large_binary": string * size, "list_nested_i64": items_nested * size, + "list_nested_inner_required_i64": items_required_nested * size, + "list_nested_inner_required_required_i64": items_required_nested_2 * size, }, schema, f"nested_nullable_{size*10}.parquet", diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index 655e77144eb..5a88b772522 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -7,7 +7,7 @@ use parquet2::{ }; use crate::{ - array::{Array, BinaryArray, Offset, Utf8Array}, + array::{Array, Offset}, bitmap::{utils::BitmapIter, MutableBitmap}, buffer::MutableBuffer, datatypes::DataType, @@ -15,6 +15,7 @@ use crate::{ }; use super::super::utils; +use super::utils::finish_array; /// Assumptions: No rep levels #[allow(clippy::too_many_arguments)] @@ -325,21 +326,7 @@ where )? } - Ok(match data_type { - DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - _ => unreachable!(), - }) + Ok(finish_array(data_type.clone(), offsets, values, validity)) } pub async fn stream_to_array( @@ -371,19 +358,5 @@ where )? } - Ok(match data_type { - DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( - data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )), - _ => unreachable!(), - }) + Ok(finish_array(data_type.clone(), offsets, values, validity)) } diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 1bfd2b04235..e37090c94f7 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -1,6 +1,7 @@ mod basic; mod dictionary; mod nested; +mod utils; pub use basic::iter_to_array; pub use basic::stream_to_array; diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index 0482074c480..8640d4c84b3 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -11,8 +11,9 @@ use parquet2::{ use super::super::nested_utils::*; use super::super::utils; use super::basic::read_plain_required; +use super::utils::finish_array; use crate::{ - array::{Array, BinaryArray, Offset, Utf8Array}, + array::{Array, Offset}, bitmap::MutableBitmap, buffer::MutableBuffer, datatypes::DataType, @@ -150,7 +151,7 @@ pub fn iter_to_array( mut iter: I, metadata: &ColumnChunkMetaData, data_type: DataType, -) -> Result> +) -> Result<(Arc, Vec>)> where O: Offset, ArrowError: From, @@ -176,32 +177,7 @@ where )? } - let inner_data_type = match data_type { - DataType::List(ref inner) => inner.data_type(), - DataType::LargeList(ref inner) => inner.data_type(), - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }; - - let values = match inner_data_type { - DataType::LargeBinary | DataType::Binary => Arc::new(BinaryArray::from_data( - inner_data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )) as Arc, - DataType::LargeUtf8 | DataType::Utf8 => Arc::new(Utf8Array::from_data( - inner_data_type.clone(), - offsets.into(), - values.into(), - validity.into(), - )) as Arc, - _ => unreachable!(), - }; + let values = finish_array(data_type, offsets, values, validity).into(); - create_list(data_type, &mut nested, values) + Ok((values, nested)) } diff --git a/src/io/parquet/read/binary/utils.rs b/src/io/parquet/read/binary/utils.rs new file mode 100644 index 00000000000..3b72782b4e7 --- /dev/null +++ b/src/io/parquet/read/binary/utils.rs @@ -0,0 +1,29 @@ +use crate::{ + array::{Array, BinaryArray, Offset, Utf8Array}, + bitmap::MutableBitmap, + buffer::MutableBuffer, + datatypes::DataType, +}; + +pub(super) fn finish_array( + data_type: DataType, + offsets: MutableBuffer, + values: MutableBuffer, + validity: MutableBitmap, +) -> Box { + match data_type { + DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data( + data_type, + offsets.into(), + values.into(), + validity.into(), + )), + DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data( + data_type, + offsets.into(), + values.into(), + validity.into(), + )), + _ => unreachable!(), + } +} diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 5d99bbb2f7d..ac89cac8d4f 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -135,7 +135,7 @@ pub fn iter_to_array( mut iter: I, metadata: &ColumnChunkMetaData, data_type: DataType, -) -> Result> +) -> Result<(Arc, Vec>)> where ArrowError: From, I: FallibleStreamingIterator, @@ -157,22 +157,11 @@ where )? } - let inner_data_type = match data_type { - DataType::List(ref inner) => inner.data_type(), - DataType::LargeList(ref inner) => inner.data_type(), - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }; - let values = Arc::new(BooleanArray::from_data( - inner_data_type.clone(), + data_type, values.into(), validity.into(), )); - create_list(data_type, &mut nested, values) + Ok((values, nested)) } diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 1f6198c1e69..e85f8c63637 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -1,4 +1,6 @@ //! APIs to read from Parquet format. +#![allow(clippy::type_complexity)] + use std::{ convert::TryInto, io::{Read, Seek}, @@ -28,6 +30,7 @@ use crate::{ array::{Array, DictionaryKey, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, + io::parquet::read::nested_utils::create_list, }; mod binary; @@ -44,6 +47,8 @@ pub use record_batch::RecordReader; pub(crate) use schema::is_type_nullable; pub use schema::{get_schema, FileMetaData}; +use self::nested_utils::Nested; + /// Creates a new iterator of compressed pages. pub fn get_page_iterator<'b, RR: Read + Seek>( column_metadata: &ColumnChunkMetaData, @@ -165,6 +170,62 @@ fn dict_read< } } +fn page_iter_to_array_nested< + I: FallibleStreamingIterator, +>( + iter: &mut I, + metadata: &ColumnChunkMetaData, + data_type: DataType, +) -> Result<(Arc, Vec>)> { + use DataType::*; + match data_type { + UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8), + UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16), + UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32), + Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8), + Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16), + Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32), + + Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { + ParquetType::PrimitiveType { physical_type, .. } => match physical_type { + PhysicalType::Int96 => primitive::iter_to_array_nested( + iter, + metadata, + DataType::Timestamp(TimeUnit::Nanosecond, None), + int96_to_i64_ns, + ), + _ => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x), + }, + _ => unreachable!(), + }, + + // INT64 + Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { + primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x) + } + UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64), + + Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x), + Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x), + + Boolean => boolean::iter_to_array_nested(iter, metadata, data_type), + + Binary | Utf8 => binary::iter_to_array_nested::(iter, metadata, data_type), + LargeBinary | LargeUtf8 => { + binary::iter_to_array_nested::(iter, metadata, data_type) + } + List(ref inner) => { + let (values, mut nested) = + page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; + Ok((create_list(data_type, &mut nested, values)?.into(), nested)) + } + other => Err(ArrowError::NotYetImplemented(format!( + "Reading {:?} from parquet still not implemented", + other + ))), + } +} + /// Converts an iterator of [`DataPage`] into a single [`Array`]. pub fn page_iter_to_array>( iter: &mut I, @@ -255,47 +316,16 @@ pub fn page_iter_to_array unreachable!(), }, - List(ref inner) => match inner.data_type() { - UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8), - UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16), - UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32), - Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8), - Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16), - Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32), - - Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() { - ParquetType::PrimitiveType { physical_type, .. } => match physical_type { - PhysicalType::Int96 => primitive::iter_to_array_nested( - iter, - metadata, - DataType::Timestamp(TimeUnit::Nanosecond, None), - int96_to_i64_ns, - ), - _ => primitive::iter_to_array(iter, metadata, data_type, |x: i64| x), - }, - _ => unreachable!(), - }, - - // INT64 - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => { - primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x) - } - UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64), - - Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x), - Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x), - - Boolean => boolean::iter_to_array_nested(iter, metadata, data_type), - - Binary | Utf8 => binary::iter_to_array_nested::(iter, metadata, data_type), - LargeBinary | LargeUtf8 => { - binary::iter_to_array_nested::(iter, metadata, data_type) - } - other => Err(ArrowError::NotYetImplemented(format!( - "Reading {:?} from parquet still not implemented", - other - ))), - }, + List(ref inner) => { + let (values, mut nested) = + page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; + create_list(data_type, &mut nested, values) + } + LargeList(ref inner) => { + let (values, mut nested) = + page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; + create_list(data_type, &mut nested, values) + } Dictionary(ref key, _) => match key.as_ref() { Int8 => dict_read::(iter, metadata, data_type), diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index bdf7ff589e3..e3bc873b62e 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -10,6 +10,7 @@ use crate::{ error::{ArrowError, Result}, }; +/// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug { fn inner(&mut self) -> (Buffer, Option); @@ -20,6 +21,8 @@ pub trait Nested: std::fmt::Debug { fn offsets(&mut self) -> &[i64]; fn close(&mut self, length: i64); + + fn is_nullable(&self) -> bool; } #[derive(Debug, Default)] @@ -40,6 +43,10 @@ impl Nested for NestedOptional { *self.offsets.last().unwrap() } + fn is_nullable(&self) -> bool { + true + } + fn push(&mut self, value: i64, is_valid: bool) { self.offsets.push(value); self.validity.push(is_valid); @@ -73,6 +80,10 @@ impl Nested for NestedValid { (offsets.into(), None) } + fn is_nullable(&self) -> bool { + false + } + #[inline] fn last_offset(&self) -> i64 { *self.offsets.last().unwrap() @@ -109,17 +120,64 @@ pub fn extend_offsets( R: Iterator, D: Iterator, { - assert_eq!(max_rep, 1); - let mut values_count = 0; + let mut values_count = vec![0; nested.len()]; + let mut prev_def: u32 = 0; + let mut is_first = true; + rep_levels.zip(def_levels).for_each(|(rep, def)| { - if rep == 0 { - nested[0].push(values_count, def != 0); - } - if def == max_def || (is_nullable && def == max_def - 1) { - values_count += 1; + let mut closures = max_rep - rep; + if prev_def <= 1 { + closures = 1; + }; + if is_first { + // close on first run to ensure offsets start with 0. + closures = max_rep; + is_first = false; } + + nested + .iter_mut() + .zip(values_count.iter()) + .enumerate() + .skip(rep as usize) + .take((rep + closures) as usize) + .for_each(|(depth, (nested, length))| { + let is_null = (def - rep) as usize == depth && depth == rep as usize; + nested.push(*length, !is_null); + }); + + values_count + .iter_mut() + .enumerate() + .for_each(|(depth, values)| { + if depth == 1 { + if def == max_def || (is_nullable && def == max_def - 1) { + *values += 1 + } + } else if depth == 0 { + let a = nested + .get(depth + 1) + .map(|x| x.is_nullable()) + .unwrap_or_default(); // todo: cumsum this + let condition = rep == 1 + || rep == 0 + && def >= max_def.saturating_sub((a as u32) + (is_nullable as u32)); + + if condition { + *values += 1; + } + } + }); + prev_def = def; }); - nested[0].close(values_count); + + // close validities + nested + .iter_mut() + .zip(values_count.iter()) + .for_each(|(nested, length)| { + nested.close(*length); + }); } pub fn is_nullable(type_: &ParquetType, container: &mut Vec) { @@ -163,12 +221,12 @@ pub fn init_nested(base_type: &ParquetType, capacity: usize) -> (Vec], + nested: &mut Vec>, values: Arc, ) -> Result> { Ok(match data_type { DataType::List(_) => { - let (offsets, validity) = nested[0].inner(); + let (offsets, validity) = nested.pop().unwrap().inner(); let offsets = Buffer::::from_trusted_len_iter(offsets.iter().map(|x| *x as i32)); Box::new(ListArray::::from_data( @@ -176,7 +234,7 @@ pub fn create_list( )) } DataType::LargeList(_) => { - let (offsets, validity) = nested[0].inner(); + let (offsets, validity) = nested.pop().unwrap().inner(); Box::new(ListArray::::from_data( data_type, offsets, values, validity, diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 748c1285224..7839988127a 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -100,7 +100,7 @@ pub fn iter_to_array_nested( metadata: &ColumnChunkMetaData, data_type: DataType, op: F, -) -> Result> +) -> Result<(Arc, Vec>)> where ArrowError: From, T: NativeType, @@ -127,24 +127,10 @@ where )? } - let values = match data_type { - DataType::List(ref inner) => Arc::new(PrimitiveArray::::from_data( - inner.data_type().clone(), - values.into(), - validity.into(), - )), - DataType::LargeList(ref inner) => Arc::new(PrimitiveArray::::from_data( - inner.data_type().clone(), - values.into(), - validity.into(), - )), - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }; - - create_list(data_type, &mut nested, values) + let values = Arc::new(PrimitiveArray::::from_data( + data_type, + values.into(), + validity.into(), + )); + Ok((values, nested)) } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index e82135fd812..083c9d3f114 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -139,6 +139,7 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box { Some(b"bbb".to_vec()), Some(b"".to_vec()), ])), + 7 | 8 | 9 => Arc::new(NullArray::from_data(DataType::Null, 1)), _ => unreachable!(), }; @@ -169,6 +170,61 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box { data_type, offsets, values, None, )) } + 7 => { + let data = [ + Some(vec![Some(vec![Some(0), Some(1)])]), + None, + Some(vec![Some(vec![Some(2), None]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]), + Some(vec![]), + Some(vec![Some(vec![Some(7)]), None, Some(vec![Some(9)])]), + Some(vec![Some(vec![]), Some(vec![None]), None]), + Some(vec![Some(vec![Some(10)])]), + ]; + let mut a = + MutableListArray::>>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } + 8 => { + let data = [ + Some(vec![Some(vec![Some(0), Some(1)])]), + None, + Some(vec![Some(vec![Some(2), Some(3)]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]), + Some(vec![]), + Some(vec![Some(vec![Some(7)]), None, Some(vec![Some(9)])]), + None, + Some(vec![Some(vec![Some(10)])]), + ]; + let mut a = + MutableListArray::>>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } + 9 => { + let data = [ + Some(vec![Some(vec![Some(0), Some(1)])]), + None, + Some(vec![Some(vec![Some(2), Some(3)]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]), + Some(vec![]), + Some(vec![ + Some(vec![Some(7)]), + Some(vec![Some(8)]), + Some(vec![Some(9)]), + ]), + None, + Some(vec![Some(vec![Some(10)])]), + ]; + let mut a = + MutableListArray::>>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } _ => unreachable!(), } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index bfad685eb49..7bf1b9a3e9a 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -232,6 +232,21 @@ fn v1_nested_large_binary() -> Result<()> { test_pyarrow_integration(6, 1, "nested", false, false) } +#[test] +fn v2_nested_nested() -> Result<()> { + test_pyarrow_integration(7, 2, "nested", false, false) +} + +#[test] +fn v2_nested_nested_required() -> Result<()> { + test_pyarrow_integration(8, 2, "nested", false, false) +} + +#[test] +fn v2_nested_nested_required_required() -> Result<()> { + test_pyarrow_integration(9, 2, "nested", false, false) +} + #[test] fn v1_decimal_9_nullable() -> Result<()> { test_pyarrow_integration(7, 1, "basic", false, false) @@ -291,10 +306,6 @@ fn v2_decimal_26_nullable() -> Result<()> { fn v2_decimal_26_required() -> Result<()> { test_pyarrow_integration(8, 2, "basic", false, true) } -/*#[test] -fn v2_nested_nested() { - let _ = test_pyarrow_integration(7, 1, "nested",false, false); -}*/ #[test] fn all_types() -> Result<()> {