Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for decimal256 read/write in parquet (#1412)
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason authored Mar 3, 2023
1 parent d06323a commit 9749aee
Show file tree
Hide file tree
Showing 16 changed files with 877 additions and 12 deletions.
21 changes: 20 additions & 1 deletion parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal_9", pa.decimal128(9, 0)),
pa.field("decimal_18", pa.decimal128(18, 0)),
pa.field("decimal_26", pa.decimal128(26, 0)),
pa.field("decimal256_9", pa.decimal256(9, 0)),
pa.field("decimal256_18", pa.decimal256(18, 0)),
pa.field("decimal256_26", pa.decimal256(26, 0)),
pa.field("decimal256_39", pa.decimal256(39, 0)),
pa.field("decimal256_76", pa.decimal256(76, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
pa.field("timestamp_s", pa.timestamp("s")),
pa.field("emoji", pa.utf8()),
pa.field("timestamp_s_utc", pa.timestamp("s", "UTC")),
]
schema = pa.schema(fields)

return (
{
"int64": int64,
Expand All @@ -51,6 +55,11 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"decimal_9": decimal,
"decimal_18": decimal,
"decimal_26": decimal,
"decimal256_9": decimal,
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_76": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
Expand Down Expand Up @@ -83,6 +92,11 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal_9", pa.decimal128(9, 0), nullable=False),
pa.field("decimal_18", pa.decimal128(18, 0), nullable=False),
pa.field("decimal_26", pa.decimal128(26, 0), nullable=False),
pa.field("decimal256_9", pa.decimal256(9, 0), nullable=False),
pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False),
pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False),
pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False),
pa.field("decimal256_76", pa.decimal256(76, 0), nullable=False),
]
schema = pa.schema(fields)

Expand All @@ -97,6 +111,11 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"decimal_9": decimal,
"decimal_18": decimal,
"decimal_26": decimal,
"decimal256_9": decimal,
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_76": decimal,
},
schema,
f"basic_required_10.parquet",
Expand Down
73 changes: 73 additions & 0 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use ethnum::I256;
use parquet2::{
schema::types::{
PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
},
types::int96_to_i64_ns,
};

use crate::types::i256;
use crate::{
array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray},
datatypes::{DataType, IntervalUnit, TimeUnit},
Expand Down Expand Up @@ -228,6 +230,77 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(

Box::new(arrays) as _
}
(PhysicalType::Int32, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
num_rows,
chunk_size,
|x: i32| i256(I256::new(x as i128)),
))),
(PhysicalType::Int64, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
num_rows,
chunk_size,
|x: i64| i256(I256::new(x as i128)),
))),
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => {
let n = *n;

let pages = fixed_size_binary::Iter::new(
pages,
DataType::FixedSizeBinary(n),
num_rows,
chunk_size,
);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n))))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<i256>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => {
let n = *n;

let pages = fixed_size_binary::Iter::new(
pages,
DataType::FixedSizeBinary(n),
num_rows,
chunk_size,
);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(super::super::convert_i256)
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<i256>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}
(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {
return Err(Error::NotYetImplemented(format!(
"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"
)))
}
(PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new(
pages,
data_type,
Expand Down
11 changes: 11 additions & 0 deletions src/io/parquet/read/indexes/fixed_len_binary.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use parquet2::indexes::PageIndex;

use crate::types::{i256, NativeType};
use crate::{
array::{Array, FixedSizeBinaryArray, MutableFixedSizeBinaryArray, PrimitiveArray},
datatypes::{DataType, PhysicalType, PrimitiveType},
Expand Down Expand Up @@ -42,6 +43,16 @@ fn deserialize_binary_iter<'a, I: TrustedLen<Item = Option<&'a Vec<u8>>>>(
})
})))
}
PhysicalType::Primitive(PrimitiveType::Int256) => {
Box::new(PrimitiveArray::from_trusted_len_iter(iter.map(|v| {
v.map(|x| {
let n = x.len();
let mut bytes = [0u8; 32];
bytes[..n].copy_from_slice(x);
i256::from_be_bytes(bytes)
})
})))
}
_ => {
let mut a = MutableFixedSizeBinaryArray::try_new(
data_type,
Expand Down
27 changes: 27 additions & 0 deletions src/io/parquet/read/indexes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,33 @@ fn deserialize(
))),
}
}
PhysicalType::Primitive(PrimitiveType::Int256) => {
let index = indexes.pop_front().unwrap();
match index.physical_type() {
ParquetPhysicalType::Int32 => {
let index = index.as_any().downcast_ref::<NativeIndex<i32>>().unwrap();
Ok(primitive::deserialize_i32(&index.indexes, data_type).into())
}
parquet2::schema::types::PhysicalType::Int64 => {
let index = index.as_any().downcast_ref::<NativeIndex<i64>>().unwrap();
Ok(
primitive::deserialize_i64(
&index.indexes,
&index.primitive_type,
data_type,
)
.into(),
)
}
parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => {
let index = index.as_any().downcast_ref::<FixedLenByteIndex>().unwrap();
Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into())
}
other => Err(Error::nyi(format!(
"Deserialize {other:?} to arrow's int64"
))),
}
}
PhysicalType::Primitive(PrimitiveType::UInt8)
| PhysicalType::Primitive(PrimitiveType::UInt16)
| PhysicalType::Primitive(PrimitiveType::UInt32)
Expand Down
15 changes: 14 additions & 1 deletion src/io/parquet/read/indexes/primitive.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use ethnum::I256;
use parquet2::indexes::PageIndex;
use parquet2::schema::types::{PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit};
use parquet2::types::int96_to_i64_ns;

