Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 4, 2022
1 parent e4c6008 commit 99736ea
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 115 deletions.
257 changes: 147 additions & 110 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ pub use parquet2::{
};

use crate::{
array::{Array, BinaryArray, DictionaryKey, ListArray, PrimitiveArray, StructArray, Utf8Array},
array::{
Array, BinaryArray, DictionaryKey, ListArray, MutablePrimitiveArray, PrimitiveArray,
StructArray, Utf8Array,
},
datatypes::{DataType, Field, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::primitive::read_item,
types::NativeType,
};

mod binary;
Expand Down Expand Up @@ -72,6 +76,31 @@ where
Box::new(iter.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)))
}

/// Converts an iterator of [MutablePrimitiveArray] into an iterator of [PrimitiveArray]
#[inline]
fn iden<T, I>(iter: I) -> impl Iterator<Item = Result<PrimitiveArray<T>>>
where
T: NativeType,
I: Iterator<Item = Result<MutablePrimitiveArray<T>>>,
{
iter.map(|x| x.map(|x| x.into()))
}

#[inline]
fn op<T, I, F>(iter: I, op: F) -> impl Iterator<Item = Result<PrimitiveArray<T>>>
where
T: NativeType,
I: Iterator<Item = Result<MutablePrimitiveArray<T>>>,
F: Fn(T) -> T + Copy,
{
iter.map(move |x| {
x.map(move |mut x| {
x.values_mut_slice().iter_mut().for_each(|x| *x = op(*x));
x.into()
})
})
}

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<R: Read + Seek>(
column_metadata: &ColumnChunkMetaData,
Expand Down Expand Up @@ -220,125 +249,147 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
})
}

/// Builds an iterator of timestamp arrays from parquet data pages, rescaling the stored
/// values to the requested arrow `time_unit`.
///
/// * `Int96` physical values are decoded with `int96_to_i64_ns` and are only supported when
///   `time_unit` is [`TimeUnit::Nanosecond`].
/// * `Int64` physical values without a `TIMESTAMP` logical type are passed through unchanged.
/// * `Int64` physical values annotated with a `TIMESTAMP` logical type are multiplied or
///   divided by the ratio between the parquet unit and `time_unit`.
///
/// # Errors
/// Returns a "not yet implemented" error when the physical type is `Int96` and `time_unit`
/// is not nanoseconds, or when the physical type is neither `Int96` nor `Int64`.
///
/// # Panics
/// Panics if `type_` is not a `ParquetType::PrimitiveType`.
fn timestamp<'a, I: 'a + DataPages>(
    pages: I,
    type_: &ParquetType,
    data_type: DataType,
    chunk_size: usize,
    time_unit: TimeUnit,
) -> Result<ArrayIter<'a>> {
    let (physical_type, logical_type) = if let ParquetType::PrimitiveType {
        physical_type,
        logical_type,
        ..
    } = type_
    {
        (physical_type, logical_type)
    } else {
        unreachable!()
    };

    if physical_type == &PhysicalType::Int96 {
        if time_unit == TimeUnit::Nanosecond {
            return Ok(dyn_iter(iden(primitive::Iter::new(
                pages,
                data_type,
                chunk_size,
                read_item,
                int96_to_i64_ns,
            ))));
        } else {
            return Err(ArrowError::nyi(
                "Can't decode int96 to timestamp other than ns",
            ));
        }
    };
    if physical_type != &PhysicalType::Int64 {
        return Err(ArrowError::nyi(
            "Can't decode a timestamp from a non-int64 parquet type",
        ));
    }

    let iter = primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x);

    // Without a TIMESTAMP annotation there is no source unit to convert from,
    // so the raw i64 values are used unchanged.
    let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type {
        unit
    } else {
        return Ok(dyn_iter(iden(iter)));
    };

    // (unit stored in parquet, unit requested by arrow): finer -> coarser divides,
    // coarser -> finer multiplies.
    Ok(match (unit, time_unit) {
        (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)),
        (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)),
        // Nanoseconds to seconds is a division, consistent with the other `Second` arms.
        (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => {
            dyn_iter(op(iter, |x| x / 1_000_000_000))
        }

        (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)),
        (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)),
        (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => {
            dyn_iter(op(iter, |x| x / 1_000_000))
        }

        (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x * 1_000)),
        (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => dyn_iter(iden(iter)),
        (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => dyn_iter(op(iter, |x| x / 1_000)),

        (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => {
            dyn_iter(op(iter, |x| x * 1_000_000))
        }
        (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => dyn_iter(op(iter, |x| x * 1_000)),
        (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => dyn_iter(iden(iter)),
    })
}

fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages: I,
type_: &ParquetType,
field: Field,
data_type: DataType,
chunk_size: usize,
) -> Result<ArrayIter<'a>> {
use DataType::*;
match field.data_type.to_logical_type() {
Null => Ok(null::iter_to_arrays(pages, field.data_type, chunk_size)),
Boolean => Ok(boolean::iter_to_arrays(pages, field.data_type, chunk_size)),
UInt8 => Ok(dyn_iter(primitive::Iter::new(
match data_type.to_logical_type() {
Null => Ok(null::iter_to_arrays(pages, data_type, chunk_size)),
Boolean => Ok(boolean::iter_to_arrays(pages, data_type, chunk_size)),
UInt8 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i32| x as u8,
))),
UInt16 => Ok(dyn_iter(primitive::Iter::new(
)))),
UInt16 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i32| x as u16,
))),
UInt32 => Ok(dyn_iter(primitive::Iter::new(
)))),
UInt32 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i32| x as u32,
))),
Int8 => Ok(dyn_iter(primitive::Iter::new(
)))),
Int8 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i32| x as i8,
))),
Int16 => Ok(dyn_iter(primitive::Iter::new(
)))),
Int16 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i32| x as i16,
)))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok(dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i32| x as i32),
))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => Ok(dyn_iter(
primitive::Iter::new(pages, field.data_type, chunk_size, read_item, |x: i32| {
x as i32
}),
)),

