Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified code
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 4, 2022
1 parent f35e02a commit e4c6008
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 62 deletions.
82 changes: 46 additions & 36 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ impl<I: FallibleStreamingIterator<Item = DataPage, Error = ParquetError> + Send
{
}

/// Type-erases an iterator of concrete arrays into a boxed [`ArrayIter`].
///
/// Each `Ok` item is moved into an [`Arc`] and exposed as `Arc<dyn Array>`;
/// `Err` items are passed through unchanged. The adaptor is lazy: nothing is
/// pulled from `iter` until the returned iterator is advanced.
#[inline]
fn dyn_iter<'a, A, I>(iter: I) -> ArrayIter<'a>
where
    A: Array + 'static,
    I: Iterator<Item = Result<A>> + Send + Sync + 'a,
{
    let erased = iter.map(|maybe_array| {
        maybe_array.map(|array| {
            // Unsized coercion from `Arc<A>` to the trait object.
            let shared: Arc<dyn Array> = Arc::new(array);
            shared
        })
    });
    Box::new(erased)
}

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<R: Read + Seek>(
column_metadata: &ColumnChunkMetaData,
Expand Down Expand Up @@ -220,90 +230,90 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>(
match field.data_type.to_logical_type() {
Null => Ok(null::iter_to_arrays(pages, field.data_type, chunk_size)),
Boolean => Ok(boolean::iter_to_arrays(pages, field.data_type, chunk_size)),
UInt8 => Ok(primitive::iter_to_arrays(
UInt8 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i32| x as u8,
)),
UInt16 => Ok(primitive::iter_to_arrays(
))),
UInt16 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i32| x as u16,
)),
UInt32 => Ok(primitive::iter_to_arrays(
))),
UInt32 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i32| x as u32,
)),
Int8 => Ok(primitive::iter_to_arrays(
))),
Int8 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i32| x as i8,
)),
Int16 => Ok(primitive::iter_to_arrays(
))),
Int16 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i32| x as i16,
)),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok(
primitive::iter_to_arrays(pages, field.data_type, chunk_size, read_item, |x: i32| {
))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok(dyn_iter(
primitive::Iter::new(pages, field.data_type, chunk_size, read_item, |x: i32| {
x as i32
}),
),
)),

Timestamp(TimeUnit::Nanosecond, None) => match type_ {
ParquetType::PrimitiveType {
physical_type,
logical_type,
..
} => match (physical_type, logical_type) {
(PhysicalType::Int96, _) => Ok(primitive::iter_to_arrays(
(PhysicalType::Int96, _) => Ok(dyn_iter(primitive::Iter::new(
pages,
DataType::Timestamp(TimeUnit::Nanosecond, None),
chunk_size,
read_item,
int96_to_i64_ns,
)),
))),
(_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => Ok(match unit {
ParquetTimeUnit::MILLIS(_) => primitive::iter_to_arrays(
ParquetTimeUnit::MILLIS(_) => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x * 1_000_000,
),
ParquetTimeUnit::MICROS(_) => primitive::iter_to_arrays(
)),
ParquetTimeUnit::MICROS(_) => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x * 1_000,
),
ParquetTimeUnit::NANOS(_) => primitive::iter_to_arrays(
)),
ParquetTimeUnit::NANOS(_) => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x,
),
)),
}),
_ => Ok(primitive::iter_to_arrays(
_ => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x,
)),
))),
},
_ => unreachable!(),
},
Expand All @@ -315,20 +325,20 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>(

Decimal(_, _) => match type_ {
ParquetType::PrimitiveType { physical_type, .. } => Ok(match physical_type {
PhysicalType::Int32 => primitive::iter_to_arrays(
PhysicalType::Int32 => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i32| x as i128,
),
PhysicalType::Int64 => primitive::iter_to_arrays(
)),
PhysicalType::Int64 => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x as i128,
),
)),
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
return Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
Expand Down Expand Up @@ -378,33 +388,33 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>(
},

// INT64
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok(
primitive::iter_to_arrays(pages, field.data_type, chunk_size, read_item, |x: i64| {
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok(dyn_iter(
primitive::Iter::new(pages, field.data_type, chunk_size, read_item, |x: i64| {
x as i64
}),
),
UInt64 => Ok(primitive::iter_to_arrays(
)),
UInt64 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x as u64,
)),
))),

Float32 => Ok(primitive::iter_to_arrays(
Float32 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: f32| x,
)),
Float64 => Ok(primitive::iter_to_arrays(
))),
Float64 => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: f64| x,
)),
))),

Binary => Ok(binary::iter_to_arrays::<i32, BinaryArray<i32>, _>(
pages,
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/read/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub(super) fn finish<T: NativeType>(

/// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays
#[derive(Debug)]
pub struct PrimitiveArrayIterator<T, I, P, G, F>
pub struct Iter<T, I, P, G, F>
where
I: DataPages,
T: NativeType,
Expand All @@ -290,7 +290,7 @@ where
phantom: std::marker::PhantomData<P>,
}

impl<T, I, P, G, F> PrimitiveArrayIterator<T, I, P, G, F>
impl<T, I, P, G, F> Iter<T, I, P, G, F>
where
I: DataPages,
T: NativeType,
Expand All @@ -312,7 +312,7 @@ where
}
}

impl<T, I, P, G, F> Iterator for PrimitiveArrayIterator<T, I, P, G, F>
impl<T, I, P, G, F> Iterator for Iter<T, I, P, G, F>
where
I: DataPages,
T: NativeType,
Expand Down
24 changes: 1 addition & 23 deletions src/io/parquet/read/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,11 @@ use std::sync::Arc;

use crate::{array::Array, datatypes::DataType};

use super::ArrayIter;
use super::{nested_utils::*, DataPages};

use basic::PrimitiveArrayIterator;
pub use basic::Iter;
use nested::ArrayIterator;

/// Builds a boxed [`ArrayIter`] of type-erased arrays from [`DataPages`].
///
/// A `PrimitiveArrayIterator` is constructed from the arguments as given, and
/// every array it yields successfully is wrapped in an [`Arc`] and exposed as
/// `Arc<dyn Array>`; errors are forwarded untouched.
///
/// * `iter` - the pages to read from.
/// * `data_type` - forwarded to the underlying iterator.
/// * `chunk_size` - forwarded to the underlying iterator.
/// * `op1` - maps a byte slice to a parquet native value `P`.
/// * `op2` - maps a parquet native value `P` to an arrow native value `T`.
pub fn iter_to_arrays<'a, I, T, P, G, F>(
    iter: I,
    data_type: DataType,
    chunk_size: usize,
    op1: G,
    op2: F,
) -> ArrayIter<'a>
where
    I: 'a + DataPages,
    T: crate::types::NativeType,
    P: parquet2::types::NativeType,
    G: 'a + Copy + Send + Sync + for<'b> Fn(&'b [u8]) -> P,
    F: 'a + Copy + Send + Sync + Fn(P) -> T,
{
    let typed = PrimitiveArrayIterator::<T, I, P, G, F>::new(
        iter, data_type, chunk_size, op1, op2,
    );
    let erased =
        typed.map(|maybe_array| maybe_array.map(|array| Arc::new(array) as Arc<dyn Array>));
    Box::new(erased)
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, I, T, P, G, F>(
iter: I,
Expand Down

0 comments on commit e4c6008

Please sign in to comment.