use crate::array::{Array, MutablePrimitiveArray, PrimitiveArray};
use crate::datatypes::{DataType, TimeUnit};
use crate::trusted_len::TrustedLen;
use crate::types::NativeType;
use crate::types::{i256, NativeType};

use super::ColumnPageStatistics;

Expand All @@ -32,6 +33,12 @@ fn deserialize_int32<I: TrustedLen<Item = Option<i32>>>(
PrimitiveArray::<i128>::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128)))
.to(data_type),
),
Decimal256(_, _) => Box::new(
PrimitiveArray::<i256>::from_trusted_len_iter(
iter.map(|x| x.map(|x| i256(I256::new(x.into())))),
)
.to(data_type),
) as _,
_ => Box::new(PrimitiveArray::<i32>::from_trusted_len_iter(iter).to(data_type)),
}
}
Expand Down Expand Up @@ -110,6 +117,12 @@ fn deserialize_int64<I: TrustedLen<Item = Option<i64>>>(
PrimitiveArray::<i128>::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128)))
.to(data_type),
) as _,
Decimal256(_, _) => Box::new(
PrimitiveArray::<i256>::from_trusted_len_iter(
iter.map(|x| x.map(|x| i256(I256::new(x.into())))),
)
.to(data_type),
) as _,
Timestamp(time_unit, _) => {
let mut array =
MutablePrimitiveArray::<i64>::from_trusted_len_iter(iter).to(data_type.clone());
Expand Down
13 changes: 13 additions & 0 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use parquet2::{

use crate::{array::Array, error::Result};

use crate::types::{i256, NativeType};
pub use deserialize::{
column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
InitNested, NestedArrayIter, NestedState, StructIterator,
Expand Down Expand Up @@ -81,3 +82,15 @@ fn convert_i128(value: &[u8], n: usize) -> i128 {
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}

/// Decodes a big-endian, two's-complement byte slice into an [`i256`].
///
/// The slice is right-aligned inside a 32-byte buffer and the leading bytes
/// are filled with the sign (`0xFF` when the top bit of the first input byte
/// is set, `0x00` otherwise), so shorter encodings keep their numeric value.
/// An empty slice decodes to zero.
///
/// # Panics
///
/// Panics if `value` is longer than 32 bytes.
fn convert_i256(value: &[u8]) -> i256 {
    // `first()` instead of `value[0]` so that an empty slice does not panic.
    let fill = match value.first() {
        Some(&most_significant) if most_significant >= 128 => u8::MAX,
        _ => 0,
    };
    let mut bytes = [fill; 32];
    bytes[32 - value.len()..].copy_from_slice(value);
    i256::from_be_bytes(bytes)
}
55 changes: 54 additions & 1 deletion src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use ethnum::I256;
use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
use crate::error::Result;
use crate::types::days_ms;
use crate::io::parquet::read::convert_i256;
use crate::types::{days_ms, i256};

use super::super::{convert_days_ms, convert_i128};

Expand All @@ -28,6 +30,57 @@ pub(super) fn push_i128(
Ok(())
}

/// Appends the min/max of a fixed-length statistics entry to `min` and `max`
/// as `i256` values, decoding each through `convert_i128` with width `n`.
///
/// When `from` is `None`, or the corresponding bound is absent, a null is
/// pushed instead. Always returns `Ok(())`.
///
/// # Panics
///
/// Panics if `min` or `max` is not a `MutablePrimitiveArray<i256>`, or if
/// `from` is not a `FixedLenStatistics`.
pub(super) fn push_i256_with_i128(
    from: Option<&dyn ParquetStatistics>,
    n: usize,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    let min_array = min
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i256>>()
        .unwrap();
    let max_array = max
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i256>>()
        .unwrap();
    let stats = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

    // Decode through i128 first, then widen to the 256-bit representation.
    let widen = move |bytes: &[u8]| i256(I256::new(convert_i128(bytes, n)));

    let lowest = stats.and_then(|s| s.min_value.as_deref()).map(widen);
    min_array.push(lowest);

    let highest = stats.and_then(|s| s.max_value.as_deref()).map(widen);
    max_array.push(highest);

    Ok(())
}

/// Appends the min/max of a fixed-length statistics entry to `min` and `max`
/// as `i256` values, decoding each bound with `convert_i256`.
///
/// When `from` is `None`, or the corresponding bound is absent, a null is
/// pushed instead. Always returns `Ok(())`.
///
/// # Panics
///
/// Panics if `min` or `max` is not a `MutablePrimitiveArray<i256>`, or if
/// `from` is not a `FixedLenStatistics`.
pub(super) fn push_i256(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    let min_array = min
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i256>>()
        .unwrap();
    let max_array = max
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i256>>()
        .unwrap();
    let stats = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

    let lowest = stats.and_then(|s| s.min_value.as_deref()).map(convert_i256);
    min_array.push(lowest);

    let highest = stats.and_then(|s| s.max_value.as_deref()).map(convert_i256);
    max_array.push(highest);

    Ok(())
}

pub(super) fn push(
from: Option<&dyn ParquetStatistics>,
min: &mut dyn MutableArray,
Expand Down
18 changes: 18 additions & 0 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! APIs exposing `parquet2`'s statistics as arrow's statistics.
use ethnum::I256;
use std::collections::VecDeque;
use std::sync::Arc;

Expand All @@ -17,6 +18,7 @@ use crate::datatypes::IntervalUnit;
use crate::datatypes::{DataType, Field, PhysicalType};
use crate::error::Error;
use crate::error::Result;
use crate::types::i256;

mod binary;
mod boolean;
Expand Down Expand Up @@ -515,6 +517,22 @@ fn push(
ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i128(from, *n, min, max),
_ => unreachable!(),
},
Decimal256(_, _) => match physical_type {
ParquetPhysicalType::Int32 => {
primitive::push(from, min, max, |x: i32| Ok(i256(I256::new(x.into()))))
}
ParquetPhysicalType::Int64 => {
primitive::push(from, min, max, |x: i64| Ok(i256(I256::new(x.into()))))
}
ParquetPhysicalType::FixedLenByteArray(n) if *n <= 16 => {
fixlen::push_i256_with_i128(from, *n, min, max)
}
ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented(
format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"),
)),
ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max),
_ => unreachable!(),
},
Binary => binary::push::<i32>(from, min, max),
LargeBinary => binary::push::<i64>(from, min, max),
Utf8 => utf8::push::<i32>(from, min, max),
Expand Down
Loading

0 comments on commit 9749aee

Please sign in to comment.