support decimal256(9,_), decimal256(18,_), decimal256(38,_)
TCeason committed Feb 28, 2023
1 parent 977aa07 commit dfa251d
Showing 14 changed files with 486 additions and 63 deletions.
17 changes: 12 additions & 5 deletions parquet_integration/write_parquet.py
@@ -32,14 +32,15 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal_9", pa.decimal128(9, 0)),
pa.field("decimal_18", pa.decimal128(18, 0)),
pa.field("decimal_26", pa.decimal128(26, 0)),
pa.field("decimal_39", pa.decimal256(39, 0)),
pa.field("decimal256_9", pa.decimal256(9, 0)),
pa.field("decimal256_18", pa.decimal256(18, 0)),
pa.field("decimal256_26", pa.decimal256(26, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
pa.field("timestamp_s", pa.timestamp("s")),
pa.field("emoji", pa.utf8()),
pa.field("timestamp_s_utc", pa.timestamp("s", "UTC")),
]
schema = pa.schema(fields)

return (
{
"int64": int64,
@@ -52,7 +53,9 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"decimal_9": decimal,
"decimal_18": decimal,
"decimal_26": decimal,
"decimal_39": decimal,
"decimal256_9": decimal,
"decimal256_18": decimal,
"decimal256_26": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
@@ -85,7 +88,9 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal_9", pa.decimal128(9, 0), nullable=False),
pa.field("decimal_18", pa.decimal128(18, 0), nullable=False),
pa.field("decimal_26", pa.decimal128(26, 0), nullable=False),
pa.field("decimal_39", pa.decimal256(39, 0), nullable=False),
pa.field("decimal256_9", pa.decimal256(9, 0), nullable=False),
pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False),
pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False),
]
schema = pa.schema(fields)

@@ -100,7 +105,9 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"decimal_9": decimal,
"decimal_18": decimal,
"decimal_26": decimal,
"decimal_39": decimal,
"decimal256_9": decimal,
"decimal256_18": decimal,
"decimal256_26": decimal,
},
schema,
f"basic_required_10.parquet",
51 changes: 46 additions & 5 deletions src/io/parquet/read/deserialize/simple.rs
@@ -1,3 +1,4 @@
use ethnum::I256;
use parquet2::{
schema::types::{
PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
@@ -229,12 +230,47 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(

Box::new(arrays) as _
}
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
return Err(Error::NotYetImplemented(format!(
"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"
)))
(PhysicalType::Int32, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
num_rows,
chunk_size,
|x: i32| i256(I256::new(x as i128)),
))),
(PhysicalType::Int64, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
num_rows,
chunk_size,
|x: i64| i256(I256::new(x as i128)),
))),
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => {
let n = *n;

let pages = fixed_size_binary::Iter::new(
pages,
DataType::FixedSizeBinary(n),
num_rows,
chunk_size,
);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n))))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<i256>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) => {
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => {
let n = *n;

let pages = fixed_size_binary::Iter::new(
@@ -260,6 +296,11 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(

Box::new(arrays) as _
}
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
return Err(Error::NotYetImplemented(format!(
"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"
)))
}
(PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
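For context on the match arms above (an illustrative note, not part of the commit): INT32 and INT64 pages are widened into 256-bit decimals with I256::new(x as i128), fixed-size binaries of at most 16 bytes reuse the existing convert_i128 helper, and buffers of 17 to 32 bytes (whose body is collapsed in the diff) are decoded directly into I256. A minimal sketch of that last conversion, assuming ethnum mirrors the primitive from_be_bytes API; be_bytes_to_i256 is a name invented for this sketch, not a helper in the crate:

use ethnum::I256;

// Sign-extend a big-endian two's-complement buffer of 1..=32 bytes into an I256.
fn be_bytes_to_i256(bytes: &[u8]) -> I256 {
    // Negative values start with a set sign bit; pad with 0xFF so the sign survives.
    let fill = if bytes[0] & 0x80 != 0 { 0xFF } else { 0x00 };
    let mut buf = [fill; 32];
    buf[32 - bytes.len()..].copy_from_slice(bytes);
    I256::from_be_bytes(buf)
}

fn main() {
    // 0xFFFE is -2 in 2-byte big-endian two's complement.
    assert_eq!(be_bytes_to_i256(&[0xFF, 0xFE]), I256::new(-2));
    assert_eq!(be_bytes_to_i256(&[0x01, 0x00]), I256::new(256));
}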
15 changes: 15 additions & 0 deletions src/io/parquet/read/indexes/mod.rs
@@ -111,6 +111,21 @@ fn deserialize(
PhysicalType::Primitive(PrimitiveType::Int256) => {
let index = indexes.pop_front().unwrap();
match index.physical_type() {
ParquetPhysicalType::Int32 => {
let index = index.as_any().downcast_ref::<NativeIndex<i32>>().unwrap();
Ok(primitive::deserialize_i32(&index.indexes, data_type).into())
}
parquet2::schema::types::PhysicalType::Int64 => {
let index = index.as_any().downcast_ref::<NativeIndex<i64>>().unwrap();
Ok(
primitive::deserialize_i64(
&index.indexes,
&index.primitive_type,
data_type,
)
.into(),
)
}
parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => {
let index = index.as_any().downcast_ref::<FixedLenByteIndex>().unwrap();
Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into())
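The new Int32 and Int64 arms above follow the same pattern as the existing FixedLenByteArray arm: the page index is held behind a trait object, and the parquet physical type tells the reader which concrete NativeIndex to downcast to. A simplified, self-contained sketch of that pattern; the Index trait and NativeIndex below are stand-ins for illustration, not parquet2's actual definitions:

use std::any::Any;

// Stand-in for parquet2's index trait object.
trait Index {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for parquet2::indexes::NativeIndex<T>.
struct NativeIndex<T> {
    indexes: Vec<Option<T>>,
}

impl<T: 'static> Index for NativeIndex<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let index: Box<dyn Index> = Box::new(NativeIndex::<i64> {
        indexes: vec![Some(1), None, Some(3)],
    });
    // The physical type is Int64, so recover the concrete index type at runtime.
    let native = index.as_any().downcast_ref::<NativeIndex<i64>>().unwrap();
    assert_eq!(native.indexes.len(), 3);
}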
15 changes: 14 additions & 1 deletion src/io/parquet/read/indexes/primitive.rs
@@ -1,11 +1,12 @@
use ethnum::I256;
use parquet2::indexes::PageIndex;
use parquet2::schema::types::{PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit};
use parquet2::types::int96_to_i64_ns;

use crate::array::{Array, MutablePrimitiveArray, PrimitiveArray};
use crate::datatypes::{DataType, TimeUnit};
use crate::trusted_len::TrustedLen;
use crate::types::NativeType;
use crate::types::{i256, NativeType};

use super::ColumnPageStatistics;

@@ -32,6 +33,12 @@ fn deserialize_int32<I: TrustedLen<Item = Option<i32>>>(
PrimitiveArray::<i128>::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128)))
.to(data_type),
),
Decimal256(_, _) => Box::new(
PrimitiveArray::<i256>::from_trusted_len_iter(
iter.map(|x| x.map(|x| i256(I256::new(x.into())))),
)
.to(data_type),
) as _,
_ => Box::new(PrimitiveArray::<i32>::from_trusted_len_iter(iter).to(data_type)),
}
}
@@ -110,6 +117,12 @@ fn deserialize_int64<I: TrustedLen<Item = Option<i64>>>(
PrimitiveArray::<i128>::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128)))
.to(data_type),
) as _,
Decimal256(_, _) => Box::new(
PrimitiveArray::<i256>::from_trusted_len_iter(
iter.map(|x| x.map(|x| i256(I256::new(x.into())))),
)
.to(data_type),
) as _,
Timestamp(time_unit, _) => {
let mut array =
MutablePrimitiveArray::<i64>::from_trusted_len_iter(iter).to(data_type.clone());
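A note on the i256(I256::new(..)) constructions used throughout this change (an editorial aside, not part of the commit): the crate wraps ethnum's I256 in its own i256 newtype so that local traits such as NativeType can be implemented on it, which Rust's orphan rule forbids for a type defined in another crate. A minimal stand-in sketch of that design choice; the i256 and NativeType below are simplified placeholders for the crate's real definitions:

use ethnum::I256;

// Simplified stand-in for crate::types::i256; the real type carries more impls.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct i256(pub I256);

// Stand-in for the crate's NativeType trait: implementing it directly on
// ethnum::I256 would violate the orphan rule, but the local newtype is fine.
trait NativeType: Copy + 'static {}
impl NativeType for i256 {}

fn main() {
    let value = i256(I256::new(-42));
    assert_eq!(value.0, I256::new(-42));
}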
1 change: 1 addition & 0 deletions src/io/parquet/read/schema/convert.rs
@@ -12,6 +12,7 @@ use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
/// any physical column.
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {
println!("in parquet_to_arrow_schema");
fields.iter().filter_map(to_field).collect::<Vec<_>>()
}

1 change: 1 addition & 0 deletions src/io/parquet/read/schema/mod.rs
@@ -21,6 +21,7 @@ use self::metadata::parse_key_value_metadata;
/// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
/// indicating that that the file's arrow metadata was incorrectly written.
pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
println!("in infer_schema");
let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());

let schema = read_schema_from_metadata(&mut metadata)?;
31 changes: 31 additions & 0 deletions src/io/parquet/read/statistics/fixlen.rs
@@ -1,3 +1,4 @@
use ethnum::I256;
use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
@@ -28,6 +29,36 @@ pub(super) fn push_i128(
Ok(())
}

pub(super) fn push_i256_with_i128(
from: Option<&dyn ParquetStatistics>,
n: usize,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
) -> Result<()> {
let min = min
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i256>>()
.unwrap();
let max = max
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i256>>()
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| {
s.min_value
.as_deref()
.map(|x| i256(I256::new(convert_i128(x, n))))
}));
max.push(from.and_then(|s| {
s.max_value
.as_deref()
.map(|x| i256(I256::new(convert_i128(x, n))))
}));

Ok(())
}

pub(super) fn push_i256(
from: Option<&dyn ParquetStatistics>,
n: usize,
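The push_i256_with_i128 helper above parallels the existing push_i128: a column chunk may carry no statistics at all, and present statistics may still omit min or max, so both Option layers are preserved and the big-endian bytes are only decoded when they exist. A small sketch of that plumbing with hypothetical stand-ins; FixedLenStats and decode are illustrative names, not the parquet2 definitions, and the real code goes through convert_i128 and then widens to i256:

// Stand-in for parquet2's FixedLenStatistics min/max byte buffers.
struct FixedLenStats {
    min_value: Option<Vec<u8>>,
    max_value: Option<Vec<u8>>,
}

// Illustrative decoder for a 2-byte big-endian value.
fn decode(be: &[u8]) -> i128 {
    i16::from_be_bytes([be[0], be[1]]) as i128
}

fn main() {
    let stats: Option<FixedLenStats> = Some(FixedLenStats {
        min_value: Some(vec![0xFF, 0xFE]), // -2 in 2-byte big-endian
        max_value: None,
    });

    // Missing statistics or a missing bound both collapse to None.
    let min = stats.as_ref().and_then(|s| s.min_value.as_deref().map(decode));
    let max = stats.as_ref().and_then(|s| s.max_value.as_deref().map(decode));

    assert_eq!(min, Some(-2));
    assert_eq!(max, None);
}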
18 changes: 17 additions & 1 deletion src/io/parquet/read/statistics/mod.rs
@@ -1,4 +1,5 @@
//! APIs exposing `parquet2`'s statistics as arrow's statistics.
use ethnum::I256;
use std::collections::VecDeque;
use std::sync::Arc;

@@ -17,6 +18,7 @@ use crate::datatypes::IntervalUnit;
use crate::datatypes::{DataType, Field, PhysicalType};
use crate::error::Error;
use crate::error::Result;
use crate::types::i256;

mod binary;
mod boolean;
@@ -516,10 +518,24 @@ fn push(
_ => unreachable!(),
},
Decimal256(_, _) => match physical_type {
ParquetPhysicalType::Int32 => {
primitive::push(from, min, max, |x: i32| Ok(i256(I256::new(x.into()))))
}
ParquetPhysicalType::Int64 => {
println!("in push static Decimal256");
primitive::push(from, min, max, |x: i64| Ok(i256(I256::new(x.into()))))
}
ParquetPhysicalType::FixedLenByteArray(n) if *n <= 16 => {
println!("in push FixedLenByteArray n {:?}", n);
fixlen::push_i256_with_i128(from, *n, min, max)
}
ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented(
format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"),
)),
ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max),
ParquetPhysicalType::FixedLenByteArray(n) => {
println!("in push FixedLenByteArray n {:?}", n);
fixlen::push_i256(from, *n, min, max)
}
_ => unreachable!(),
},
Binary => binary::push::<i32>(from, min, max),
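The Decimal256 statistics dispatch above relies on match-arm order: guarded arms are tried top to bottom, so lengths of at most 16 bytes reuse the i128 conversion, lengths above 32 error out, and the final unguarded FixedLenByteArray arm only ever sees 17 to 32 bytes. A tiny sketch of that routing (the function name and return strings are labels invented for illustration):

fn route(n: usize) -> &'static str {
    match n {
        n if n <= 16 => "reuse the <=16-byte i128 path, then widen to i256",
        n if n > 32 => "not yet implemented",
        // Only 17..=32 can reach this arm because of the guards above.
        _ => "decode the full buffer as i256",
    }
}

fn main() {
    assert_eq!(route(16), "reuse the <=16-byte i128 path, then widen to i256");
    assert_eq!(route(20), "decode the full buffer as i256");
    assert_eq!(route(33), "not yet implemented");
}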
22 changes: 22 additions & 0 deletions src/io/parquet/write/fixed_len_bytes.rs
@@ -105,6 +105,28 @@ pub(super) fn build_statistics_decimal(
}
}

pub(super) fn build_statistics_decimal256_with_i128(
array: &PrimitiveArray<i256>,
primitive_type: PrimitiveType,
size: usize,
) -> FixedLenStatistics {
FixedLenStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.max()
.map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()),
min_value: array
.iter()
.flatten()
.min()
.map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()),
}
}

pub(super) fn build_statistics_decimal256(
array: &PrimitiveArray<i256>,
primitive_type: PrimitiveType,
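On the write side, the new build_statistics_decimal256_with_i128 above emits min/max for decimals that fit in at most 16 bytes: only the low 128-bit word of the i256 can be non-trivial, so its big-endian bytes are taken and truncated to the column's byte width. A sketch of that truncation, assuming ethnum's low() and to_be_bytes as used in the diff; min_max_bytes is a name invented for this sketch:

use ethnum::I256;

// Keep the trailing `size` big-endian bytes of the low 128-bit word.
fn min_max_bytes(value: I256, size: usize) -> Vec<u8> {
    value.low().to_be_bytes()[16 - size..].to_vec()
}

fn main() {
    // -1 in 4 bytes is all ones; 258 in 2 bytes is 0x0102.
    assert_eq!(min_max_bytes(I256::new(-1), 4), vec![0xFF, 0xFF, 0xFF, 0xFF]);
    assert_eq!(min_max_bytes(I256::new(258), 2), vec![0x01, 0x02]);
}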
