From 8e1e258484215ba637f53aec201f6e7f87a3c833 Mon Sep 17 00:00:00 2001 From: taichong Date: Tue, 21 Feb 2023 21:13:39 +0800 Subject: [PATCH 1/9] feat(parquet): add support decimal256 read/write in parquet --- src/io/parquet/read/deserialize/simple.rs | 32 +++++++++++++++++++++++ src/io/parquet/read/mod.rs | 8 ++++++ src/io/parquet/write/mod.rs | 29 ++++++++++++++++++++ src/io/parquet/write/schema.rs | 8 ++++++ 4 files changed, 77 insertions(+) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 95563836d69..b9fedd8528b 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -5,6 +5,7 @@ use parquet2::{ types::int96_to_i64_ns, }; +use crate::types::i256; use crate::{ array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, @@ -228,6 +229,37 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 16 => { + return Err(Error::NotYetImplemented(format!( + "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}" + ))) + } + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) => { + let n = *n; + + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(16), + num_rows, + chunk_size, + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(16) + .map(|value: &[u8]| super::super::convert_i256(value, n)) + .collect::>(); + let validity = array.validity().cloned(); + + PrimitiveArray::::try_new(data_type.clone(), values.into(), validity) + }); + + let arrays = pages.map(|x| x.map(|x| x.boxed())); + + Box::new(arrays) as _ + } (PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 1171b9cf018..f1c20eef3ef 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -34,6 +34,7 @@ pub use parquet2::{ use crate::{array::Array, error::Result}; +use crate::types::{i256, NativeType}; pub use deserialize::{ column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns, InitNested, NestedArrayIter, NestedState, StructIterator, @@ -81,3 +82,10 @@ fn convert_i128(value: &[u8], n: usize) -> i128 { bytes[..n].copy_from_slice(value); i128::from_be_bytes(bytes) >> (8 * (16 - n)) } + +fn convert_i256(value: &[u8], n: usize) -> i256 { + let mut bytes = [0u8; 32]; + bytes[..n].copy_from_slice(value); + + i256::from_be_bytes(bytes) +} diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 0d928748793..d94fd79f874 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -30,6 +30,7 @@ use crate::array::*; use crate::datatypes::*; use crate::error::{Error, Result}; use crate::types::days_ms; +use crate::types::i256; use crate::types::NativeType; pub use nested::write_rep_and_def; @@ -463,6 +464,34 @@ pub fn array_to_page_simple( fixed_len_bytes::array_to_page(array, options, type_, statistics) } + DataType::Decimal256(precision, _) => { + let type_ = type_; + let precision = *precision; + let size = 16; + println!("the array is {:?}", array.clone()); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + let mut values = Vec::::with_capacity(size * array.len()); + array.values().iter().for_each(|x| { + let bytes = &x.to_be_bytes()[16 - size..]; + values.extend_from_slice(bytes) + }); + let array = FixedSizeBinaryArray::new( + DataType::FixedSizeBinary(size), + values.into(), + array.validity().cloned(), + ); + let statistics = if options.write_statistics { + let stats = fixed_len_bytes::build_statistics(&array, type_.clone()); + Some(stats) + } else { + None + }; + + fixed_len_bytes::array_to_page(&array, options, type_, statistics) + } DataType::Decimal(precision, _) => { let type_ = type_; let precision = *precision; diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index d5008603bb5..ab9a24c8665 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -295,6 +295,14 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )?) } + DataType::Decimal256(_, _) => Ok(ParquetType::try_from_primitive( + name, + PhysicalType::FixedLenByteArray(16), + repetition, + None, + None, + None, + )?), DataType::Interval(_) => Ok(ParquetType::try_from_primitive( name, PhysicalType::FixedLenByteArray(12), From c73c083bc14176f3e356e1879e96b25bff712d50 Mon Sep 17 00:00:00 2001 From: taichong Date: Wed, 22 Feb 2023 11:59:49 +0800 Subject: [PATCH 2/9] add test --- parquet_integration/write_parquet.py | 4 ++ .../parquet/read/indexes/fixed_len_binary.rs | 11 +++++ src/io/parquet/read/indexes/mod.rs | 12 +++++ src/io/parquet/read/statistics/fixlen.rs | 25 ++++++++++- src/io/parquet/read/statistics/mod.rs | 7 +++ src/io/parquet/write/mod.rs | 4 +- tests/it/io/parquet/mod.rs | 26 +++++++++++ tests/it/io/parquet/read.rs | 25 +++++++++++ tests/it/io/parquet/write.rs | 44 +++++++++++++++++++ 9 files changed, 154 insertions(+), 4 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 1bb262e5439..b486981f8bb 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -32,6 +32,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("decimal_9", pa.decimal128(9, 0)), pa.field("decimal_18", pa.decimal128(18, 0)), pa.field("decimal_26", pa.decimal128(26, 0)), + pa.field("decimal_39", pa.decimal256(39, 0)), pa.field("timestamp_us", pa.timestamp("us")), pa.field("timestamp_s", pa.timestamp("s")), pa.field("emoji", pa.utf8()), @@ -51,6 +52,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "decimal_9": decimal, "decimal_18": decimal, "decimal_26": decimal, + "decimal_39": decimal, "timestamp_us": int64, "timestamp_s": int64, "emoji": emoji, @@ -83,6 +85,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("decimal_9", pa.decimal128(9, 0), nullable=False), pa.field("decimal_18", pa.decimal128(18, 0), nullable=False), pa.field("decimal_26", pa.decimal128(26, 0), nullable=False), + pa.field("decimal_39", pa.decimal256(39, 0), nullable=False), ] schema = pa.schema(fields) @@ -97,6 +100,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "decimal_9": decimal, "decimal_18": decimal, "decimal_26": decimal, + "decimal_39": decimal, }, schema, f"basic_required_10.parquet", diff --git a/src/io/parquet/read/indexes/fixed_len_binary.rs b/src/io/parquet/read/indexes/fixed_len_binary.rs index 16e3348746c..f10ba552ef3 100644 --- a/src/io/parquet/read/indexes/fixed_len_binary.rs +++ b/src/io/parquet/read/indexes/fixed_len_binary.rs @@ -1,5 +1,6 @@ use parquet2::indexes::PageIndex; +use crate::types::{i256, NativeType}; use crate::{ array::{Array, FixedSizeBinaryArray, MutableFixedSizeBinaryArray, PrimitiveArray}, datatypes::{DataType, PhysicalType, PrimitiveType}, @@ -42,6 +43,16 @@ fn deserialize_binary_iter<'a, I: TrustedLen>>>( }) }))) } + PhysicalType::Primitive(PrimitiveType::Int256) => { + Box::new(PrimitiveArray::from_trusted_len_iter(iter.map(|v| { + v.map(|x| { + let n = x.len(); + let mut bytes = [0u8; 32]; + bytes[..n].copy_from_slice(x); + i256::from_be_bytes(bytes) + }) + }))) + } _ => { let mut a = MutableFixedSizeBinaryArray::try_new( data_type, diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs index e040478e12c..b44066a69fb 100644 --- a/src/io/parquet/read/indexes/mod.rs +++ b/src/io/parquet/read/indexes/mod.rs @@ -108,6 +108,18 @@ fn deserialize( ))), } } + PhysicalType::Primitive(PrimitiveType::Int256) => { + let index = indexes.pop_front().unwrap(); + match index.physical_type() { + parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => { + let index = index.as_any().downcast_ref::().unwrap(); + Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into()) + } + other => Err(Error::nyi(format!( + "Deserialize {other:?} to arrow's int64" + ))), + } + } PhysicalType::Primitive(PrimitiveType::UInt8) | PhysicalType::Primitive(PrimitiveType::UInt16) | PhysicalType::Primitive(PrimitiveType::UInt32) diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index f90ca9f4f34..62b41956b00 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -2,7 +2,8 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; use crate::array::*; use crate::error::Result; -use crate::types::days_ms; +use crate::io::parquet::read::convert_i256; +use crate::types::{days_ms, i256}; use super::super::{convert_days_ms, convert_i128}; @@ -28,6 +29,28 @@ pub(super) fn push_i128( Ok(()) } +pub(super) fn push_i256( + from: Option<&dyn ParquetStatistics>, + n: usize, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + + min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x, n)))); + max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x, n)))); + + Ok(()) +} + pub(super) fn push( from: Option<&dyn ParquetStatistics>, min: &mut dyn MutableArray, diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 927695d2905..a8c9ded8cc4 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -515,6 +515,13 @@ fn push( ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i128(from, *n, min, max), _ => unreachable!(), }, + Decimal256(_, _) => match physical_type { + ParquetPhysicalType::FixedLenByteArray(n) if *n > 16 => Err(Error::NotYetImplemented( + format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), + )), + ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max), + _ => unreachable!(), + }, Binary => binary::push::(from, min, max), LargeBinary => binary::push::(from, min, max), Utf8 => utf8::push::(from, min, max), diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index d94fd79f874..778a7e42e87 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -464,11 +464,9 @@ pub fn array_to_page_simple( fixed_len_bytes::array_to_page(array, options, type_, statistics) } - DataType::Decimal256(precision, _) => { + DataType::Decimal256(_, _) => { let type_ = type_; - let precision = *precision; let size = 16; - println!("the array is {:?}", array.clone()); let array = array .as_any() .downcast_ref::>() diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index bd0ff01e9ec..2f33fab3832 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1,5 +1,7 @@ +use ethnum::AsI256; use std::io::{Cursor, Read, Seek}; +use arrow2::types::i256; use arrow2::{ array::*, bitmap::Bitmap, @@ -528,6 +530,13 @@ pub fn pyarrow_nullable(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal(26, 0))) } + "decimal_39" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + } "timestamp_us" => Box::new( PrimitiveArray::::from(i64_values) .to(DataType::Timestamp(TimeUnit::Microsecond, None)), @@ -614,6 +623,16 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(26, 0))), max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(26, 0))), }, + "decimal_39" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(39, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)), + ), + }, "timestamp_us" => Statistics { distinct_count: UInt64Array::from([None]).boxed(), null_count: UInt64Array::from([Some(3)]).boxed(), @@ -694,6 +713,13 @@ pub fn pyarrow_required(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal(26, 0))) } + "decimal_39" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + } _ => unreachable!(), } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 93a9e428185..bb10778e74c 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -401,6 +401,16 @@ fn v1_decimal_26_required() -> Result<()> { test_pyarrow_integration("decimal_26", 1, "basic", false, true, None) } +#[test] +fn v1_decimal_39_nullable() -> Result<()> { + test_pyarrow_integration("decimal_39", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal_39_required() -> Result<()> { + test_pyarrow_integration("decimal_39", 1, "basic", false, true, None) +} + #[test] fn v2_decimal_9_nullable() -> Result<()> { test_pyarrow_integration("decimal_9", 2, "basic", false, false, None) @@ -436,6 +446,11 @@ fn v2_decimal_26_nullable() -> Result<()> { test_pyarrow_integration("decimal_26", 2, "basic", false, false, None) } +#[test] +fn v2_decimal_39_nullable() -> Result<()> { + test_pyarrow_integration("decimal_39", 2, "basic", false, false, None) +} + #[test] fn v1_timestamp_us_nullable() -> Result<()> { test_pyarrow_integration("timestamp_us", 1, "basic", false, false, None) @@ -466,6 +481,16 @@ fn v2_decimal_26_required_dict() -> Result<()> { test_pyarrow_integration("decimal_26", 2, "basic", true, true, None) } +#[test] +fn v2_decimal_39_required() -> Result<()> { + test_pyarrow_integration("decimal_39", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal_39_required_dict() -> Result<()> { + test_pyarrow_integration("decimal_39", 2, "basic", true, true, None) +} + #[test] fn v1_struct_required_optional() -> Result<()> { test_pyarrow_integration("struct", 1, "struct", false, false, None) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index a38e82fd65c..9d4945cf75c 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -539,6 +539,28 @@ fn decimal_26_required_v1() -> Result<()> { ) } +#[test] +fn decimal_39_optional_v1() -> Result<()> { + round_trip( + "decimal_39", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal_39_required_v1() -> Result<()> { + round_trip( + "decimal_39", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn decimal_9_optional_v2() -> Result<()> { round_trip( @@ -605,6 +627,28 @@ fn decimal_26_required_v2() -> Result<()> { ) } +#[test] +fn decimal_39_optional_v2() -> Result<()> { + round_trip( + "decimal_39", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal_39_required_v2() -> Result<()> { + round_trip( + "decimal_39", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn struct_v1() -> Result<()> { round_trip( From 012559011d99a566d29471a754362c5456855de8 Mon Sep 17 00:00:00 2001 From: taichong Date: Wed, 22 Feb 2023 21:15:39 +0800 Subject: [PATCH 3/9] decimal256 to FixedLenByteArray(32) --- src/io/parquet/read/deserialize/simple.rs | 6 +++--- src/io/parquet/read/statistics/mod.rs | 2 +- src/io/parquet/write/mod.rs | 4 ++-- src/io/parquet/write/schema.rs | 2 +- tests/it/io/parquet/write.rs | 1 + 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index b9fedd8528b..370dd1c6a2b 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -229,7 +229,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } - (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 16 => { + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => { return Err(Error::NotYetImplemented(format!( "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}" ))) @@ -239,7 +239,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( let pages = fixed_size_binary::Iter::new( pages, - DataType::FixedSizeBinary(16), + DataType::FixedSizeBinary(n), num_rows, chunk_size, ); @@ -248,7 +248,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( let array = maybe_array?; let values = array .values() - .chunks_exact(16) + .chunks_exact(n) .map(|value: &[u8]| super::super::convert_i256(value, n)) .collect::>(); let validity = array.validity().cloned(); diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index a8c9ded8cc4..d32d7167e01 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -516,7 +516,7 @@ fn push( _ => unreachable!(), }, Decimal256(_, _) => match physical_type { - ParquetPhysicalType::FixedLenByteArray(n) if *n > 16 => Err(Error::NotYetImplemented( + ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented( format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), )), ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max), diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 778a7e42e87..f2a52e76ae2 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -466,14 +466,14 @@ pub fn array_to_page_simple( } DataType::Decimal256(_, _) => { let type_ = type_; - let size = 16; + let size = 32; let array = array .as_any() .downcast_ref::>() .unwrap(); let mut values = Vec::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { - let bytes = &x.to_be_bytes()[16 - size..]; + let bytes = &x.to_be_bytes()[32 - size..]; values.extend_from_slice(bytes) }); let array = FixedSizeBinaryArray::new( diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index ab9a24c8665..057e2490807 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -297,7 +297,7 @@ pub fn to_parquet_type(field: &Field) -> Result { } DataType::Decimal256(_, _) => Ok(ParquetType::try_from_primitive( name, - PhysicalType::FixedLenByteArray(16), + PhysicalType::FixedLenByteArray(32), repetition, None, None, diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 9d4945cf75c..3660cf73619 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -71,6 +71,7 @@ fn round_trip_opt_stats( let data = writer.into_inner().into_inner(); let (result, stats) = read_column(&mut Cursor::new(data), "a1")?; + assert_eq!(array.as_ref(), result.as_ref()); if check_stats { assert_eq!(statistics, stats); From 4cf76d7c521497fae4f54c984f2894a81e1a8866 Mon Sep 17 00:00:00 2001 From: taichong Date: Thu, 23 Feb 2023 12:44:27 +0800 Subject: [PATCH 4/9] decimal_39_required_v1/v2 result eq --- src/io/parquet/read/mod.rs | 2 +- src/io/parquet/read/row_group.rs | 3 +++ src/io/parquet/read/statistics/fixlen.rs | 3 +-- src/io/parquet/write/fixed_len_bytes.rs | 23 +++++++++++++++++++++++ src/io/parquet/write/mod.rs | 17 +++++++++-------- src/io/parquet/write/schema.rs | 4 ++-- src/types/native.rs | 2 +- 7 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index f1c20eef3ef..d173d6fc3cd 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -87,5 +87,5 @@ fn convert_i256(value: &[u8], n: usize) -> i256 { let mut bytes = [0u8; 32]; bytes[..n].copy_from_slice(value); - i256::from_be_bytes(bytes) + i256(i256::from_be_bytes(bytes).0 >> (8 * (32 - n))) } diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 176c6e83182..e475010e606 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -225,6 +225,9 @@ pub fn to_deserializer<'a>( (columns, types) } else { + for (meta, chunk) in columns.clone() { + println!("the meta is {:?},\nthe chunk is {:?}", meta, chunk); + } let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() .map(|(column_meta, chunk)| { diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index 62b41956b00..6ad5aab9ff6 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -2,10 +2,9 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; use crate::array::*; use crate::error::Result; -use crate::io::parquet::read::convert_i256; use crate::types::{days_ms, i256}; -use super::super::{convert_days_ms, convert_i128}; +use super::super::{convert_days_ms, convert_i128, convert_i256}; pub(super) fn push_i128( from: Option<&dyn ParquetStatistics>, diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index db871bb817e..357c14f2bb4 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -6,6 +6,7 @@ use parquet2::{ }; use super::{binary::ord_binary, utils, WriteOptions}; +use crate::types::i256; use crate::{ array::{Array, FixedSizeBinaryArray, PrimitiveArray}, error::Result, @@ -103,3 +104,25 @@ pub(super) fn build_statistics_decimal( .map(|x| x.to_be_bytes()[16 - size..].to_vec()), } } + +pub(super) fn build_statistics_decimal256( + array: &PrimitiveArray, + primitive_type: PrimitiveType, + size: usize, +) -> FixedLenStatistics { + FixedLenStatistics { + primitive_type, + null_count: Some(array.null_count() as i64), + distinct_count: None, + max_value: array + .iter() + .flatten() + .max() + .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()), + min_value: array + .iter() + .flatten() + .min() + .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()), + } +} diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index f2a52e76ae2..fd46e55f582 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -464,13 +464,20 @@ pub fn array_to_page_simple( fixed_len_bytes::array_to_page(array, options, type_, statistics) } - DataType::Decimal256(_, _) => { + DataType::Decimal256(precision, _) => { let type_ = type_; - let size = 32; + let size = decimal_length_from_precision(*precision); let array = array .as_any() .downcast_ref::>() .unwrap(); + let statistics = if options.write_statistics { + let stats = + fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size); + Some(stats) + } else { + None + }; let mut values = Vec::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { let bytes = &x.to_be_bytes()[32 - size..]; @@ -481,12 +488,6 @@ pub fn array_to_page_simple( values.into(), array.validity().cloned(), ); - let statistics = if options.write_statistics { - let stats = fixed_len_bytes::build_statistics(&array, type_.clone()); - Some(stats) - } else { - None - }; fixed_len_bytes::array_to_page(&array, options, type_, statistics) } diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index 057e2490807..8911eff3170 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -295,9 +295,9 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )?) } - DataType::Decimal256(_, _) => Ok(ParquetType::try_from_primitive( + DataType::Decimal256(precision, _) => Ok(ParquetType::try_from_primitive( name, - PhysicalType::FixedLenByteArray(32), + PhysicalType::FixedLenByteArray(decimal_length_from_precision(*precision)), repetition, None, None, diff --git a/src/types/native.rs b/src/types/native.rs index 19c1697d1cf..40b79bcf1a2 100644 --- a/src/types/native.rs +++ b/src/types/native.rs @@ -513,7 +513,7 @@ impl NativeType for f16 { } /// Physical representation of a decimal -#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, PartialOrd, Ord)] #[allow(non_camel_case_types)] #[repr(C)] pub struct i256(pub ethnum::I256); From 11cd899256f6fc64b154c9cbc7437847f7b3ddb9 Mon Sep 17 00:00:00 2001 From: taichong Date: Tue, 28 Feb 2023 21:39:17 +0800 Subject: [PATCH 5/9] support decimal256(9,_), decimal256(18,_), decimal256(38,_) --- parquet_integration/write_parquet.py | 17 ++-- src/io/parquet/read/deserialize/simple.rs | 51 +++++++++-- src/io/parquet/read/indexes/mod.rs | 15 ++++ src/io/parquet/read/indexes/primitive.rs | 15 +++- src/io/parquet/read/schema/convert.rs | 1 + src/io/parquet/read/schema/mod.rs | 1 + src/io/parquet/read/statistics/fixlen.rs | 31 +++++++ src/io/parquet/read/statistics/mod.rs | 18 +++- src/io/parquet/write/fixed_len_bytes.rs | 22 +++++ src/io/parquet/write/mod.rs | 89 ++++++++++++++---- src/io/parquet/write/schema.rs | 53 +++++++++-- tests/it/io/parquet/mod.rs | 62 +++++++++++-- tests/it/io/parquet/read.rs | 70 ++++++++++++--- tests/it/io/parquet/write.rs | 104 ++++++++++++++++++++-- 14 files changed, 486 insertions(+), 63 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index b486981f8bb..ede8e9de591 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -32,14 +32,15 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("decimal_9", pa.decimal128(9, 0)), pa.field("decimal_18", pa.decimal128(18, 0)), pa.field("decimal_26", pa.decimal128(26, 0)), - pa.field("decimal_39", pa.decimal256(39, 0)), + pa.field("decimal256_9", pa.decimal256(9, 0)), + pa.field("decimal256_18", pa.decimal256(18, 0)), + pa.field("decimal256_26", pa.decimal256(26, 0)), pa.field("timestamp_us", pa.timestamp("us")), pa.field("timestamp_s", pa.timestamp("s")), pa.field("emoji", pa.utf8()), pa.field("timestamp_s_utc", pa.timestamp("s", "UTC")), ] schema = pa.schema(fields) - return ( { "int64": int64, @@ -52,7 +53,9 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "decimal_9": decimal, "decimal_18": decimal, "decimal_26": decimal, - "decimal_39": decimal, + "decimal256_9": decimal, + "decimal256_18": decimal, + "decimal256_26": decimal, "timestamp_us": int64, "timestamp_s": int64, "emoji": emoji, @@ -85,7 +88,9 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("decimal_9", pa.decimal128(9, 0), nullable=False), pa.field("decimal_18", pa.decimal128(18, 0), nullable=False), pa.field("decimal_26", pa.decimal128(26, 0), nullable=False), - pa.field("decimal_39", pa.decimal256(39, 0), nullable=False), + pa.field("decimal256_9", pa.decimal256(9, 0), nullable=False), + pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False), + pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False), ] schema = pa.schema(fields) @@ -100,7 +105,9 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "decimal_9": decimal, "decimal_18": decimal, "decimal_26": decimal, - "decimal_39": decimal, + "decimal256_9": decimal, + "decimal256_18": decimal, + "decimal256_26": decimal, }, schema, f"basic_required_10.parquet", diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 370dd1c6a2b..0b1e3926239 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -1,3 +1,4 @@ +use ethnum::I256; use parquet2::{ schema::types::{ PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit, @@ -229,12 +230,47 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } - (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => { - return Err(Error::NotYetImplemented(format!( - "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}" - ))) + (PhysicalType::Int32, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i32| i256(I256::new(x as i128)), + ))), + (PhysicalType::Int64, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( + pages, + data_type, + num_rows, + chunk_size, + |x: i64| i256(I256::new(x as i128)), + ))), + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => { + let n = *n; + + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + num_rows, + chunk_size, + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(n) + .map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n)))) + .collect::>(); + let validity = array.validity().cloned(); + + PrimitiveArray::::try_new(data_type.clone(), values.into(), validity) + }); + + let arrays = pages.map(|x| x.map(|x| x.boxed())); + + Box::new(arrays) as _ } - (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) => { + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => { let n = *n; let pages = fixed_size_binary::Iter::new( @@ -260,6 +296,11 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } + (PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => { + return Err(Error::NotYetImplemented(format!( + "Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}" + ))) + } (PhysicalType::Int32, Date64) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, diff --git a/src/io/parquet/read/indexes/mod.rs b/src/io/parquet/read/indexes/mod.rs index b44066a69fb..08a8dde5d8c 100644 --- a/src/io/parquet/read/indexes/mod.rs +++ b/src/io/parquet/read/indexes/mod.rs @@ -111,6 +111,21 @@ fn deserialize( PhysicalType::Primitive(PrimitiveType::Int256) => { let index = indexes.pop_front().unwrap(); match index.physical_type() { + ParquetPhysicalType::Int32 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok(primitive::deserialize_i32(&index.indexes, data_type).into()) + } + parquet2::schema::types::PhysicalType::Int64 => { + let index = index.as_any().downcast_ref::>().unwrap(); + Ok( + primitive::deserialize_i64( + &index.indexes, + &index.primitive_type, + data_type, + ) + .into(), + ) + } parquet2::schema::types::PhysicalType::FixedLenByteArray(_) => { let index = index.as_any().downcast_ref::().unwrap(); Ok(fixed_len_binary::deserialize(&index.indexes, data_type).into()) diff --git a/src/io/parquet/read/indexes/primitive.rs b/src/io/parquet/read/indexes/primitive.rs index 636661f5223..96fdc0b7b5a 100644 --- a/src/io/parquet/read/indexes/primitive.rs +++ b/src/io/parquet/read/indexes/primitive.rs @@ -1,3 +1,4 @@ +use ethnum::I256; use parquet2::indexes::PageIndex; use parquet2::schema::types::{PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit}; use parquet2::types::int96_to_i64_ns; @@ -5,7 +6,7 @@ use parquet2::types::int96_to_i64_ns; use crate::array::{Array, MutablePrimitiveArray, PrimitiveArray}; use crate::datatypes::{DataType, TimeUnit}; use crate::trusted_len::TrustedLen; -use crate::types::NativeType; +use crate::types::{i256, NativeType}; use super::ColumnPageStatistics; @@ -32,6 +33,12 @@ fn deserialize_int32>>( PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128))) .to(data_type), ), + Decimal256(_, _) => Box::new( + PrimitiveArray::::from_trusted_len_iter( + iter.map(|x| x.map(|x| i256(I256::new(x.into())))), + ) + .to(data_type), + ) as _, _ => Box::new(PrimitiveArray::::from_trusted_len_iter(iter).to(data_type)), } } @@ -110,6 +117,12 @@ fn deserialize_int64>>( PrimitiveArray::::from_trusted_len_iter(iter.map(|x| x.map(|x| x as i128))) .to(data_type), ) as _, + Decimal256(_, _) => Box::new( + PrimitiveArray::::from_trusted_len_iter( + iter.map(|x| x.map(|x| i256(I256::new(x.into())))), + ) + .to(data_type), + ) as _, Timestamp(time_unit, _) => { let mut array = MutablePrimitiveArray::::from_trusted_len_iter(iter).to(data_type.clone()); diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs index 821d5107649..20fc0d50206 100644 --- a/src/io/parquet/read/schema/convert.rs +++ b/src/io/parquet/read/schema/convert.rs @@ -12,6 +12,7 @@ use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; /// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain /// any physical column. pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec { + println!("in parquet_to_arrow_schema"); fields.iter().filter_map(to_field).collect::>() } diff --git a/src/io/parquet/read/schema/mod.rs b/src/io/parquet/read/schema/mod.rs index d47055ef6aa..cb1661760bb 100644 --- a/src/io/parquet/read/schema/mod.rs +++ b/src/io/parquet/read/schema/mod.rs @@ -21,6 +21,7 @@ use self::metadata::parse_key_value_metadata; /// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded, /// indicating that that the file's arrow metadata was incorrectly written. pub fn infer_schema(file_metadata: &FileMetaData) -> Result { + println!("in infer_schema"); let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata()); let schema = read_schema_from_metadata(&mut metadata)?; diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index 6ad5aab9ff6..1362d22bb97 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -1,3 +1,4 @@ +use ethnum::I256; use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; use crate::array::*; @@ -28,6 +29,36 @@ pub(super) fn push_i128( Ok(()) } +pub(super) fn push_i256_with_i128( + from: Option<&dyn ParquetStatistics>, + n: usize, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + + min.push(from.and_then(|s| { + s.min_value + .as_deref() + .map(|x| i256(I256::new(convert_i128(x, n)))) + })); + max.push(from.and_then(|s| { + s.max_value + .as_deref() + .map(|x| i256(I256::new(convert_i128(x, n)))) + })); + + Ok(()) +} + pub(super) fn push_i256( from: Option<&dyn ParquetStatistics>, n: usize, diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index d32d7167e01..8a81074e8f4 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -1,4 +1,5 @@ //! APIs exposing `parquet2`'s statistics as arrow's statistics. +use ethnum::I256; use std::collections::VecDeque; use std::sync::Arc; @@ -17,6 +18,7 @@ use crate::datatypes::IntervalUnit; use crate::datatypes::{DataType, Field, PhysicalType}; use crate::error::Error; use crate::error::Result; +use crate::types::i256; mod binary; mod boolean; @@ -516,10 +518,24 @@ fn push( _ => unreachable!(), }, Decimal256(_, _) => match physical_type { + ParquetPhysicalType::Int32 => { + primitive::push(from, min, max, |x: i32| Ok(i256(I256::new(x.into())))) + } + ParquetPhysicalType::Int64 => { + println!("in push static Decimal256"); + primitive::push(from, min, max, |x: i64| Ok(i256(I256::new(x.into())))) + } + ParquetPhysicalType::FixedLenByteArray(n) if *n <= 16 => { + println!("in push FixedLenByteArray n {:?}", n); + fixlen::push_i256_with_i128(from, *n, min, max) + } ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented( format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), )), - ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max), + ParquetPhysicalType::FixedLenByteArray(n) => { + println!("in push FixedLenByteArray n {:?}", n); + fixlen::push_i256(from, *n, min, max) + } _ => unreachable!(), }, Binary => binary::push::(from, min, max), diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index 357c14f2bb4..91b641da17f 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -105,6 +105,28 @@ pub(super) fn build_statistics_decimal( } } +pub(super) fn build_statistics_decimal256_with_i128( + array: &PrimitiveArray, + primitive_type: PrimitiveType, + size: usize, +) -> FixedLenStatistics { + FixedLenStatistics { + primitive_type, + null_count: Some(array.null_count() as i64), + distinct_count: None, + max_value: array + .iter() + .flatten() + .max() + .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()), + min_value: array + .iter() + .flatten() + .min() + .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()), + } +} + pub(super) fn build_statistics_decimal256( array: &PrimitiveArray, primitive_type: PrimitiveType, diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index fd46e55f582..4600fb81cd1 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -466,30 +466,83 @@ pub fn array_to_page_simple( } DataType::Decimal256(precision, _) => { let type_ = type_; - let size = decimal_length_from_precision(*precision); + let precision = *precision; let array = array .as_any() .downcast_ref::>() .unwrap(); - let statistics = if options.write_statistics { - let stats = - fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size); - Some(stats) + if precision <= 9 { + let values = array + .values() + .iter() + .map(|x| x.0.as_i32()) + .collect::>() + .into(); + + let array = + PrimitiveArray::::new(DataType::Int32, values, array.validity().cloned()); + primitive::array_to_page_integer::(&array, options, type_, encoding) + } else if precision <= 18 { + let values = array + .values() + .iter() + .map(|x| x.0.as_i64()) + .collect::>() + .into(); + + let array = + PrimitiveArray::::new(DataType::Int64, values, array.validity().cloned()); + primitive::array_to_page_integer::(&array, options, type_, encoding) + } else if precision <= 38 { + let size = decimal_length_from_precision(precision); + let statistics = if options.write_statistics { + let stats = fixed_len_bytes::build_statistics_decimal256_with_i128( + array, + type_.clone(), + size, + ); + Some(stats) + } else { + None + }; + + let mut values = Vec::::with_capacity(size * array.len()); + array.values().iter().for_each(|x| { + let bytes = &x.0.low().to_be_bytes()[16 - size..]; + values.extend_from_slice(bytes) + }); + let array = FixedSizeBinaryArray::new( + DataType::FixedSizeBinary(size), + values.into(), + array.validity().cloned(), + ); + fixed_len_bytes::array_to_page(&array, options, type_, statistics) } else { - None - }; - let mut values = Vec::::with_capacity(size * array.len()); - array.values().iter().for_each(|x| { - let bytes = &x.to_be_bytes()[32 - size..]; - values.extend_from_slice(bytes) - }); - let array = FixedSizeBinaryArray::new( - DataType::FixedSizeBinary(size), - values.into(), - array.validity().cloned(), - ); + let size = decimal_length_from_precision(precision); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + let statistics = if options.write_statistics { + let stats = + fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size); + Some(stats) + } else { + None + }; + let mut values = Vec::::with_capacity(size * array.len()); + array.values().iter().for_each(|x| { + let bytes = &x.to_be_bytes()[32 - size..]; + values.extend_from_slice(bytes) + }); + let array = FixedSizeBinaryArray::new( + DataType::FixedSizeBinary(size), + values.into(), + array.validity().cloned(), + ); - fixed_len_bytes::array_to_page(&array, options, type_, statistics) + fixed_len_bytes::array_to_page(&array, options, type_, statistics) + } } DataType::Decimal(precision, _) => { let type_ = type_; diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index 8911eff3170..e5d30d6fa65 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -295,14 +295,51 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )?) } - DataType::Decimal256(precision, _) => Ok(ParquetType::try_from_primitive( - name, - PhysicalType::FixedLenByteArray(decimal_length_from_precision(*precision)), - repetition, - None, - None, - None, - )?), + DataType::Decimal256(precision, scale) => { + let precision = *precision; + let scale = *scale; + let logical_type = Some(PrimitiveLogicalType::Decimal(precision, scale)); + + if precision <= 9 { + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::Int32, + repetition, + Some(PrimitiveConvertedType::Decimal(precision, scale)), + logical_type, + None, + )?) + } else if precision <= 18 { + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::Int64, + repetition, + Some(PrimitiveConvertedType::Decimal(precision, scale)), + logical_type, + None, + )?) + } else if precision <= 38 { + let len = decimal_length_from_precision(precision); + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::FixedLenByteArray(len), + repetition, + Some(PrimitiveConvertedType::Decimal(precision, scale)), + logical_type, + None, + )?) + } else { + let len = decimal_length_from_precision(precision); + Ok(ParquetType::try_from_primitive( + name, + PhysicalType::FixedLenByteArray(len), + repetition, + None, + logical_type, + None, + )?) + } + } DataType::Interval(_) => Ok(ParquetType::try_from_primitive( name, PhysicalType::FixedLenByteArray(12), diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 2f33fab3832..7a4a30d56b4 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -530,12 +530,26 @@ pub fn pyarrow_nullable(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal(26, 0))) } - "decimal_39" => { + "decimal256_9" => { let values = i64_values .iter() .map(|x| x.map(|x| i256(x.as_i256()))) .collect::>(); - Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(9, 0))) + } + "decimal256_18" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(18, 0))) + } + "decimal256_26" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(26, 0))) } "timestamp_us" => Box::new( PrimitiveArray::::from(i64_values) @@ -623,14 +637,34 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(26, 0))), max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(26, 0))), }, - "decimal_39" => Statistics { + "decimal256_9" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(9, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(9, 0)), + ), + }, + "decimal256_18" => Statistics { distinct_count: UInt64Array::from([None]).boxed(), null_count: UInt64Array::from([Some(3)]).boxed(), min_value: Box::new( - Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(39, 0)), + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(18, 0)), ), max_value: Box::new( - Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)), + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(18, 0)), + ), + }, + "decimal256_26" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(26, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(26, 0)), ), }, "timestamp_us" => Statistics { @@ -713,12 +747,26 @@ pub fn pyarrow_required(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal(26, 0))) } - "decimal_39" => { + "decimal256_9" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(9, 0))) + } + "decimal256_18" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(18, 0))) + } + "decimal256_26" => { let values = i64_values .iter() .map(|x| x.map(|x| i256(x.as_i256()))) .collect::>(); - Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(26, 0))) } _ => unreachable!(), } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index bb10778e74c..17bdd2b9f6f 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -402,13 +402,33 @@ fn v1_decimal_26_required() -> Result<()> { } #[test] -fn v1_decimal_39_nullable() -> Result<()> { - test_pyarrow_integration("decimal_39", 1, "basic", false, false, None) +fn v1_decimal256_9_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_9", 1, "basic", false, false, None) } #[test] -fn v1_decimal_39_required() -> Result<()> { - test_pyarrow_integration("decimal_39", 1, "basic", false, true, None) +fn v1_decimal256_9_required() -> Result<()> { + test_pyarrow_integration("decimal256_9", 1, "basic", false, true, None) +} + +#[test] +fn v1_decimal256_18_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_18", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_18_required() -> Result<()> { + test_pyarrow_integration("decimal256_18", 1, "basic", false, true, None) +} + +#[test] +fn v1_decimal256_26_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_26", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_26_required() -> Result<()> { + test_pyarrow_integration("decimal256_26", 1, "basic", false, true, None) } #[test] @@ -447,8 +467,18 @@ fn v2_decimal_26_nullable() -> Result<()> { } #[test] -fn v2_decimal_39_nullable() -> Result<()> { - test_pyarrow_integration("decimal_39", 2, "basic", false, false, None) +fn v2_decimal256_9_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_9", 2, "basic", false, false, None) +} + +#[test] +fn v2_decimal256_18_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_18", 2, "basic", false, false, None) +} + +#[test] +fn v2_decimal256_26_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_26", 2, "basic", false, false, None) } #[test] @@ -482,13 +512,33 @@ fn v2_decimal_26_required_dict() -> Result<()> { } #[test] -fn v2_decimal_39_required() -> Result<()> { - test_pyarrow_integration("decimal_39", 2, "basic", false, true, None) +fn v2_decimal256_9_required() -> Result<()> { + test_pyarrow_integration("decimal256_9", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_9_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_9", 2, "basic", true, true, None) +} + +#[test] +fn v2_decimal256_18_required() -> Result<()> { + test_pyarrow_integration("decimal256_18", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_18_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_18", 2, "basic", true, true, None) +} + +#[test] +fn v2_decimal256_26_required() -> Result<()> { + test_pyarrow_integration("decimal256_26", 2, "basic", false, true, None) } #[test] -fn v2_decimal_39_required_dict() -> Result<()> { - test_pyarrow_integration("decimal_39", 2, "basic", true, true, None) +fn v2_decimal256_26_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_26", 2, "basic", true, true, None) } #[test] diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 3660cf73619..d30eac95118 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -541,9 +541,9 @@ fn decimal_26_required_v1() -> Result<()> { } #[test] -fn decimal_39_optional_v1() -> Result<()> { +fn decimal256_9_optional_v1() -> Result<()> { round_trip( - "decimal_39", + "decimal256_9", "nullable", Version::V1, CompressionOptions::Uncompressed, @@ -552,9 +552,53 @@ fn decimal_39_optional_v1() -> Result<()> { } #[test] -fn decimal_39_required_v1() -> Result<()> { +fn decimal256_9_required_v1() -> Result<()> { round_trip( - "decimal_39", + "decimal256_9", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_optional_v1() -> Result<()> { + round_trip( + "decimal256_18", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_required_v1() -> Result<()> { + round_trip( + "decimal256_18", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_optional_v1() -> Result<()> { + round_trip( + "decimal256_26", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_required_v1() -> Result<()> { + round_trip( + "decimal256_26", "required", Version::V1, CompressionOptions::Uncompressed, @@ -629,9 +673,53 @@ fn decimal_26_required_v2() -> Result<()> { } #[test] -fn decimal_39_optional_v2() -> Result<()> { +fn decimal256_9_optional_v2() -> Result<()> { + round_trip( + "decimal256_9", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_9_required_v2() -> Result<()> { + round_trip( + "decimal256_9", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_optional_v2() -> Result<()> { + round_trip( + "decimal256_18", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_18_required_v2() -> Result<()> { + round_trip( + "decimal256_18", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_26_optional_v2() -> Result<()> { round_trip( - "decimal_39", + "decimal256_26", "nullable", Version::V2, CompressionOptions::Uncompressed, @@ -640,9 +728,9 @@ fn decimal_39_optional_v2() -> Result<()> { } #[test] -fn decimal_39_required_v2() -> Result<()> { +fn decimal256_26_required_v2() -> Result<()> { round_trip( - "decimal_39", + "decimal256_26", "required", Version::V2, CompressionOptions::Uncompressed, From ed8efa0de6923ae0d701a1a289a4e114cd76fc07 Mon Sep 17 00:00:00 2001 From: taichong Date: Tue, 28 Feb 2023 21:59:23 +0800 Subject: [PATCH 6/9] add decimal256 precision>38 --- parquet_integration/write_parquet.py | 4 +++ src/io/parquet/read/schema/convert.rs | 1 - src/io/parquet/read/schema/mod.rs | 1 - src/io/parquet/read/statistics/mod.rs | 7 +---- src/io/parquet/write/schema.rs | 2 +- tests/it/io/parquet/mod.rs | 24 +++++++++++++++ tests/it/io/parquet/read.rs | 25 +++++++++++++++ tests/it/io/parquet/write.rs | 44 +++++++++++++++++++++++++++ 8 files changed, 99 insertions(+), 9 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index ede8e9de591..d6e6dfe1b51 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -35,6 +35,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("decimal256_9", pa.decimal256(9, 0)), pa.field("decimal256_18", pa.decimal256(18, 0)), pa.field("decimal256_26", pa.decimal256(26, 0)), + pa.field("decimal256_39", pa.decimal256(39, 0)), pa.field("timestamp_us", pa.timestamp("us")), pa.field("timestamp_s", pa.timestamp("s")), pa.field("emoji", pa.utf8()), @@ -56,6 +57,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "decimal256_9": decimal, "decimal256_18": decimal, "decimal256_26": decimal, + "decimal256_39": decimal, "timestamp_us": int64, "timestamp_s": int64, "emoji": emoji, @@ -91,6 +93,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("decimal256_9", pa.decimal256(9, 0), nullable=False), pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False), pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False), + pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False), ] schema = pa.schema(fields) @@ -108,6 +111,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "decimal256_9": decimal, "decimal256_18": decimal, "decimal256_26": decimal, + "decimal256_39": decimal, }, schema, f"basic_required_10.parquet", diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs index 20fc0d50206..821d5107649 100644 --- a/src/io/parquet/read/schema/convert.rs +++ b/src/io/parquet/read/schema/convert.rs @@ -12,7 +12,6 @@ use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; /// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain /// any physical column. pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec { - println!("in parquet_to_arrow_schema"); fields.iter().filter_map(to_field).collect::>() } diff --git a/src/io/parquet/read/schema/mod.rs b/src/io/parquet/read/schema/mod.rs index cb1661760bb..d47055ef6aa 100644 --- a/src/io/parquet/read/schema/mod.rs +++ b/src/io/parquet/read/schema/mod.rs @@ -21,7 +21,6 @@ use self::metadata::parse_key_value_metadata; /// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded, /// indicating that that the file's arrow metadata was incorrectly written. pub fn infer_schema(file_metadata: &FileMetaData) -> Result { - println!("in infer_schema"); let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata()); let schema = read_schema_from_metadata(&mut metadata)?; diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 8a81074e8f4..27f137f0f34 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -522,20 +522,15 @@ fn push( primitive::push(from, min, max, |x: i32| Ok(i256(I256::new(x.into())))) } ParquetPhysicalType::Int64 => { - println!("in push static Decimal256"); primitive::push(from, min, max, |x: i64| Ok(i256(I256::new(x.into())))) } ParquetPhysicalType::FixedLenByteArray(n) if *n <= 16 => { - println!("in push FixedLenByteArray n {:?}", n); fixlen::push_i256_with_i128(from, *n, min, max) } ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented( format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), )), - ParquetPhysicalType::FixedLenByteArray(n) => { - println!("in push FixedLenByteArray n {:?}", n); - fixlen::push_i256(from, *n, min, max) - } + ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max), _ => unreachable!(), }, Binary => binary::push::(from, min, max), diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index e5d30d6fa65..55ed0de795c 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -335,7 +335,7 @@ pub fn to_parquet_type(field: &Field) -> Result { PhysicalType::FixedLenByteArray(len), repetition, None, - logical_type, + None, None, )?) } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7a4a30d56b4..f12b1352995 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -551,6 +551,13 @@ pub fn pyarrow_nullable(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(26, 0))) } + "decimal256_39" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + } "timestamp_us" => Box::new( PrimitiveArray::::from(i64_values) .to(DataType::Timestamp(TimeUnit::Microsecond, None)), @@ -667,6 +674,16 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(26, 0)), ), }, + "decimal256_39" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(39, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)), + ), + }, "timestamp_us" => Statistics { distinct_count: UInt64Array::from([None]).boxed(), null_count: UInt64Array::from([Some(3)]).boxed(), @@ -768,6 +785,13 @@ pub fn pyarrow_required(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(26, 0))) } + "decimal256_39" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) + } _ => unreachable!(), } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 17bdd2b9f6f..58e4c685bc2 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -431,6 +431,16 @@ fn v1_decimal256_26_required() -> Result<()> { test_pyarrow_integration("decimal256_26", 1, "basic", false, true, None) } +#[test] +fn v1_decimal256_39_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_39", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_39_required() -> Result<()> { + test_pyarrow_integration("decimal256_39", 1, "basic", false, true, None) +} + #[test] fn v2_decimal_9_nullable() -> Result<()> { test_pyarrow_integration("decimal_9", 2, "basic", false, false, None) @@ -481,6 +491,11 @@ fn v2_decimal256_26_nullable() -> Result<()> { test_pyarrow_integration("decimal256_26", 2, "basic", false, false, None) } +#[test] +fn v2_decimal256_39_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_39", 2, "basic", false, false, None) +} + #[test] fn v1_timestamp_us_nullable() -> Result<()> { test_pyarrow_integration("timestamp_us", 1, "basic", false, false, None) @@ -541,6 +556,16 @@ fn v2_decimal256_26_required_dict() -> Result<()> { test_pyarrow_integration("decimal256_26", 2, "basic", true, true, None) } +#[test] +fn v2_decimal256_39_required() -> Result<()> { + test_pyarrow_integration("decimal256_39", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_39_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_39", 2, "basic", true, true, None) +} + #[test] fn v1_struct_required_optional() -> Result<()> { test_pyarrow_integration("struct", 1, "struct", false, false, None) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index d30eac95118..862d4436511 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -606,6 +606,28 @@ fn decimal256_26_required_v1() -> Result<()> { ) } +#[test] +fn decimal256_39_optional_v1() -> Result<()> { + round_trip( + "decimal256_39", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_39_required_v1() -> Result<()> { + round_trip( + "decimal256_39", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn decimal_9_optional_v2() -> Result<()> { round_trip( @@ -738,6 +760,28 @@ fn decimal256_26_required_v2() -> Result<()> { ) } +#[test] +fn decimal256_39_optional_v2() -> Result<()> { + round_trip( + "decimal256_39", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_39_required_v2() -> Result<()> { + round_trip( + "decimal256_39", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn struct_v1() -> Result<()> { round_trip( From ed93db5dea249d761c4aabb8164d0420eb3ed1a4 Mon Sep 17 00:00:00 2001 From: taichong Date: Wed, 1 Mar 2023 19:35:51 +0800 Subject: [PATCH 7/9] support decimal256(76,0) --- parquet_integration/write_parquet.py | 4 +++ src/io/parquet/read/deserialize/simple.rs | 2 +- src/io/parquet/read/mod.rs | 13 ++++--- src/io/parquet/read/row_group.rs | 3 -- src/io/parquet/read/statistics/fixlen.rs | 8 ++--- src/io/parquet/read/statistics/mod.rs | 2 +- src/io/parquet/write/mod.rs | 4 +-- src/io/parquet/write/schema.rs | 3 +- src/types/native.rs | 10 +++--- tests/it/io/parquet/mod.rs | 24 +++++++++++++ tests/it/io/parquet/read.rs | 25 +++++++++++++ tests/it/io/parquet/write.rs | 44 +++++++++++++++++++++++ 12 files changed, 120 insertions(+), 22 deletions(-) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index d6e6dfe1b51..acfd819d57c 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -36,6 +36,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("decimal256_18", pa.decimal256(18, 0)), pa.field("decimal256_26", pa.decimal256(26, 0)), pa.field("decimal256_39", pa.decimal256(39, 0)), + pa.field("decimal256_76", pa.decimal256(76, 0)), pa.field("timestamp_us", pa.timestamp("us")), pa.field("timestamp_s", pa.timestamp("s")), pa.field("emoji", pa.utf8()), @@ -58,6 +59,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "decimal256_18": decimal, "decimal256_26": decimal, "decimal256_39": decimal, + "decimal256_76": decimal, "timestamp_us": int64, "timestamp_s": int64, "emoji": emoji, @@ -94,6 +96,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False), pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False), pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False), + pa.field("decimal256_76", pa.decimal256(76, 0), nullable=False), ] schema = pa.schema(fields) @@ -112,6 +115,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "decimal256_18": decimal, "decimal256_26": decimal, "decimal256_39": decimal, + "decimal256_76": decimal, }, schema, f"basic_required_10.parquet", diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 0b1e3926239..b4b614980e8 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -285,7 +285,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( let values = array .values() .chunks_exact(n) - .map(|value: &[u8]| super::super::convert_i256(value, n)) + .map(super::super::convert_i256) .collect::>(); let validity = array.validity().cloned(); diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index d173d6fc3cd..cb557df86bc 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -83,9 +83,14 @@ fn convert_i128(value: &[u8], n: usize) -> i128 { i128::from_be_bytes(bytes) >> (8 * (16 - n)) } -fn convert_i256(value: &[u8], n: usize) -> i256 { +fn convert_i256(value: &[u8]) -> i256 { let mut bytes = [0u8; 32]; - bytes[..n].copy_from_slice(value); - - i256(i256::from_be_bytes(bytes).0 >> (8 * (32 - n))) + let mut neg_bytes = [255u8; 32]; + if value[0] >= 128 { + neg_bytes[32 - value.len()..].copy_from_slice(value); + i256::from_be_bytes(neg_bytes) + } else { + bytes[32 - value.len()..].copy_from_slice(value); + i256::from_be_bytes(bytes) + } } diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index e475010e606..176c6e83182 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -225,9 +225,6 @@ pub fn to_deserializer<'a>( (columns, types) } else { - for (meta, chunk) in columns.clone() { - println!("the meta is {:?},\nthe chunk is {:?}", meta, chunk); - } let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() .map(|(column_meta, chunk)| { diff --git a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index 1362d22bb97..04d881da3b1 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -3,9 +3,10 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; use crate::array::*; use crate::error::Result; +use crate::io::parquet::read::convert_i256; use crate::types::{days_ms, i256}; -use super::super::{convert_days_ms, convert_i128, convert_i256}; +use super::super::{convert_days_ms, convert_i128}; pub(super) fn push_i128( from: Option<&dyn ParquetStatistics>, @@ -61,7 +62,6 @@ pub(super) fn push_i256_with_i128( pub(super) fn push_i256( from: Option<&dyn ParquetStatistics>, - n: usize, min: &mut dyn MutableArray, max: &mut dyn MutableArray, ) -> Result<()> { @@ -75,8 +75,8 @@ pub(super) fn push_i256( .unwrap(); let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); - min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i256(x, n)))); - max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i256(x, n)))); + min.push(from.and_then(|s| s.min_value.as_deref().map(convert_i256))); + max.push(from.and_then(|s| s.max_value.as_deref().map(convert_i256))); Ok(()) } diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 27f137f0f34..f3c1ed9e8de 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -530,7 +530,7 @@ fn push( ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented( format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"), )), - ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max), + ParquetPhysicalType::FixedLenByteArray(_) => fixlen::push_i256(from, min, max), _ => unreachable!(), }, Binary => binary::push::(from, min, max), diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 4600fb81cd1..d2723a4808b 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -518,7 +518,7 @@ pub fn array_to_page_simple( ); fixed_len_bytes::array_to_page(&array, options, type_, statistics) } else { - let size = decimal_length_from_precision(precision); + let size = 32; let array = array .as_any() .downcast_ref::>() @@ -532,7 +532,7 @@ pub fn array_to_page_simple( }; let mut values = Vec::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { - let bytes = &x.to_be_bytes()[32 - size..]; + let bytes = &x.to_be_bytes(); values.extend_from_slice(bytes) }); let array = FixedSizeBinaryArray::new( diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index 55ed0de795c..48dd853ea43 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -329,10 +329,9 @@ pub fn to_parquet_type(field: &Field) -> Result { None, )?) } else { - let len = decimal_length_from_precision(precision); Ok(ParquetType::try_from_primitive( name, - PhysicalType::FixedLenByteArray(len), + PhysicalType::FixedLenByteArray(32), repetition, None, None, diff --git a/src/types/native.rs b/src/types/native.rs index 40b79bcf1a2..f66ceb8403e 100644 --- a/src/types/native.rs +++ b/src/types/native.rs @@ -577,14 +577,14 @@ impl NativeType for i256 { let mut bytes = [0u8; 32]; let (a, b) = self.0.into_words(); - let b = b.to_be_bytes(); + let a = a.to_be_bytes(); (0..16).for_each(|i| { - bytes[i] = b[i]; + bytes[i] = a[i]; }); - let a = a.to_be_bytes(); + let b = b.to_be_bytes(); (0..16).for_each(|i| { - bytes[i + 16] = a[i]; + bytes[i + 16] = b[i]; }); bytes @@ -592,7 +592,7 @@ impl NativeType for i256 { #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { - let (b, a) = bytes.split_at(16); + let (a, b) = bytes.split_at(16); let a: [u8; 16] = a.try_into().unwrap(); let b: [u8; 16] = b.try_into().unwrap(); let a = i128::from_be_bytes(a); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index f12b1352995..cdf5b41573a 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -558,6 +558,13 @@ pub fn pyarrow_nullable(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) } + "decimal256_76" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(76, 0))) + } "timestamp_us" => Box::new( PrimitiveArray::::from(i64_values) .to(DataType::Timestamp(TimeUnit::Microsecond, None)), @@ -684,6 +691,16 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)), ), }, + "decimal256_76" => Statistics { + distinct_count: UInt64Array::from([None]).boxed(), + null_count: UInt64Array::from([Some(3)]).boxed(), + min_value: Box::new( + Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(76, 0)), + ), + max_value: Box::new( + Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(76, 0)), + ), + }, "timestamp_us" => Statistics { distinct_count: UInt64Array::from([None]).boxed(), null_count: UInt64Array::from([Some(3)]).boxed(), @@ -792,6 +809,13 @@ pub fn pyarrow_required(column: &str) -> Box { .collect::>(); Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(39, 0))) } + "decimal256_76" => { + let values = i64_values + .iter() + .map(|x| x.map(|x| i256(x.as_i256()))) + .collect::>(); + Box::new(PrimitiveArray::::from(values).to(DataType::Decimal256(76, 0))) + } _ => unreachable!(), } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 58e4c685bc2..786bdf6f96d 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -441,6 +441,16 @@ fn v1_decimal256_39_required() -> Result<()> { test_pyarrow_integration("decimal256_39", 1, "basic", false, true, None) } +#[test] +fn v1_decimal256_76_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_76", 1, "basic", false, false, None) +} + +#[test] +fn v1_decimal256_76_required() -> Result<()> { + test_pyarrow_integration("decimal256_76", 1, "basic", false, true, None) +} + #[test] fn v2_decimal_9_nullable() -> Result<()> { test_pyarrow_integration("decimal_9", 2, "basic", false, false, None) @@ -496,6 +506,11 @@ fn v2_decimal256_39_nullable() -> Result<()> { test_pyarrow_integration("decimal256_39", 2, "basic", false, false, None) } +#[test] +fn v2_decimal256_76_nullable() -> Result<()> { + test_pyarrow_integration("decimal256_76", 2, "basic", false, false, None) +} + #[test] fn v1_timestamp_us_nullable() -> Result<()> { test_pyarrow_integration("timestamp_us", 1, "basic", false, false, None) @@ -566,6 +581,16 @@ fn v2_decimal256_39_required_dict() -> Result<()> { test_pyarrow_integration("decimal256_39", 2, "basic", true, true, None) } +#[test] +fn v2_decimal256_76_required() -> Result<()> { + test_pyarrow_integration("decimal256_76", 2, "basic", false, true, None) +} + +#[test] +fn v2_decimal256_76_required_dict() -> Result<()> { + test_pyarrow_integration("decimal256_76", 2, "basic", true, true, None) +} + #[test] fn v1_struct_required_optional() -> Result<()> { test_pyarrow_integration("struct", 1, "struct", false, false, None) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 862d4436511..439710eb243 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -628,6 +628,28 @@ fn decimal256_39_required_v1() -> Result<()> { ) } +#[test] +fn decimal256_76_optional_v1() -> Result<()> { + round_trip( + "decimal256_76", + "nullable", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_76_required_v1() -> Result<()> { + round_trip( + "decimal256_76", + "required", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn decimal_9_optional_v2() -> Result<()> { round_trip( @@ -782,6 +804,28 @@ fn decimal256_39_required_v2() -> Result<()> { ) } +#[test] +fn decimal256_76_optional_v2() -> Result<()> { + round_trip( + "decimal256_76", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + +#[test] +fn decimal256_76_required_v2() -> Result<()> { + round_trip( + "decimal256_76", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain], + ) +} + #[test] fn struct_v1() -> Result<()> { round_trip( From a77621dab0bf74f08bdbddf60cc393dbdfebfd43 Mon Sep 17 00:00:00 2001 From: taichong Date: Wed, 1 Mar 2023 21:40:53 +0800 Subject: [PATCH 8/9] fix test_decimal256 --- tests/it/compute/sort/row/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/it/compute/sort/row/mod.rs b/tests/it/compute/sort/row/mod.rs index d82d548623c..4931689a192 100644 --- a/tests/it/compute/sort/row/mod.rs +++ b/tests/it/compute/sort/row/mod.rs @@ -68,12 +68,12 @@ fn test_decimal256() { let col = Int256Array::from_iter([ None, Some(i256::from_words(i128::MIN, i128::MIN)), - Some(i256::from_words(0, -1)), - Some(i256::from_words(i128::MAX, -1)), - Some(i256::from_words(i128::MAX, 0)), Some(i256::from_words(0, 46_i128)), + Some(i256::from_words(0, -1)), Some(i256::from_words(5, 46_i128)), + Some(i256::from_words(i128::MAX, 0)), Some(i256::from_words(i128::MAX, i128::MAX)), + Some(i256::from_words(i128::MAX, -1)), ]) .to(DataType::Decimal256(76, 7)) .to_boxed(); From 0ec7826a315cdb0fe7e7593f7523ff73ce42e210 Mon Sep 17 00:00:00 2001 From: taichong Date: Thu, 2 Mar 2023 09:37:38 +0800 Subject: [PATCH 9/9] fix review conversation --- src/io/parquet/read/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index cb557df86bc..baaffd6d446 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -84,12 +84,12 @@ fn convert_i128(value: &[u8], n: usize) -> i128 { } fn convert_i256(value: &[u8]) -> i256 { - let mut bytes = [0u8; 32]; - let mut neg_bytes = [255u8; 32]; if value[0] >= 128 { + let mut neg_bytes = [255u8; 32]; neg_bytes[32 - value.len()..].copy_from_slice(value); i256::from_be_bytes(neg_bytes) } else { + let mut bytes = [0u8; 32]; bytes[32 - value.len()..].copy_from_slice(value); i256::from_be_bytes(bytes) }