diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index f4a5b9ed2bb..603c3bcdddb 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -33,6 +33,8 @@ pub trait Nested: std::fmt::Debug + Send + Sync { /// number of rows fn len(&self) -> usize; + fn len1(&self) -> usize; + /// number of values associated to the primitive type this nested tracks fn num_values(&self) -> usize; } @@ -80,6 +82,10 @@ impl Nested for NestedPrimitive { self.length } + fn len1(&self) -> usize { + self.length + } + fn num_values(&self) -> usize { self.length } @@ -124,6 +130,10 @@ impl Nested for NestedOptional { self.offsets.len().saturating_sub(1) } + fn len1(&self) -> usize { + self.offsets.len() + } + fn num_values(&self) -> usize { self.offsets.last().copied().unwrap_or(0) as usize } @@ -173,6 +183,10 @@ impl Nested for NestedValid { self.offsets.len().saturating_sub(1) } + fn len1(&self) -> usize { + self.offsets.len() + } + fn num_values(&self) -> usize { self.offsets.last().copied().unwrap_or(0) as usize } @@ -254,7 +268,6 @@ fn init_nested_recursive(init: &InitNested, capacity: usize, container: &mut Vec fn init_nested(init: &InitNested, capacity: usize) -> NestedState { let mut container = vec![]; init_nested_recursive(init, capacity, &mut container); - println!("{:?}", container); NestedState::new(container) } @@ -419,21 +432,13 @@ pub fn extend_offsets1<'a>( } fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { - let mut values_count = vec![0; nested.depth()]; + let max_depth = nested.depth() - 1; + let mut values_count = vec![0; max_depth + 1]; - let mut def_threshold = page.max_def_level; - let thres = nested - .nested - .iter() - .rev() - .map(|nested| { - let is_nullable = nested.is_nullable(); - def_threshold -= is_nullable as u32; - def_threshold - }) - .collect::>(); + let is_optional = nested.nested.last().unwrap().is_nullable(); + let max_def = page.max_def_level; - let rate = if page.max_def_level == 1 { 1 } else { 2 }; + let rate = if max_def == 1 { 1 } else { 2 }; let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); @@ -453,24 +458,28 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi .enumerate() .zip(values_count.iter()) .skip(rep as usize) + .take(max_depth as usize - rep as usize) .take(closures as usize) .for_each(|((depth, nested), length)| { let is_null = def - rep == depth as u32; nested.push(*length, !is_null); }); + // add to the primitive + if (is_optional && def >= max_def - 1) || (!is_optional && def == max_def) { + let is_valid = def == max_def; + let length = values_count.last_mut().unwrap(); + nested.nested.last_mut().unwrap().push(*length, is_valid); + *length += 1; + } + values_count .iter_mut() - .zip(thres.iter()) - .enumerate() - .for_each(|(depth, (values, thre))| { - if depth == 1 { - if def >= *thre { - *values += 1 - } - } else if depth == 0 && def >= *thre { - *values += 1; - } + .rev() + .skip(1) + .zip(nested.nested.iter().rev()) + .for_each(|(length, nested)| { + *length = nested.len1() as i64; }); } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 1dd211a92dc..470fe1f782b 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -254,7 +254,6 @@ fn v1_nested_large_binary() -> Result<()> { } #[test] -#[ignore] // todo fn v2_nested_nested() -> Result<()> { test_pyarrow_integration(7, 2, "nested", false, false, None) }