Timestamp(TimeUnit::Nanosecond, None) => match type_ {
ParquetType::PrimitiveType {
physical_type,
logical_type,
..
} => match (physical_type, logical_type) {
(PhysicalType::Int96, _) => Ok(dyn_iter(primitive::Iter::new(
pages,
DataType::Timestamp(TimeUnit::Nanosecond, None),
chunk_size,
read_item,
int96_to_i64_ns,
))),
(_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => Ok(match unit {
ParquetTimeUnit::MILLIS(_) => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x * 1_000_000,
)),
ParquetTimeUnit::MICROS(_) => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x * 1_000,
)),
ParquetTimeUnit::NANOS(_) => dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x,
)),
}),
_ => Ok(dyn_iter(primitive::Iter::new(
pages,
field.data_type,
chunk_size,
read_item,
|x: i64| x,
))),
},
_ => unreachable!(),
},
Timestamp(time_unit, None) => {
let time_unit = *time_unit;
timestamp(pages, type_, data_type, chunk_size, time_unit)
}

FixedSizeBinary(_) => Ok(Box::new(
fixed_size_binary::BinaryArrayIterator::new(pages, field.data_type, chunk_size)
fixed_size_binary::BinaryArrayIterator::new(pages, data_type, chunk_size)
.map(|x| x.map(|x| Arc::new(x) as _)),
)),

Decimal(_, _) => match type_ {
ParquetType::PrimitiveType { physical_type, .. } => Ok(match physical_type {
PhysicalType::Int32 => dyn_iter(primitive::Iter::new(
PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i32| x as i128,
)),
PhysicalType::Int64 => dyn_iter(primitive::Iter::new(
))),
PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i64| x as i128,
)),
))),
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
return Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
Expand Down Expand Up @@ -372,7 +423,7 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>(
let validity = array.validity().cloned();

Ok(PrimitiveArray::<i128>::from_data(
field.data_type.clone(),
data_type.clone(),
values.into(),
validity,
))
Expand All @@ -388,63 +439,49 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>(
},

// INT64
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok(dyn_iter(
primitive::Iter::new(pages, field.data_type, chunk_size, read_item, |x: i64| {
x as i64
}),
)),
UInt64 => Ok(dyn_iter(primitive::Iter::new(
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => Ok(dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, read_item, |x: i64| x as i64),
))),
UInt64 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: i64| x as u64,
))),
)))),

Float32 => Ok(dyn_iter(primitive::Iter::new(
Float32 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: f32| x,
))),
Float64 => Ok(dyn_iter(primitive::Iter::new(
)))),
Float64 => Ok(dyn_iter(iden(primitive::Iter::new(
pages,
field.data_type,
data_type,
chunk_size,
read_item,
|x: f64| x,
))),
)))),

Binary => Ok(binary::iter_to_arrays::<i32, BinaryArray<i32>, _>(
pages,
field.data_type,
chunk_size,
pages, data_type, chunk_size,
)),
LargeBinary => Ok(binary::iter_to_arrays::<i64, BinaryArray<i64>, _>(
pages,
field.data_type,
chunk_size,
pages, data_type, chunk_size,
)),
Utf8 => Ok(binary::iter_to_arrays::<i32, Utf8Array<i32>, _>(
pages,
field.data_type,
chunk_size,
pages, data_type, chunk_size,
)),
LargeUtf8 => Ok(binary::iter_to_arrays::<i64, Utf8Array<i64>, _>(
pages,
field.data_type,
chunk_size,
pages, data_type, chunk_size,
)),

Dictionary(key_type, _, _) => match_integer_type!(key_type, |$K| {
dict_read::<$K, _>(pages, type_, field.data_type, chunk_size)
dict_read::<$K, _>(pages, type_, data_type, chunk_size)
}),

/*LargeList(inner) | List(inner) => {
let data_type = inner.data_type.clone();
page_iter_to_arrays_nested(pages, type_, field, data_type, chunk_size)
}*/
other => Err(ArrowError::NotYetImplemented(format!(
"Reading {:?} from parquet still not implemented",
other
Expand Down Expand Up @@ -545,7 +582,7 @@ where
page_iter_to_arrays(
columns.pop().unwrap(),
types.pop().unwrap(),
field,
field.data_type,
chunk_size,
)?
.map(|x| Ok((NestedState::new(vec![]), x?))),
Expand Down
Loading

0 comments on commit 99736ea

Please sign in to comment.