diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index d97ff1edc9d..4aef907c614 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -114,6 +114,26 @@ def case_nested(size): [[4, 5], [6]], [], [[7], None, [9]], + [[], [None], None], + [[10]], + ] + items_required_nested = [ + [[0, 1]], + None, + [[2, 3], [3]], + [[4, 5], [6]], + [], + [[7], None, [9]], + None, + [[10]], + ] + items_required_nested_2 = [ + [[0, 1]], + None, + [[2, 3], [3]], + [[4, 5], [6]], + [], + [[7], [8], [9]], None, [[10]], ] @@ -140,6 +160,10 @@ def case_nested(size): pa.field("list_utf8", pa.list_(pa.utf8())), pa.field("list_large_binary", pa.list_(pa.large_binary())), pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))), + pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))), + pa.field( + "list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64())) + ), ] schema = pa.schema(fields) return ( @@ -152,6 +176,8 @@ def case_nested(size): "list_utf8": string * size, "list_large_binary": string * size, "list_nested_i64": items_nested * size, + "list_nested_inner_required_i64": items_required_nested * size, + "list_nested_inner_required_required_i64": items_required_nested_2 * size, }, schema, f"nested_nullable_{size*10}.parquet", diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 92f4d899f0e..e85f8c63637 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -214,6 +214,11 @@ fn page_iter_to_array_nested< LargeBinary | LargeUtf8 => { binary::iter_to_array_nested::(iter, metadata, data_type) } + List(ref inner) => { + let (values, mut nested) = + page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?; + Ok((create_list(data_type, &mut nested, values)?.into(), nested)) + } other => Err(ArrowError::NotYetImplemented(format!( "Reading {:?} from parquet still not implemented", other diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index 04037f05ba7..e3bc873b62e 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -21,6 +21,8 @@ pub trait Nested: std::fmt::Debug { fn offsets(&mut self) -> &[i64]; fn close(&mut self, length: i64); + + fn is_nullable(&self) -> bool; } #[derive(Debug, Default)] @@ -41,6 +43,10 @@ impl Nested for NestedOptional { *self.offsets.last().unwrap() } + fn is_nullable(&self) -> bool { + true + } + fn push(&mut self, value: i64, is_valid: bool) { self.offsets.push(value); self.validity.push(is_valid); @@ -74,6 +80,10 @@ impl Nested for NestedValid { (offsets.into(), None) } + fn is_nullable(&self) -> bool { + false + } + #[inline] fn last_offset(&self) -> i64 { *self.offsets.last().unwrap() @@ -110,17 +120,64 @@ pub fn extend_offsets( R: Iterator, D: Iterator, { - assert_eq!(max_rep, 1); - let mut values_count = 0; + let mut values_count = vec![0; nested.len()]; + let mut prev_def: u32 = 0; + let mut is_first = true; + rep_levels.zip(def_levels).for_each(|(rep, def)| { - if rep == 0 { - nested[0].push(values_count, def != 0); - } - if def == max_def || (is_nullable && def == max_def - 1) { - values_count += 1; + let mut closures = max_rep - rep; + if prev_def <= 1 { + closures = 1; + }; + if is_first { + // close on first run to ensure offsets start with 0. + closures = max_rep; + is_first = false; } + + nested + .iter_mut() + .zip(values_count.iter()) + .enumerate() + .skip(rep as usize) + .take((rep + closures) as usize) + .for_each(|(depth, (nested, length))| { + let is_null = (def - rep) as usize == depth && depth == rep as usize; + nested.push(*length, !is_null); + }); + + values_count + .iter_mut() + .enumerate() + .for_each(|(depth, values)| { + if depth == 1 { + if def == max_def || (is_nullable && def == max_def - 1) { + *values += 1 + } + } else if depth == 0 { + let a = nested + .get(depth + 1) + .map(|x| x.is_nullable()) + .unwrap_or_default(); // todo: cumsum this + let condition = rep == 1 + || rep == 0 + && def >= max_def.saturating_sub((a as u32) + (is_nullable as u32)); + + if condition { + *values += 1; + } + } + }); + prev_def = def; }); - nested[0].close(values_count); + + // close validities + nested + .iter_mut() + .zip(values_count.iter()) + .for_each(|(nested, length)| { + nested.close(*length); + }); } pub fn is_nullable(type_: &ParquetType, container: &mut Vec) { @@ -164,12 +221,12 @@ pub fn init_nested(base_type: &ParquetType, capacity: usize) -> (Vec], + nested: &mut Vec>, values: Arc, ) -> Result> { Ok(match data_type { DataType::List(_) => { - let (offsets, validity) = nested[0].inner(); + let (offsets, validity) = nested.pop().unwrap().inner(); let offsets = Buffer::::from_trusted_len_iter(offsets.iter().map(|x| *x as i32)); Box::new(ListArray::::from_data( @@ -177,7 +234,7 @@ pub fn create_list( )) } DataType::LargeList(_) => { - let (offsets, validity) = nested[0].inner(); + let (offsets, validity) = nested.pop().unwrap().inner(); Box::new(ListArray::::from_data( data_type, offsets, values, validity, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index e82135fd812..083c9d3f114 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -139,6 +139,7 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box { Some(b"bbb".to_vec()), Some(b"".to_vec()), ])), + 7 | 8 | 9 => Arc::new(NullArray::from_data(DataType::Null, 1)), _ => unreachable!(), }; @@ -169,6 +170,61 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box { data_type, offsets, values, None, )) } + 7 => { + let data = [ + Some(vec![Some(vec![Some(0), Some(1)])]), + None, + Some(vec![Some(vec![Some(2), None]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]), + Some(vec![]), + Some(vec![Some(vec![Some(7)]), None, Some(vec![Some(9)])]), + Some(vec![Some(vec![]), Some(vec![None]), None]), + Some(vec![Some(vec![Some(10)])]), + ]; + let mut a = + MutableListArray::>>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } + 8 => { + let data = [ + Some(vec![Some(vec![Some(0), Some(1)])]), + None, + Some(vec![Some(vec![Some(2), Some(3)]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]), + Some(vec![]), + Some(vec![Some(vec![Some(7)]), None, Some(vec![Some(9)])]), + None, + Some(vec![Some(vec![Some(10)])]), + ]; + let mut a = + MutableListArray::>>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } + 9 => { + let data = [ + Some(vec![Some(vec![Some(0), Some(1)])]), + None, + Some(vec![Some(vec![Some(2), Some(3)]), Some(vec![Some(3)])]), + Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]), + Some(vec![]), + Some(vec![ + Some(vec![Some(7)]), + Some(vec![Some(8)]), + Some(vec![Some(9)]), + ]), + None, + Some(vec![Some(vec![Some(10)])]), + ]; + let mut a = + MutableListArray::>>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } _ => unreachable!(), } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index bfad685eb49..7bf1b9a3e9a 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -232,6 +232,21 @@ fn v1_nested_large_binary() -> Result<()> { test_pyarrow_integration(6, 1, "nested", false, false) } +#[test] +fn v2_nested_nested() -> Result<()> { + test_pyarrow_integration(7, 2, "nested", false, false) +} + +#[test] +fn v2_nested_nested_required() -> Result<()> { + test_pyarrow_integration(8, 2, "nested", false, false) +} + +#[test] +fn v2_nested_nested_required_required() -> Result<()> { + test_pyarrow_integration(9, 2, "nested", false, false) +} + #[test] fn v1_decimal_9_nullable() -> Result<()> { test_pyarrow_integration(7, 1, "basic", false, false) @@ -291,10 +306,6 @@ fn v2_decimal_26_nullable() -> Result<()> { fn v2_decimal_26_required() -> Result<()> { test_pyarrow_integration(8, 2, "basic", false, true) } -/*#[test] -fn v2_nested_nested() { - let _ = test_pyarrow_integration(7, 1, "nested",false, false); -}*/ #[test] fn all_types() -> Result<()> {