Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Nov 13, 2021
1 parent 4cf86d6 commit 12bcd2a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
8 changes: 5 additions & 3 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use parquet2::{

use crate::{
array::{Array, DictionaryKey, NullArray, PrimitiveArray, StructArray},
datatypes::{DataType, IntervalUnit, TimeUnit},
datatypes::{DataType, Field, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::nested_utils::{create_list, init_nested},
};
Expand Down Expand Up @@ -366,15 +366,17 @@ fn finish_array(data_type: DataType, arrays: &mut VecDeque<Box<dyn Array>>) -> B
#[allow(clippy::type_complexity)]
pub fn column_iter_to_array<II, I>(
mut columns: I,
data_type: DataType,
field: &Field,
mut buffer: Vec<u8>,
) -> Result<(Box<dyn Array>, Vec<u8>, Vec<u8>)>
where
II: Iterator<Item = std::result::Result<CompressedDataPage, ParquetError>>,
I: ColumnChunkIter<II>,
{
let mut nested_info = vec![];
init_nested(columns.field(), 0, &mut nested_info);
init_nested(field, 0, &mut nested_info);

let data_type = field.data_type().clone();

let mut arrays = VecDeque::new();
let page_buffer;
Expand Down
54 changes: 32 additions & 22 deletions src/io/parquet/read/nested_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::sync::Arc;

use parquet2::schema::{types::ParquetType, Repetition};

use crate::{
array::{Array, ListArray},
bitmap::{Bitmap, MutableBitmap},
buffer::{Buffer, MutableBuffer},
datatypes::DataType,
datatypes::{DataType, Field},
error::{ArrowError, Result},
};

Expand Down Expand Up @@ -214,29 +212,41 @@ pub fn extend_offsets<R, D>(
});
}

pub fn init_nested(field: &ParquetType, capacity: usize, container: &mut Vec<Box<dyn Nested>>) {
match field {
ParquetType::PrimitiveType { basic_info, .. } => {
container.push(
Box::new(NestedPrimitive::new(super::schema::is_nullable(basic_info)))
as Box<dyn Nested>,
);
pub fn init_nested(field: &Field, capacity: usize, container: &mut Vec<Box<dyn Nested>>) {
let is_nullable = field.is_nullable();

use crate::datatypes::PhysicalType::*;
match field.data_type().to_physical_type() {
Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8
| LargeUtf8 | Dictionary(_) => {
container.push(Box::new(NestedPrimitive::new(is_nullable)) as Box<dyn Nested>)
}
ParquetType::GroupType {
basic_info, fields, ..
} => {
if basic_info.repetition() != &Repetition::Repeated {
let item = if super::schema::is_nullable(basic_info) {
Box::new(NestedOptional::with_capacity(capacity)) as Box<dyn Nested>
} else {
Box::new(NestedValid::with_capacity(capacity)) as Box<dyn Nested>
};
container.push(item);
List | LargeList | FixedSizeList => {
if is_nullable {
container.push(Box::new(NestedOptional::with_capacity(capacity)) as Box<dyn Nested>)
} else {
container.push(Box::new(NestedValid::with_capacity(capacity)) as Box<dyn Nested>)
}
for field in fields {
init_nested(field, capacity, container)
match field.data_type().to_logical_type() {
DataType::List(ref inner)
| DataType::LargeList(ref inner)
| DataType::FixedSizeList(ref inner, _) => {
init_nested(inner.as_ref(), capacity, container)
}
_ => unreachable!(),
};
}
Struct => {
container.push(Box::new(NestedPrimitive::new(is_nullable)) as Box<dyn Nested>);
if let DataType::Struct(fields) = field.data_type().to_logical_type() {
fields
.iter()
.for_each(|field| init_nested(field, capacity, container));
} else {
unreachable!()
}
}
_ => todo!(),
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/io/parquet/read/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ impl<R: Read + Seek> Iterator for RecordReader<R> {
b1,
);

let (array, b1, b2) =
column_iter_to_array(column_iter, field.data_type().clone(), b2)?;
let (array, b1, b2) = column_iter_to_array(column_iter, field, b2)?;

let array = if array.len() > remaining_rows {
array.slice(0, remaining_rows)
Expand Down

0 comments on commit 12bcd2a

Please sign in to comment.