Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added read of 2-level nested lists from parquet #548

Merged
merged 2 commits into from
Oct 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ def case_nested(size):
[[4, 5], [6]],
[],
[[7], None, [9]],
[[], [None], None],
[[10]],
]
items_required_nested = [
[[0, 1]],
None,
[[2, 3], [3]],
[[4, 5], [6]],
[],
[[7], None, [9]],
None,
[[10]],
]
items_required_nested_2 = [
[[0, 1]],
None,
[[2, 3], [3]],
[[4, 5], [6]],
[],
[[7], [8], [9]],
None,
[[10]],
]
Expand All @@ -140,6 +160,10 @@ def case_nested(size):
pa.field("list_utf8", pa.list_(pa.utf8())),
pa.field("list_large_binary", pa.list_(pa.large_binary())),
pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))),
pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))),
pa.field(
"list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64()))
),
]
schema = pa.schema(fields)
return (
Expand All @@ -152,6 +176,8 @@ def case_nested(size):
"list_utf8": string * size,
"list_large_binary": string * size,
"list_nested_i64": items_nested * size,
"list_nested_inner_required_i64": items_required_nested * size,
"list_nested_inner_required_required_i64": items_required_nested_2 * size,
},
schema,
f"nested_nullable_{size*10}.parquet",
Expand Down
35 changes: 4 additions & 31 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use parquet2::{
};

use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
array::{Array, Offset},
bitmap::{utils::BitmapIter, MutableBitmap},
buffer::MutableBuffer,
datatypes::DataType,
error::{ArrowError, Result},
};

use super::super::utils;
use super::utils::finish_array;

/// Assumptions: No rep levels
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -325,21 +326,7 @@ where
)?
}

Ok(match data_type {
DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
_ => unreachable!(),
})
Ok(finish_array(data_type.clone(), offsets, values, validity))
}

pub async fn stream_to_array<O, I, E>(
Expand Down Expand Up @@ -371,19 +358,5 @@ where
)?
}

Ok(match data_type {
DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)),
_ => unreachable!(),
})
Ok(finish_array(data_type.clone(), offsets, values, validity))
}
1 change: 1 addition & 0 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod basic;
mod dictionary;
mod nested;
mod utils;

pub use basic::iter_to_array;
pub use basic::stream_to_array;
Expand Down
34 changes: 5 additions & 29 deletions src/io/parquet/read/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use parquet2::{
use super::super::nested_utils::*;
use super::super::utils;
use super::basic::read_plain_required;
use super::utils::finish_array;
use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
array::{Array, Offset},
bitmap::MutableBitmap,
buffer::MutableBuffer,
datatypes::DataType,
Expand Down Expand Up @@ -150,7 +151,7 @@ pub fn iter_to_array<O, I, E>(
mut iter: I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
) -> Result<Box<dyn Array>>
) -> Result<(Arc<dyn Array>, Vec<Box<dyn Nested>>)>
where
O: Offset,
ArrowError: From<E>,
Expand All @@ -176,32 +177,7 @@ where
)?
}

let inner_data_type = match data_type {
DataType::List(ref inner) => inner.data_type(),
DataType::LargeList(ref inner) => inner.data_type(),
_ => {
return Err(ArrowError::NotYetImplemented(format!(
"Read nested datatype {:?}",
data_type
)))
}
};

let values = match inner_data_type {
DataType::LargeBinary | DataType::Binary => Arc::new(BinaryArray::from_data(
inner_data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)) as Arc<dyn Array>,
DataType::LargeUtf8 | DataType::Utf8 => Arc::new(Utf8Array::from_data(
inner_data_type.clone(),
offsets.into(),
values.into(),
validity.into(),
)) as Arc<dyn Array>,
_ => unreachable!(),
};
let values = finish_array(data_type, offsets, values, validity).into();

create_list(data_type, &mut nested, values)
Ok((values, nested))
}
29 changes: 29 additions & 0 deletions src/io/parquet/read/binary/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::{
array::{Array, BinaryArray, Offset, Utf8Array},
bitmap::MutableBitmap,
buffer::MutableBuffer,
datatypes::DataType,
};

/// Assembles the decoded parquet buffers into a boxed binary/utf8 [`Array`].
///
/// `offsets`/`values`/`validity` are the mutable buffers accumulated while
/// reading the column chunk; `data_type` selects which concrete array to build.
///
/// # Panics
/// Reaches `unreachable!()` for any `data_type` other than
/// `Binary`/`LargeBinary`/`Utf8`/`LargeUtf8` — callers guarantee one of those.
pub(super) fn finish_array<O: Offset>(
    data_type: DataType,
    offsets: MutableBuffer<O>,
    values: MutableBuffer<u8>,
    validity: MutableBitmap,
) -> Box<dyn Array> {
    match data_type {
        DataType::Binary | DataType::LargeBinary => {
            let array =
                BinaryArray::from_data(data_type, offsets.into(), values.into(), validity.into());
            Box::new(array)
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            let array =
                Utf8Array::from_data(data_type, offsets.into(), values.into(), validity.into());
            Box::new(array)
        }
        _ => unreachable!(),
    }
}
17 changes: 3 additions & 14 deletions src/io/parquet/read/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub fn iter_to_array<I, E>(
mut iter: I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
) -> Result<Box<dyn Array>>
) -> Result<(Arc<dyn Array>, Vec<Box<dyn Nested>>)>
where
ArrowError: From<E>,
I: FallibleStreamingIterator<Item = DataPage, Error = E>,
Expand All @@ -157,22 +157,11 @@ where
)?
}

