Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for nested lists.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Oct 23, 2021
1 parent 02cda0e commit 568a9d0
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 16 deletions.
2 changes: 1 addition & 1 deletion parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def case_nested(size):
[[4, 5], [6]],
[],
[[7], None, [9]],
None,
[[], [None], None],
[[10]],
]
string = [
Expand Down
5 changes: 5 additions & 0 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ fn page_iter_to_array_nested<
LargeBinary | LargeUtf8 => {
binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
}
List(ref inner) => {
let (values, mut nested) =
page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
Ok((create_list(data_type, &mut nested, values)?.into(), nested))
}
other => Err(ArrowError::NotYetImplemented(format!(
"Reading {:?} from parquet still not implemented",
other
Expand Down
80 changes: 69 additions & 11 deletions src/io/parquet/read/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub trait Nested: std::fmt::Debug {
fn offsets(&mut self) -> &[i64];

fn close(&mut self, length: i64);

fn is_nullable(&self) -> bool;
}

#[derive(Debug, Default)]
Expand All @@ -41,6 +43,10 @@ impl Nested for NestedOptional {
*self.offsets.last().unwrap()
}

fn is_nullable(&self) -> bool {
true
}

fn push(&mut self, value: i64, is_valid: bool) {
self.offsets.push(value);
self.validity.push(is_valid);
Expand Down Expand Up @@ -74,6 +80,10 @@ impl Nested for NestedValid {
(offsets.into(), None)
}

fn is_nullable(&self) -> bool {
false
}

#[inline]
fn last_offset(&self) -> i64 {
*self.offsets.last().unwrap()
Expand Down Expand Up @@ -110,17 +120,65 @@ pub fn extend_offsets<R, D>(
R: Iterator<Item = u32>,
D: Iterator<Item = u32>,
{
assert_eq!(max_rep, 1);
let mut values_count = 0;
let mut values_count = vec![0; nested.len()];
let mut prev_def: u32 = 0;
let mut is_first = true;

rep_levels.zip(def_levels).for_each(|(rep, def)| {
if rep == 0 {
nested[0].push(values_count, def != 0);
}
if def == max_def || (is_nullable && def == max_def - 1) {
values_count += 1;
println!("({},{}),", rep, def);
let mut closures = max_rep - rep;
if prev_def <= 1 {
closures = 1;
};
if is_first {
// close on first run to ensure offsets start with 0.
closures = max_rep;
is_first = false;
}

nested
.iter_mut()
.zip(values_count.iter())
.enumerate()
.skip(rep as usize)
.take((rep + closures) as usize)
.for_each(|(depth, (nested, length))| {
let is_null = (def - rep) as usize == depth && depth == rep as usize;
nested.push(*length, !is_null);
});

values_count
.iter_mut()
.enumerate()
.for_each(|(depth, values)| {
if depth == 1 {
if def == max_def || (is_nullable && def == max_def - 1) {
*values += 1
}
} else if depth == 0 {
let a = nested
.get(depth + 1)
.map(|x| x.is_nullable())
.unwrap_or_default(); // todo: cumsum this
let condition = rep == 1
|| rep == 0
&& def >= max_def.saturating_sub((a as u32) + (is_nullable as u32));

if condition {
*values += 1;
}
}
});
prev_def = def;
});
nested[0].close(values_count);

// close validities
nested
.iter_mut()
.zip(values_count.iter())
.for_each(|(nested, length)| {
nested.close(*length);
});
}

pub fn is_nullable(type_: &ParquetType, container: &mut Vec<bool>) {
Expand Down Expand Up @@ -164,20 +222,20 @@ pub fn init_nested(base_type: &ParquetType, capacity: usize) -> (Vec<Box<dyn Nes

pub fn create_list(
data_type: DataType,
nested: &mut [Box<dyn Nested>],
nested: &mut Vec<Box<dyn Nested>>,
values: Arc<dyn Array>,
) -> Result<Box<dyn Array>> {
Ok(match data_type {
DataType::List(_) => {
let (offsets, validity) = nested[0].inner();
let (offsets, validity) = nested.pop().unwrap().inner();

let offsets = Buffer::<i32>::from_trusted_len_iter(offsets.iter().map(|x| *x as i32));
Box::new(ListArray::<i32>::from_data(
data_type, offsets, values, validity,
))
}
DataType::LargeList(_) => {
let (offsets, validity) = nested[0].inner();
let (offsets, validity) = nested.pop().unwrap().inner();

Box::new(ListArray::<i64>::from_data(
data_type, offsets, values, validity,
Expand Down
18 changes: 18 additions & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box<dyn Array> {
Some(b"bbb".to_vec()),
Some(b"".to_vec()),
])),
7 => Arc::new(NullArray::from_data(DataType::Null, 1)),
_ => unreachable!(),
};

Expand Down Expand Up @@ -169,6 +170,23 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box<dyn Array> {
data_type, offsets, values, None,
))
}
7 => {
let data = [
Some(vec![Some(vec![Some(0), Some(1)])]),
None,
Some(vec![Some(vec![Some(2), None]), Some(vec![Some(3)])]),
Some(vec![Some(vec![Some(4), Some(5)]), Some(vec![Some(6)])]),
Some(vec![]),
Some(vec![Some(vec![Some(7)]), None, Some(vec![Some(9)])]),
Some(vec![Some(vec![]), Some(vec![None]), None]),
Some(vec![Some(vec![Some(10)])]),
];
let mut a =
MutableListArray::<i32, MutableListArray<i32, MutablePrimitiveArray<i64>>>::new();
a.try_extend(data).unwrap();
let array: ListArray<i32> = a.into();
Box::new(array)
}
_ => unreachable!(),
}
}
Expand Down
9 changes: 5 additions & 4 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ fn v1_nested_large_binary() -> Result<()> {
test_pyarrow_integration(6, 1, "nested", false, false)
}

#[test]
fn v2_nested_nested() -> Result<()> {
test_pyarrow_integration(7, 1, "nested", false, false)
}

#[test]
fn v1_decimal_9_nullable() -> Result<()> {
test_pyarrow_integration(7, 1, "basic", false, false)
Expand Down Expand Up @@ -291,10 +296,6 @@ fn v2_decimal_26_nullable() -> Result<()> {
fn v2_decimal_26_required() -> Result<()> {
test_pyarrow_integration(8, 2, "basic", false, true)
}
/*#[test]
fn v2_nested_nested() {
let _ = test_pyarrow_integration(7, 1, "nested",false, false);
}*/

#[test]
fn all_types() -> Result<()> {
Expand Down

0 comments on commit 568a9d0

Please sign in to comment.