From 8018a7fe951e8b44480eb3203bf3f51c3f0add7d Mon Sep 17 00:00:00 2001 From: TCeason <33082201+TCeason@users.noreply.github.com> Date: Fri, 3 Mar 2023 16:20:44 +0800 Subject: [PATCH] Added support for decimal256 read/write in parquet (#1412) --- parquet_integration/write_parquet.py | 21 +- src/io/parquet/read/deserialize/simple.rs | 73 ++++++ .../parquet/read/indexes/fixed_len_binary.rs | 11 + src/io/parquet/read/indexes/mod.rs | 27 +++ src/io/parquet/read/indexes/primitive.rs | 15 +- src/io/parquet/read/mod.rs | 13 ++ src/io/parquet/read/statistics/fixlen.rs | 55 ++++- src/io/parquet/read/statistics/mod.rs | 18 ++ src/io/parquet/write/fixed_len_bytes.rs | 45 ++++ src/io/parquet/write/mod.rs | 81 +++++++ src/io/parquet/write/schema.rs | 44 ++++ src/types/native.rs | 12 +- tests/it/compute/sort/row/mod.rs | 6 +- tests/it/io/parquet/mod.rs | 122 ++++++++++ tests/it/io/parquet/read.rs | 125 ++++++++++ tests/it/io/parquet/write.rs | 221 ++++++++++++++++++ 16 files changed, 877 insertions(+), 12 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 1bb262e5439..acfd819d57c 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -32,13 +32,17 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("decimal_9", pa.decimal128(9, 0)), pa.field("decimal_18", pa.decimal128(18, 0)), pa.field("decimal_26", pa.decimal128(26, 0)), + pa.field("decimal256_9", pa.decimal256(9, 0)), + pa.field("decimal256_18", pa.decimal256(18, 0)), + pa.field("decimal256_26", pa.decimal256(26, 0)), + pa.field("decimal256_39", pa.decimal256(39, 0)), + pa.field("decimal256_76", pa.decimal256(76, 0)), pa.field("timestamp_us", pa.timestamp("us")), pa.field("timestamp_s", pa.timestamp("s")), pa.field("emoji", pa.utf8()), pa.field("timestamp_s_utc", pa.timestamp("s", "UTC")), ] schema = pa.schema(fields) - return ( { "int64": int64, @@ -51,6 +55,11 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "decimal_9": decimal, "decimal_18": decimal, "decimal_26": decimal, + "decimal256_9": decimal, + "decimal256_18": decimal, + "decimal256_26": decimal, + "decimal256_39": decimal, + "decimal256_76": decimal, "timestamp_us": int64, "timestamp_s": int64, "emoji": emoji, @@ -83,6 +92,11 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("decimal_9", pa.decimal128(9, 0), nullable=False), pa.field("decimal_18", pa.decimal128(18, 0), nullable=False), pa.field("decimal_26", pa.decimal128(26, 0), nullable=False), + pa.field("decimal256_9", pa.decimal256(9, 0), nullable=False), + pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False), + pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False), + pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False), + pa.field("decimal256_76", pa.decimal256(76, 0), nullable=False), ] schema = pa.schema(fields) @@ -97,6 +111,11 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "decimal_9": decimal, "decimal_18": decimal, "decimal_26": decimal, + "decimal256_9": decimal, + "decimal256_18": decimal, + "decimal256_26": decimal, + "decimal256_39": decimal, + "decimal256_76": decimal, }, schema, f"basic_required_10.parquet", diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 95563836d69..b4b614980e8 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -1,3 +1,4 @@ +use ethnum::I256; use parquet2::{ schema::types::{ PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit, @@ -5,6 +6,7 @@ use parquet2::{ types::int96_to_i64_ns, }; +use crate::types::i256; use crate::{ array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, @@ -228,6 +230,77 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } + (PhysicalType::Int32, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i32| i256(I256::new(x as i128)), + ))), + (PhysicalType::Int64, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i64| i256(I256::new(x as i128)), + ))), + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => { + let n = *n; + + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + num_rows, + chunk_size, + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(n) + .map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n)))) + .collect::>(); + let validity = array.validity().cloned(); + + PrimitiveArray::::try_new(data_type.clone(), values.into(), validity) + }); + + let arrays = pages.map(|x| x.map(|x| x.boxed())); + + Box::new(arrays) as _ + } + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => { + let n = *n; + + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + num_rows, + chunk_size, + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(n) + .map(super::super::convert_i256) + .collect::>(); + let validity = array.validity().cloned(); + + PrimitiveArray::::try_new(data_type.clone(), values.into(), validity) + }); + + let arrays = pages.map(|x| x.map(|x| x.boxed())); + + Box::new(arrays) as _ + } + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => { + return Err(Error::NotYetImplemented(format!( + "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}" + ))) + } (PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, diff --git a/src/io/parquet/read/indexes/fixed_len_binary.rs b/src/io/parquet/read/indexes/fixed_len_binary.rs index 16e3348746c..f10ba552ef3 100644 --- a/src/io/parquet/read/indexes/fixed_len_binary.rs +++ b/src/io/parquet/read/indexes/fixed_len_binary.rs @@ -1,5 +1,6 @@ use parquet2::indexes::PageIndex; +use crate::types::{i256, NativeType}; use crate::{ array::{Array, FixedSizeBinaryArray, MutableFixedSizeBinaryArray, PrimitiveArray}, datatypes::{DataType, PhysicalType, PrimitiveType}, @@ -42,6 +43,16 @@ fn deserialize_binary_iter<'a, I: TrustedLen>>>( }) }))) } + PhysicalType::Primitive(PrimitiveType::Int256) => { + Box::new(PrimitiveArray::from_trusted_len_iter(iter.map(|v| { + v.map(|x| { + let n = x.len(); + let mut bytes = [0u8; 32]; + bytes[..n].copy_from_slice(x); + i256::from_be_bytes(bytes) + }) + }))) + } _ => { let mut a = MutableFixedSizeBinaryArray::try_new( data_type, diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs index e040478e12c..08a8dde5d8c 100644 --- a/src/io/parquet/read/indexes/mod.rs +++ b/src/io/parquet/read/indexes/mod.rs @@ -108,6 +108,33 @@ fn deserialize( ))), } } + PhysicalType::Primitive(PrimitiveType::Int256) => { + let index = indexes.pop_front().unwrap(); + match index.physical_type() { + ParquetPhysicalType::Int32 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_i32(&index.indexes, data_type).into()) + } + parquet2::schema::types::PhysicalType::Int64 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok( + primitive::deserialize_i64( + &index.indexes, + &index.primitive_type, + data_type, + ) + .into(), + ) + } + parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => { + let index = index.as_any().downcast_ref::().unwrap(); + Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into()) + } + other => Err(Error::nyi(format!( + "Deserialize {other:?} to arrow's int64" + ))), + } + } PhysicalType::Primitive(PrimitiveType::UInt8) | PhysicalType::Primitive(PrimitiveType::UInt16) | PhysicalType::Primitive(PrimitiveType::UInt32) diff --git a/src/io/parquet/read/indexes/primitive.rs b/src/io/parquet/read/indexes/primitive.rs index 636661f5223..96fdc0b7b5a 100644 --- a/src/io/parquet/read/indexes/primitive.rs +++ b/src/io/parquet/read/indexes/primitive.rs @@ -1,3 +1,4 @@ +use ethnum::I256; use parquet2::indexes::PageIndex; use parquet2::schema::types::{PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit}; use parquet2::types::int96_to_i64_ns; @@ -5,7 +6,7 @@ use parquet2::types::int96_to_i64_ns; use crate::array::{Array, MutablePrimitiveArray, PrimitiveArray}; use crate::datatypes::{DataType, TimeUnit}; use crate::trusted_len::TrustedLen; -use crate::types::NativeType; +use crate::types::{i256, NativeType}; use super::ColumnPageStatistics; @@ -32,6 +33,12 @@ fn deserialize_int32>>( PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128))) .to(data_type), ), + Decimal256(_, _) => Box::new( + PrimitiveArray::::from_trusted_len_iter( + iter.map(|x| x.map(|x| i256(I256::new(x.into())))), + ) + .to(data_type), + ) as _, _ => Box::new(PrimitiveArray::::from_trusted_len_iter(iter).to(data_type)), } } @@ -110,6 +117,12 @@ fn deserialize_int64>>( PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128))) .to(data_type), ) as _, + Decimal256(_, _) => Box::new( + PrimitiveArray::::from_trusted_len_iter( + iter.map(|x| x.map(|x| i256(I256::new(x.into())))), + ) + .to(data_type), + ) as _, Timestamp(time_unit, _) => { let mut array = MutablePrimitiveArray::::from_trusted_len_iter(iter).to(data_type.clone()); diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 1171b9cf018..baaffd6d446 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -34,6 +34,7 @@ pub use parquet2::{ use crate::{array::Array, error::Result}; +use crate::types::{i256, NativeType}; pub use deserialize::{ column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns, InitNested, NestedArrayIter, NestedState, StructIterator, @@ -81,3 +82,15 @@ fn convert_i128(value: &[u8], n: usize) -> i128 { bytes[..n].copy_from_slice(value); i128::from_be_bytes(bytes) >> (8 * (16 - n)) } + +fn convert_i256(value: &[u8]) -> i256 { + if value[0] >= 128 { + let mut neg_bytes = [255u8; 32]; + neg_bytes[32 - value.len()..].copy_from_slice(value); + i256::from_be_bytes(neg_bytes) + } else { + let mut bytes = [0u8; 32]; + bytes[32 - value.len()..].copy_from_slice(value); + i256::from_be_bytes(bytes) + } +} diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index f90ca9f4f34..04d881da3b1 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -1,8 +1,10 @@ +use ethnum::I256; use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; use crate::array::*; use crate::error::Result; -use crate::types::days_ms; +use crate::io::parquet::read::convert_i256; +use crate::types::{days_ms, i256}; use super::super::{convert_days_ms, convert_i128}; @@ -28,6 +30,57 @@ pub(super) fn push_i128( Ok(()) } +pub(super) fn push_i256_with_i128( + from: Option<&dyn ParquetStatistics>, + n: usize, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + + min.push(from.and_then(|s| { + s.min_value + .as_deref() + .map(|x| i256(I256::new(convert_i128(x, n)))) + })); + max.push(from.and_then(|s| { + s.max_value + .as_deref() + .map(|x| i256(I256::new(convert_i128(x, n)))) + })); + + Ok(()) +} + +pub(super) fn push_i256( + from: Option<&dyn ParquetStatistics>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + + min.push(from.and_then(|s| s.min_value.as_deref().map(convert_i256))); + max.push(from.and_then(|s| s.max_value.as_deref().map(convert_i256))); + + Ok(()) +} + pub(super) fn push( from: Option<&dyn ParquetStatistics>, min: &mut dyn MutableArray, diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 927695d2905..f3c1ed9e8de 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -1,4 +1,5 @@ //! APIs exposing `parquet2`'s statistics as arrow's statistics. +use ethnum::I256; use std::collections::VecDeque; use std::sync::Arc; @@ -17,6 +18,7 @@ use crate::datatypes::IntervalUnit; use crate::datatypes::{DataType, Field, PhysicalType}; use crate::error::Error; use crate::error::Result; +use crate::types::i256; mod binary; mod boolean; @@ -515,6 +517,22 @@ fn push( ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i128(from, *n, min, max), _ => unreachable!(), }, + Decimal256(_, _) => match physical_type { + ParquetPhysicalType::Int32 => { + primitive::push(from, min, max, |x: i32| Ok(i256(I256::new(x.into())))) + } + ParquetPhysicalType::Int64 => { + primitive::push(from, min, max, |x: i64| Ok(i256(I256::new(x.into())))) + } + ParquetPhysicalType::FixedLenByteArray(n) if *n <= 16 => { + fixlen::push_i256_with_i128(from, *n, min, max) + } + ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented( + format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), + )), + ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max), + _ => unreachable!(), + }, Binary => binary::push::(from, min, max), LargeBinary => binary::push::(from, min, max), Utf8 => utf8::push::(from, min, max), diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index db871bb817e..91b641da17f 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -6,6 +6,7 @@ use parquet2::{ }; use super::{binary::ord_binary, utils, WriteOptions}; +use crate::types::i256; use crate::{ array::{Array, FixedSizeBinaryArray, PrimitiveArray}, error::Result, @@ -103,3 +104,47 @@ pub(super) fn build_statistics_decimal( .map(|x| x.to_be_bytes()[16 - size..].to_vec()), } } + +pub(super) fn build_statistics_decimal256_with_i128( + array: &PrimitiveArray, + primitive_type: PrimitiveType, + size: usize, +) -> FixedLenStatistics { + FixedLenStatistics { + primitive_type, + null_count: Some(array.null_count() as i64), + distinct_count: None, + max_value: array + .iter() + .flatten() + .max() + .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()), + min_value: array + .iter() + .flatten() + .min() + .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()), + } +} + +pub(super) fn build_statistics_decimal256( + array: &PrimitiveArray, + primitive_type: PrimitiveType, + size: usize, +) -> FixedLenStatistics { + FixedLenStatistics { + primitive_type, + null_count: Some(array.null_count() as i64), + distinct_count: None, + max_value: array + .iter() + .flatten() + .max() + .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()), + min_value: array + .iter() + .flatten() + .min() + .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()), + } +} diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 0d928748793..d2723a4808b 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -30,6 +30,7 @@ use crate::array::*; use crate::datatypes::*; use crate::error::{Error, Result}; use crate::types::days_ms; +use crate::types::i256; use crate::types::NativeType; pub use nested::write_rep_and_def; @@ -463,6 +464,86 @@ pub fn array_to_page_simple( fixed_len_bytes::array_to_page(array, options, type_, statistics) } + DataType::Decimal256(precision, _) => { + let type_ = type_; + let precision = *precision; + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + if precision <= 9 { + let values = array + .values() + .iter() + .map(|x| x.0.as_i32()) + .collect::>() + .into(); + + let array = + PrimitiveArray::::new(DataType::Int32, values, array.validity().cloned()); + primitive::array_to_page_integer::(&array, options, type_, encoding) + } else if precision <= 18 { + let values = array + .values() + .iter() + .map(|x| x.0.as_i64()) + .collect::>() + .into(); + + let array = + PrimitiveArray::::new(DataType::Int64, values, array.validity().cloned()); + primitive::array_to_page_integer::(&array, options, type_, encoding) + } else if precision <= 38 { + let size = decimal_length_from_precision(precision); + let statistics = if options.write_statistics { + let stats = fixed_len_bytes::build_statistics_decimal256_with_i128( + array, + type_.clone(), + size, + ); + Some(stats) + } else { + None + }; + + let mut values = Vec::::with_capacity(size * array.len()); + array.values().iter().for_each(|x| { + let bytes = &x.0.low().to_be_bytes()[16 - size..]; + values.extend_from_slice(bytes) + }); + let array = FixedSizeBinaryArray::new( + DataType::FixedSizeBinary(size), + values.into(), + array.validity().cloned(), + ); + fixed_len_bytes::array_to_page(&array, options, type_, statistics) + } else { + let size = 32; + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + let statistics = if options.write_statistics { + let stats = + fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size); + Some(stats) + } else { + None + }; + let mut values = Vec::::with_capacity(size * array.len()); + array.values().iter().for_each(|x| { + let bytes = &x.to_be_bytes(); + values.extend_from_slice(bytes) + }); + let array = FixedSizeBinaryArray::new( + DataType::FixedSizeBinary(size), + values.into(), + array.validity().cloned(), + ); + + fixed_len_bytes::array_to_page(&array, options, type_, statistics) + } + } DataType::Decimal(precision, _) => { let type_ = type_; let precision = *precision; diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index d5008603bb5..48dd853ea43 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -295,6 +295,50 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )?) } + DataType::Decimal256(precision, scale) => { + let precision = *precision; + let scale = *scale; + let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale)); + + if precision <= 9 { + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::Int32, + repetition, + Some(PrimitiveConvertedType::Decimal(precision, scale)), + logical_type, + None, + )?) + } else if precision <= 18 { + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::Int64, + repetition, + Some(PrimitiveConvertedType::Decimal(precision, scale)), + logical_type, + None, + )?) + } else if precision <= 38 { + let len = decimal_length_from_precision(precision); + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::FixedLenByteArray(len), + repetition, + Some(PrimitiveConvertedType::Decimal(precision, scale)), + logical_type, + None, + )?) + } else { + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::FixedLenByteArray(32), + repetition, + None, + None, + None, + )?) + } + } DataType::Interval(_) => Ok(ParquetType::try_from_primitive( name, PhysicalType::FixedLenByteArray(12), diff --git a/src/types/native.rs b/src/types/native.rs index 19c1697d1cf..f66ceb8403e 100644 --- a/src/types/native.rs +++ b/src/types/native.rs @@ -513,7 +513,7 @@ impl NativeType for f16 { } /// Physical representation of a decimal -#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, PartialOrd, Ord)] #[allow(non_camel_case_types)] #[repr(C)] pub struct i256(pub ethnum::I256); @@ -577,14 +577,14 @@ impl NativeType for i256 { let mut bytes = [0u8; 32]; let (a, b) = self.0.into_words(); - let b = b.to_be_bytes(); + let a = a.to_be_bytes(); (0..16).for_each(|i| { - bytes[i] = b[i]; + bytes[i] = a[i]; }); - let a = a.to_be_bytes(); + let b = b.to_be_bytes(); (0..16).for_each(|i| { - bytes[i + 16] = a[i]; + bytes[i + 16] = b[i]; }); bytes @@ -592,7 +592,7 @@ impl NativeType for i256 { #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { - let (b, a) = bytes.split_at(16); + let (a, b) = bytes.split_at(16); let a: [u8; 16] = a.try_into().unwrap(); let b: [u8; 16] = b.try_into().unwrap(); let a = i128::from_be_bytes(a); diff --git a/tests/it/compute/sort/row/mod.rs b/tests/it/compute/sort/row/mod.rs index d82d548623c..4931689a192 100644 --- a/tests/it/compute/sort/row/mod.rs +++ b/tests/it/compute/sort/row/mod.rs @@ -68,12 +68,12 @@ fn test_decimal256() { let col = Int256Array::from_iter([ None, Some(i256::from_words(i128::MIN, i128::MIN)), - Some(i256::from_words(0, -1)), - Some(i256::from_words(i128::MAX, -1)), - Some(i256::from_words(i128::MAX, 0)), Some(i256::from_words(0, 46_i128)), + Some(i256::from_words(0, -1)), Some(i256::from_words(5, 46_i128)), + Some(i256::from_words(i128::MAX, 0)), Some(i256::from_words(i128::MAX, i128::MAX)), + Some(i256::from_words(i128::MAX, -1)), ]) .to(DataType::Decimal256(76, 7)) .to_boxed(); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index bd0ff01e9ec..cdf5b41573a 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1,5 +1,7 @@ +use ethnum::AsI256; use std::io::{Cursor, Read, Seek}; +use arrow2::types::i256; use arrow2::{ array::*, bitmap::Bitmap, @@ -528,6 +530,41 @@ pub fn pyarrow_nullable(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal(26, 0))) } + "decimal256_9" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(9, 0))) + } + "decimal256_18" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(18, 0))) + } + "decimal256_26" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(26, 0))) + } + "decimal256_39" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + } + "decimal256_76" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(76, 0))) + } "timestamp_us" => Box::new( PrimitiveArray::::from(i64_values) .to(DataType::Timestamp(TimeUnit::Microsecond, None)), @@ -614,6 +651,56 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(26, 0))), max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(26, 0))), }, + "decimal256_9" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(9, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(9, 0)), + ), + }, + "decimal256_18" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(18, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(18, 0)), + ), + }, + "decimal256_26" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(26, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(26, 0)), + ), + }, + "decimal256_39" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(39, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)), + ), + }, + "decimal256_76" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(76, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(76, 0)), + ), + }, "timestamp_us" => Statistics { distinct_count: UInt64Array::from([None]).boxed(), null_count: UInt64Array::from([Some(3)]).boxed(), @@ -694,6 +781,41 @@ pub fn pyarrow_required(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal(26, 0))) } + "decimal256_9" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(9, 0))) + } + "decimal256_18" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(18, 0))) + } + "decimal256_26" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(26, 0))) + } + "decimal256_39" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + } + "decimal256_76" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(76, 0))) + } _ => unreachable!(), } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 93a9e428185..786bdf6f96d 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -401,6 +401,56 @@ fn v1_decimal_26_required() -> Result<()> { test_pyarrow_integration("decimal_26", 1, "basic", false, true, None) } +#[test] +fn v1_decimal256_9_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_9", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_9_required() -> Result<()> { + test_pyarrow_integration("decimal256_9", 1, "basic", false, true, None) +} + +#[test] +fn v1_decimal256_18_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_18", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_18_required() -> Result<()> { + test_pyarrow_integration("decimal256_18", 1, "basic", false, true, None) +} + +#[test] +fn v1_decimal256_26_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_26", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_26_required() -> Result<()> { + test_pyarrow_integration("decimal256_26", 1, "basic", false, true, None) +} + +#[test] +fn v1_decimal256_39_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_39", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_39_required() -> Result<()> { + test_pyarrow_integration("decimal256_39", 1, "basic", false, true, None) +} + +#[test] +fn v1_decimal256_76_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_76", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_76_required() -> Result<()> { + test_pyarrow_integration("decimal256_76", 1, "basic", false, true, None) +} + #[test] fn v2_decimal_9_nullable() -> Result<()> { test_pyarrow_integration("decimal_9", 2, "basic", false, false, None) @@ -436,6 +486,31 @@ fn v2_decimal_26_nullable() -> Result<()> { test_pyarrow_integration("decimal_26", 2, "basic", false, false, None) } +#[test] +fn v2_decimal256_9_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_9", 2, "basic", false, false, None) +} + +#[test] +fn v2_decimal256_18_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_18", 2, "basic", false, false, None) +} + +#[test] +fn v2_decimal256_26_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_26", 2, "basic", false, false, None) +} + +#[test] +fn v2_decimal256_39_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_39", 2, "basic", false, false, None) +} + +#[test] +fn v2_decimal256_76_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_76", 2, "basic", false, false, None) +} + #[test] fn v1_timestamp_us_nullable() -> Result<()> { test_pyarrow_integration("timestamp_us", 1, "basic", false, false, None) @@ -466,6 +541,56 @@ fn v2_decimal_26_required_dict() -> Result<()> { test_pyarrow_integration("decimal_26", 2, "basic", true, true, None) } +#[test] +fn v2_decimal256_9_required() -> Result<()> { + test_pyarrow_integration("decimal256_9", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_9_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_9", 2, "basic", true, true, None) +} + +#[test] +fn v2_decimal256_18_required() -> Result<()> { + test_pyarrow_integration("decimal256_18", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_18_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_18", 2, "basic", true, true, None) +} + +#[test] +fn v2_decimal256_26_required() -> Result<()> { + test_pyarrow_integration("decimal256_26", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_26_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_26", 2, "basic", true, true, None) +} + +#[test] +fn v2_decimal256_39_required() -> Result<()> { + test_pyarrow_integration("decimal256_39", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_39_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_39", 2, "basic", true, true, None) +} + +#[test] +fn v2_decimal256_76_required() -> Result<()> { + test_pyarrow_integration("decimal256_76", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_76_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_76", 2, "basic", true, true, None) +} + #[test] fn v1_struct_required_optional() -> Result<()> { test_pyarrow_integration("struct", 1, "struct", false, false, None) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index a38e82fd65c..439710eb243 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -71,6 +71,7 @@ fn round_trip_opt_stats( let data = writer.into_inner().into_inner(); let (result, stats) = read_column(&mut Cursor::new(data), "a1")?; + assert_eq!(array.as_ref(), result.as_ref()); if check_stats { assert_eq!(statistics, stats); @@ -539,6 +540,116 @@ fn decimal_26_required_v1() -> Result<()> { ) } +#[test] +fn decimal256_9_optional_v1() -> Result<()> { + round_trip( + "decimal256_9", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_9_required_v1() -> Result<()> { + round_trip( + "decimal256_9", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_optional_v1() -> Result<()> { + round_trip( + "decimal256_18", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_required_v1() -> Result<()> { + round_trip( + "decimal256_18", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_optional_v1() -> Result<()> { + round_trip( + "decimal256_26", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_required_v1() -> Result<()> { + round_trip( + "decimal256_26", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_39_optional_v1() -> Result<()> { + round_trip( + "decimal256_39", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_39_required_v1() -> Result<()> { + round_trip( + "decimal256_39", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_76_optional_v1() -> Result<()> { + round_trip( + "decimal256_76", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_76_required_v1() -> Result<()> { + round_trip( + "decimal256_76", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn decimal_9_optional_v2() -> Result<()> { round_trip( @@ -605,6 +716,116 @@ fn decimal_26_required_v2() -> Result<()> { ) } +#[test] +fn decimal256_9_optional_v2() -> Result<()> { + round_trip( + "decimal256_9", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_9_required_v2() -> Result<()> { + round_trip( + "decimal256_9", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_optional_v2() -> Result<()> { + round_trip( + "decimal256_18", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_required_v2() -> Result<()> { + round_trip( + "decimal256_18", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_optional_v2() -> Result<()> { + round_trip( + "decimal256_26", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_required_v2() -> Result<()> { + round_trip( + "decimal256_26", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_39_optional_v2() -> Result<()> { + round_trip( + "decimal256_39", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_39_required_v2() -> Result<()> { + round_trip( + "decimal256_39", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_76_optional_v2() -> Result<()> { + round_trip( + "decimal256_76", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_76_required_v2() -> Result<()> { + round_trip( + "decimal256_76", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn struct_v1() -> Result<()> { round_trip(