let inner_data_type = match data_type {
DataType::List(ref inner) => inner.data_type(),
DataType::LargeList(ref inner) => inner.data_type(),
_ => {
return Err(ArrowError::NotYetImplemented(format!(
"Read nested datatype {:?}",
data_type
)))
}
};

let values = Arc::new(BooleanArray::from_data(
inner_data_type.clone(),
data_type,
values.into(),
validity.into(),
));

create_list(data_type, &mut nested, values)
Ok((values, nested))
}
112 changes: 71 additions & 41 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! APIs to read from Parquet format.
#![allow(clippy::type_complexity)]

use std::{
convert::TryInto,
io::{Read, Seek},
Expand Down Expand Up @@ -28,6 +30,7 @@ use crate::{
array::{Array, DictionaryKey, PrimitiveArray},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::nested_utils::create_list,
};

mod binary;
Expand All @@ -44,6 +47,8 @@ pub use record_batch::RecordReader;
pub(crate) use schema::is_type_nullable;
pub use schema::{get_schema, FileMetaData};

use self::nested_utils::Nested;

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<'b, RR: Read + Seek>(
column_metadata: &ColumnChunkMetaData,
Expand Down Expand Up @@ -165,6 +170,62 @@ fn dict_read<
}
}

/// Converts an iterator of [`DataPage`]s belonging to a nested column into the
/// innermost values [`Array`] plus the [`Nested`] structure describing the
/// repetition/definition levels consumed along the way.
///
/// The caller wraps the returned values back into list arrays via
/// `create_list`, one level per entry in the returned `Vec<Box<dyn Nested>>`.
///
/// # Errors
/// Returns [`ArrowError::NotYetImplemented`] for logical types without a
/// nested reader.
fn page_iter_to_array_nested<
    I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>,
>(
    iter: &mut I,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
) -> Result<(Arc<dyn Array>, Vec<Box<dyn Nested>>)> {
    use DataType::*;
    match data_type {
        UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8),
        UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16),
        UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32),
        Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8),
        Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16),
        Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32),

        // Nanosecond timestamps may be physically stored as INT96; convert
        // those explicitly, otherwise fall through to the plain i64 path.
        Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
            ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
                PhysicalType::Int96 => primitive::iter_to_array_nested(
                    iter,
                    metadata,
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    int96_to_i64_ns,
                ),
                _ => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x),
            },
            _ => unreachable!(),
        },

        // INT64
        Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
            primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x)
        }
        UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64),

        Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x),
        Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x),

        Boolean => boolean::iter_to_array_nested(iter, metadata, data_type),

        Binary | Utf8 => binary::iter_to_array_nested::<i32, _, _>(iter, metadata, data_type),
        LargeBinary | LargeUtf8 => {
            binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
        }
        // Recurse for a deeper level of nesting. `create_list` supports both
        // `List` and `LargeList` (the top-level caller already relies on the
        // latter), so handle both here; previously a nested `LargeList`
        // incorrectly returned `NotYetImplemented`.
        List(ref inner) | LargeList(ref inner) => {
            let (values, mut nested) =
                page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
            Ok((create_list(data_type, &mut nested, values)?.into(), nested))
        }
        other => Err(ArrowError::NotYetImplemented(format!(
            "Reading {:?} from parquet still not implemented",
            other
        ))),
    }
}

/// Converts an iterator of [`DataPage`] into a single [`Array`].
pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError>>(
iter: &mut I,
Expand Down Expand Up @@ -255,47 +316,16 @@ pub fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error =
},
_ => unreachable!(),
},
List(ref inner) => match inner.data_type() {
UInt8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u8),
UInt16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u16),
UInt32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as u32),
Int8 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i8),
Int16 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i16),
Int32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i32| x as i32),

Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
PhysicalType::Int96 => primitive::iter_to_array_nested(
iter,
metadata,
DataType::Timestamp(TimeUnit::Nanosecond, None),
int96_to_i64_ns,
),
_ => primitive::iter_to_array(iter, metadata, data_type, |x: i64| x),
},
_ => unreachable!(),
},

// INT64
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x)
}
UInt64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: i64| x as u64),

Float32 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f32| x),
Float64 => primitive::iter_to_array_nested(iter, metadata, data_type, |x: f64| x),

Boolean => boolean::iter_to_array_nested(iter, metadata, data_type),

Binary | Utf8 => binary::iter_to_array_nested::<i32, _, _>(iter, metadata, data_type),
LargeBinary | LargeUtf8 => {
binary::iter_to_array_nested::<i64, _, _>(iter, metadata, data_type)
}
other => Err(ArrowError::NotYetImplemented(format!(
"Reading {:?} from parquet still not implemented",
other
))),
},
List(ref inner) => {
let (values, mut nested) =
page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
create_list(data_type, &mut nested, values)
}
LargeList(ref inner) => {
let (values, mut nested) =
page_iter_to_array_nested(iter, metadata, inner.data_type().clone())?;
create_list(data_type, &mut nested, values)
}

Dictionary(ref key, _) => match key.as_ref() {
Int8 => dict_read::<i8, _>(iter, metadata, data_type),
Expand Down
Loading