From 9f440504796e6368f320ddfee0596e4e9e2ce492 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 5 Mar 2022 08:07:33 +0000 Subject: [PATCH] Fixed edge case in dremel --- parquet_integration/write_parquet.py | 20 ++++++++- .../parquet/read/deserialize/nested_utils.rs | 13 +++--- tests/it/io/parquet/mod.rs | 42 +++++++++++++++++++ tests/it/io/parquet/read.rs | 12 ++++++ 4 files changed, 80 insertions(+), 7 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index a4a0cb8ca0e..92c94b30186 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -238,6 +238,24 @@ def case_struct() -> Tuple[dict, pa.Schema, str]: ) +def case_nested_edge(): + simple = [[0, 1]] + null = [None] + fields = [ + pa.field("simple", pa.list_(pa.int64())), + pa.field("null", pa.list_(pa.field("item", pa.int64(), True))), + ] + schema = pa.schema(fields) + return ( + { + "simple": simple, + "null": null, + }, + schema, + f"nested_edge_nullable_10.parquet", + ) + + def write_pyarrow( case, page_version: int, @@ -276,7 +294,7 @@ def write_pyarrow( ) -for case in [case_basic_nullable, case_basic_required, case_nested, case_struct]: +for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge]: for version in [1, 2]: for use_dict in [True, False]: for compression in ["lz4", None, "snappy"]: diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 8c9b0eeca4f..97b6b3fefd6 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -459,14 +459,12 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi cum_sum[i + 1] = cum_sum[i] + delta; } - let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); + let iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); let mut rows = 0; - while rows < additional { - // unwrap is ok because by definition there has to be a closing statement - let (rep, def) = iter.next().unwrap(); + for (rep, def) in iter { if rep == 0 { - rows += 1 + rows += 1; } for (depth, (nest, length)) in nested.iter_mut().zip(values_count.iter()).enumerate() { @@ -479,7 +477,10 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi for (depth, nest) in nested.iter().enumerate().skip(1) { values_count[depth - 1] = nest.len() as i64 } - values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64 + values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64; + if rows == additional + 1 { + break; + } } // close validities diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 2bf5e3372f7..5e1b59c7488 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -42,6 +42,28 @@ pub fn read_column( )) } +pub fn pyarrow_nested_edge(column: &str) -> Box { + match column { + "simple" => { + // [[0, 1]] + let data = [Some(vec![Some(0), Some(1)])]; + let mut a = MutableListArray::>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } + "null" => { + // [None] + let data = [None::>>]; + let mut a = MutableListArray::>::new(); + a.try_extend(data).unwrap(); + let array: ListArray = a.into(); + Box::new(array) + } + _ => todo!(), + } +} + pub fn pyarrow_nested_nullable(column: &str) -> Box { let offsets = Buffer::from_slice([0, 2, 2, 5, 8, 8, 11, 11, 12]); @@ -569,6 +591,26 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Option Option> { + Some(match column { + "simple" => Box::new(PrimitiveStatistics:: { + data_type: DataType::Int64, + distinct_count: None, + null_count: Some(0), + min_value: Some(0), + max_value: Some(1), + }), + "null" => Box::new(PrimitiveStatistics:: { + data_type: DataType::Int64, + distinct_count: None, + null_count: Some(0), + min_value: None, + max_value: None, + }), + _ => unreachable!(), + }) +} + pub fn pyarrow_struct(column: &str) -> Box { let boolean = [ Some(true), diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index fd0358d6cef..16c52ead2e5 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -36,6 +36,7 @@ fn test_pyarrow_integration( ("basic", true) => pyarrow_required(column), ("basic", false) => pyarrow_nullable(column), ("nested", false) => pyarrow_nested_nullable(column), + ("nested_edge", false) => pyarrow_nested_edge(column), ("struct", false) => pyarrow_struct(column), _ => unreachable!(), }; @@ -44,6 +45,7 @@ fn test_pyarrow_integration( ("basic", true) => pyarrow_required_statistics(column), ("basic", false) => pyarrow_nullable_statistics(column), ("nested", false) => pyarrow_nested_nullable_statistics(column), + ("nested_edge", false) => pyarrow_nested_edge_statistics(column), ("struct", false) => pyarrow_struct_statistics(column), _ => unreachable!(), }; @@ -400,6 +402,16 @@ fn v1_struct_struct_optional() -> Result<()> { test_pyarrow_integration("struct_struct", 1, "struct", false, false, None) } +#[test] +fn v1_nested_edge_1() -> Result<()> { + test_pyarrow_integration("simple", 1, "nested_edge", false, false, None) +} + +#[test] +fn v1_nested_edge_2() -> Result<()> { + test_pyarrow_integration("null", 1, "nested_edge", false, false, None) +} + #[test] fn all_types() -> Result<()> { let path = "testing/parquet-testing/data/alltypes_plain.parquet";