From fc945dc353950b47384c991689686ecd96cef6f7 Mon Sep 17 00:00:00 2001 From: taichong Date: Wed, 1 Mar 2023 19:35:51 +0800 Subject: [PATCH] support decimal256(39,0) --- parquet_integration/write_parquet.py | 4 +++ src/datatypes/mod.rs | 2 +- src/io/parquet/read/deserialize/simple.rs | 2 +- src/io/parquet/read/mod.rs | 41 +++++++++++++++++++++-- src/io/parquet/read/statistics/fixlen.rs | 8 ++--- src/io/parquet/read/statistics/mod.rs | 2 +- src/io/parquet/write/mod.rs | 6 ++-- src/io/parquet/write/schema.rs | 3 +- 8 files changed, 54 insertions(+), 14 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index d6e6dfe1b51..b0829ad4659 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -36,6 +36,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("decimal256_18", pa.decimal256(18, 0)), pa.field("decimal256_26", pa.decimal256(26, 0)), pa.field("decimal256_39", pa.decimal256(39, 0)), + pa.field("decimal256_75", pa.decimal256(75, 0)), pa.field("timestamp_us", pa.timestamp("us")), pa.field("timestamp_s", pa.timestamp("s")), pa.field("emoji", pa.utf8()), @@ -58,6 +59,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "decimal256_18": decimal, "decimal256_26": decimal, "decimal256_39": decimal, + "decimal256_75": decimal, "timestamp_us": int64, "timestamp_s": int64, "emoji": emoji, @@ -94,6 +96,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False), pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False), pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False), + pa.field("decimal256_75", pa.decimal256(75, 0), nullable=False), ] schema = pa.schema(fields) @@ -112,6 +115,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "decimal256_18": decimal, "decimal256_26": decimal, "decimal256_39": decimal, + "decimal256_75": decimal, }, schema, f"basic_required_10.parquet", diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs index dafbb57848e..f5a13cc2040 100644 --- a/src/datatypes/mod.rs +++ b/src/datatypes/mod.rs @@ -302,7 +302,7 @@ impl From for DataType { PrimitiveType::UInt32 => DataType::UInt32, PrimitiveType::UInt64 => DataType::UInt64, PrimitiveType::Int128 => DataType::Decimal(32, 32), - PrimitiveType::Int256 => DataType::Decimal256(32, 32), + PrimitiveType::Int256 => DataType::Decimal256(64, 64), PrimitiveType::Float16 => DataType::Float16, PrimitiveType::Float32 => DataType::Float32, PrimitiveType::Float64 => DataType::Float64, diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 0b1e3926239..586b3ad9249 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -285,7 +285,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( let values = array .values() .chunks_exact(n) - .map(|value: &[u8]| super::super::convert_i256(value, n)) + .map(|value: &[u8]| super::super::convert_i256(value)) .collect::>(); let validity = array.validity().cloned(); diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index b9d60c68cf0..90b7ab3ff79 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -83,9 +83,44 @@ fn convert_i128(value: &[u8], n: usize) -> i128 { i128::from_be_bytes(bytes) >> (8 * (16 - n)) } -fn convert_i256(value: &[u8], n: usize) -> i256 { +fn convert_min_max_i256(value: &[u8]) -> i256 { let mut bytes = [0u8; 32]; - bytes[..n].copy_from_slice(value); + let mut neg_bytes = [255u8; 32]; + if value[0] >= 128 { + neg_bytes[32-value.len()..].copy_from_slice(value); + from_be_bytes(neg_bytes) + } else { + bytes[32-value.len()..].copy_from_slice(value); + from_be_bytes(bytes) + } +} + +fn convert_i256(value: &[u8]) -> i256 { + let bytes = [0u8; 32]; + let neg_bytes = [255u8; 32]; - i256(i256::from_be_bytes(bytes).0 >> (8 * (32 - n))) + fn deal_bytes(value: &[u8], mut bytes: [u8; 32]) -> i256 { + if value.len() < 32 { + bytes[32-value.len()..].copy_from_slice(value); + from_be_bytes(bytes) + } else { + bytes.copy_from_slice(value); + i256::from_be_bytes(bytes) + } + } + + if value[0] >= 128 { + deal_bytes(value, neg_bytes) + } else { + deal_bytes(value, bytes) + } } + +fn from_be_bytes(bytes: [u8; 32]) -> i256 { + let (a, b) = bytes.split_at(16); + let a: [u8; 16] = a.try_into().unwrap(); + let b: [u8; 16] = b.try_into().unwrap(); + let a = i128::from_be_bytes(a); + let b = i128::from_be_bytes(b); + i256(ethnum::I256::from_words(a, b)) +} \ No newline at end of file diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index 1362d22bb97..ed884c9be4e 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -3,9 +3,10 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; use crate::array::*; use crate::error::Result; +use crate::io::parquet::read::convert_min_max_i256; use crate::types::{days_ms, i256}; -use super::super::{convert_days_ms, convert_i128, convert_i256}; +use super::super::{convert_days_ms, convert_i128}; pub(super) fn push_i128( from: Option<&dyn ParquetStatistics>, @@ -61,7 +62,6 @@ pub(super) fn push_i256_with_i128( pub(super) fn push_i256( from: Option<&dyn ParquetStatistics>, - n: usize, min: &mut dyn MutableArray, max: &mut dyn MutableArray, ) -> Result<()> { @@ -75,8 +75,8 @@ pub(super) fn push_i256( .unwrap(); let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); - min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x, n)))); - max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x, n)))); + min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_min_max_i256(x)))); + max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_min_max_i256(x)))); Ok(()) } diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 27f137f0f34..f3c1ed9e8de 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -530,7 +530,7 @@ fn push( ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented( format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), )), - ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max), + ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max), _ => unreachable!(), }, Binary => binary::push::(from, min, max), diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 34c07a535b6..2d7e1c26208 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -518,7 +518,8 @@ pub fn array_to_page_simple( ); fixed_len_bytes::array_to_page(&array, options, type_, statistics) } else { - let size = decimal_length_from_precision(precision); + //let size = decimal_length_from_precision(precision); + let size = 32; let array = array .as_any() .downcast_ref::>() @@ -532,7 +533,8 @@ pub fn array_to_page_simple( }; let mut values = Vec::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { - let bytes = &x.to_be_bytes()[32 - size..]; + //let bytes = &x.to_be_bytes()[32 - size..]; + let bytes = &x.to_be_bytes(); values.extend_from_slice(bytes) }); let array = FixedSizeBinaryArray::new( diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index 68e7786e49a..f4af963a8ed 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -329,10 +329,9 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )?) } else { - let len = decimal_length_from_precision(precision); Ok(ParquetType::try_from_primitive( name, - PhysicalType::FixedLenByteArray(len), + PhysicalType::FixedLenByteArray(32), repetition, None, None,