Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
support decimal256(39,0)
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Mar 1, 2023
1 parent 2512564 commit fc945dc
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 14 deletions.
4 changes: 4 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_18", pa.decimal256(18, 0)),
pa.field("decimal256_26", pa.decimal256(26, 0)),
pa.field("decimal256_39", pa.decimal256(39, 0)),
pa.field("decimal256_75", pa.decimal256(75, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
pa.field("timestamp_s", pa.timestamp("s")),
pa.field("emoji", pa.utf8()),
Expand All @@ -58,6 +59,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_75": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
Expand Down Expand Up @@ -94,6 +96,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False),
pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False),
pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False),
pa.field("decimal256_75", pa.decimal256(75, 0), nullable=False),
]
schema = pa.schema(fields)

Expand All @@ -112,6 +115,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"decimal256_75": decimal,
},
schema,
f"basic_required_10.parquet",
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl From<PrimitiveType> for DataType {
PrimitiveType::UInt32 => DataType::UInt32,
PrimitiveType::UInt64 => DataType::UInt64,
PrimitiveType::Int128 => DataType::Decimal(32, 32),
PrimitiveType::Int256 => DataType::Decimal256(32, 32),
PrimitiveType::Int256 => DataType::Decimal256(64, 64),
PrimitiveType::Float16 => DataType::Float16,
PrimitiveType::Float32 => DataType::Float32,
PrimitiveType::Float64 => DataType::Float64,
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| super::super::convert_i256(value, n))
.map(|value: &[u8]| super::super::convert_i256(value))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

Expand Down
41 changes: 38 additions & 3 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,44 @@ fn convert_i128(value: &[u8], n: usize) -> i128 {
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}

fn convert_i256(value: &[u8], n: usize) -> i256 {
fn convert_min_max_i256(value: &[u8]) -> i256 {
let mut bytes = [0u8; 32];
bytes[..n].copy_from_slice(value);
let mut neg_bytes = [255u8; 32];
if value[0] >= 128 {
neg_bytes[32-value.len()..].copy_from_slice(value);
from_be_bytes(neg_bytes)
} else {
bytes[32-value.len()..].copy_from_slice(value);
from_be_bytes(bytes)
}
}

fn convert_i256(value: &[u8]) -> i256 {
let bytes = [0u8; 32];
let neg_bytes = [255u8; 32];

i256(i256::from_be_bytes(bytes).0 >> (8 * (32 - n)))
fn deal_bytes(value: &[u8], mut bytes: [u8; 32]) -> i256 {
if value.len() < 32 {
bytes[32-value.len()..].copy_from_slice(value);
from_be_bytes(bytes)
} else {
bytes.copy_from_slice(value);
i256::from_be_bytes(bytes)
}
}

if value[0] >= 128 {
deal_bytes(value, neg_bytes)
} else {
deal_bytes(value, bytes)
}
}

fn from_be_bytes(bytes: [u8; 32]) -> i256 {
let (a, b) = bytes.split_at(16);
let a: [u8; 16] = a.try_into().unwrap();
let b: [u8; 16] = b.try_into().unwrap();
let a = i128::from_be_bytes(a);
let b = i128::from_be_bytes(b);
i256(ethnum::I256::from_words(a, b))
}
8 changes: 4 additions & 4 deletions src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
use crate::error::Result;
use crate::io::parquet::read::convert_min_max_i256;
use crate::types::{days_ms, i256};

use super::super::{convert_days_ms, convert_i128, convert_i256};
use super::super::{convert_days_ms, convert_i128};

pub(super) fn push_i128(
from: Option<&dyn ParquetStatistics>,
Expand Down Expand Up @@ -61,7 +62,6 @@ pub(super) fn push_i256_with_i128(

pub(super) fn push_i256(
from: Option<&dyn ParquetStatistics>,
n: usize,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
) -> Result<()> {
Expand All @@ -75,8 +75,8 @@ pub(super) fn push_i256(
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x, n))));
min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_min_max_i256(x))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_min_max_i256(x))));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ fn push(
ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented(
format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"),
)),
ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max),
ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max),
_ => unreachable!(),
},
Binary => binary::push::<i32>(from, min, max),
Expand Down
6 changes: 4 additions & 2 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,8 @@ pub fn array_to_page_simple(
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
} else {
let size = decimal_length_from_precision(precision);
//let size = decimal_length_from_precision(precision);
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
Expand All @@ -532,7 +533,8 @@ pub fn array_to_page_simple(
};
let mut values = Vec::<u8>::with_capacity(size * array.len());
array.values().iter().for_each(|x| {
let bytes = &x.to_be_bytes()[32 - size..];
//let bytes = &x.to_be_bytes()[32 - size..];
let bytes = &x.to_be_bytes();
values.extend_from_slice(bytes)
});
let array = FixedSizeBinaryArray::new(
Expand Down
3 changes: 1 addition & 2 deletions src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,9 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
None,
)?)
} else {
let len = decimal_length_from_precision(precision);
Ok(ParquetType::try_from_primitive(
name,
PhysicalType::FixedLenByteArray(len),
PhysicalType::FixedLenByteArray(32),
repetition,
None,
None,
Expand Down

0 comments on commit fc945dc

Please sign in to comment.