Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
add decimal256 precision>38
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Feb 28, 2023
1 parent dfa251d commit 2512564
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 9 deletions.
4 changes: 4 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_9", pa.decimal256(9, 0)),
pa.field("decimal256_18", pa.decimal256(18, 0)),
pa.field("decimal256_26", pa.decimal256(26, 0)),
pa.field("decimal256_39", pa.decimal256(39, 0)),
pa.field("timestamp_us", pa.timestamp("us")),
pa.field("timestamp_s", pa.timestamp("s")),
pa.field("emoji", pa.utf8()),
Expand All @@ -56,6 +57,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"decimal256_9": decimal,
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
"timestamp_us": int64,
"timestamp_s": int64,
"emoji": emoji,
Expand Down Expand Up @@ -91,6 +93,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("decimal256_9", pa.decimal256(9, 0), nullable=False),
pa.field("decimal256_18", pa.decimal256(18, 0), nullable=False),
pa.field("decimal256_26", pa.decimal256(26, 0), nullable=False),
pa.field("decimal256_39", pa.decimal256(39, 0), nullable=False),
]
schema = pa.schema(fields)

Expand All @@ -108,6 +111,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"decimal256_9": decimal,
"decimal256_18": decimal,
"decimal256_26": decimal,
"decimal256_39": decimal,
},
schema,
f"basic_required_10.parquet",
Expand Down
1 change: 0 additions & 1 deletion src/io/parquet/read/schema/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
/// any physical column.
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {
println!("in parquet_to_arrow_schema");
fields.iter().filter_map(to_field).collect::<Vec<_>>()
}

Expand Down
1 change: 0 additions & 1 deletion src/io/parquet/read/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use self::metadata::parse_key_value_metadata;
/// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
/// indicating that that the file's arrow metadata was incorrectly written.
pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
println!("in infer_schema");
let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());

let schema = read_schema_from_metadata(&mut metadata)?;
Expand Down
7 changes: 1 addition & 6 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,20 +522,15 @@ fn push(
primitive::push(from, min, max, |x: i32| Ok(i256(I256::new(x.into()))))
}
ParquetPhysicalType::Int64 => {
println!("in push static Decimal256");
primitive::push(from, min, max, |x: i64| Ok(i256(I256::new(x.into()))))
}
ParquetPhysicalType::FixedLenByteArray(n) if *n <= 16 => {
println!("in push FixedLenByteArray n {:?}", n);
fixlen::push_i256_with_i128(from, *n, min, max)
}
ParquetPhysicalType::FixedLenByteArray(n) if *n > 32 => Err(Error::NotYetImplemented(
format!("Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}"),
)),
ParquetPhysicalType::FixedLenByteArray(n) => {
println!("in push FixedLenByteArray n {:?}", n);
fixlen::push_i256(from, *n, min, max)
}
ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i256(from, *n, min, max),
_ => unreachable!(),
},
Binary => binary::push::<i32>(from, min, max),
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ pub fn to_parquet_type(field: &Field) -> Result<ParquetType> {
PhysicalType::FixedLenByteArray(len),
repetition,
None,
logical_type,
None,
None,
)?)
}
Expand Down
24 changes: 24 additions & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,13 @@ pub fn pyarrow_nullable(column: &str) -> Box<dyn Array> {
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(26, 0)))
}
"decimal256_39" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(39, 0)))
}
"timestamp_us" => Box::new(
PrimitiveArray::<i64>::from(i64_values)
.to(DataType::Timestamp(TimeUnit::Microsecond, None)),
Expand Down Expand Up @@ -667,6 +674,16 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics {
Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(26, 0)),
),
},
"decimal256_39" => Statistics {
distinct_count: UInt64Array::from([None]).boxed(),
null_count: UInt64Array::from([Some(3)]).boxed(),
min_value: Box::new(
Int256Array::from_slice([i256(-(256.as_i256()))]).to(DataType::Decimal256(39, 0)),
),
max_value: Box::new(
Int256Array::from_slice([i256(9.as_i256())]).to(DataType::Decimal256(39, 0)),
),
},
"timestamp_us" => Statistics {
distinct_count: UInt64Array::from([None]).boxed(),
null_count: UInt64Array::from([Some(3)]).boxed(),
Expand Down Expand Up @@ -768,6 +785,13 @@ pub fn pyarrow_required(column: &str) -> Box<dyn Array> {
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(26, 0)))
}
"decimal256_39" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(39, 0)))
}
_ => unreachable!(),
}
}
Expand Down
25 changes: 25 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,16 @@ fn v1_decimal256_26_required() -> Result<()> {
test_pyarrow_integration("decimal256_26", 1, "basic", false, true, None)
}

#[test]
fn v1_decimal256_39_nullable() -> Result<()> {
test_pyarrow_integration("decimal256_39", 1, "basic", false, false, None)
}

#[test]
fn v1_decimal256_39_required() -> Result<()> {
test_pyarrow_integration("decimal256_39", 1, "basic", false, true, None)
}

#[test]
fn v2_decimal_9_nullable() -> Result<()> {
test_pyarrow_integration("decimal_9", 2, "basic", false, false, None)
Expand Down Expand Up @@ -481,6 +491,11 @@ fn v2_decimal256_26_nullable() -> Result<()> {
test_pyarrow_integration("decimal256_26", 2, "basic", false, false, None)
}

#[test]
fn v2_decimal256_39_nullable() -> Result<()> {
test_pyarrow_integration("decimal256_39", 2, "basic", false, false, None)
}

#[test]
fn v1_timestamp_us_nullable() -> Result<()> {
test_pyarrow_integration("timestamp_us", 1, "basic", false, false, None)
Expand Down Expand Up @@ -541,6 +556,16 @@ fn v2_decimal256_26_required_dict() -> Result<()> {
test_pyarrow_integration("decimal256_26", 2, "basic", true, true, None)
}

#[test]
fn v2_decimal256_39_required() -> Result<()> {
test_pyarrow_integration("decimal256_39", 2, "basic", false, true, None)
}

#[test]
fn v2_decimal256_39_required_dict() -> Result<()> {
test_pyarrow_integration("decimal256_39", 2, "basic", true, true, None)
}

#[test]
fn v1_struct_required_optional() -> Result<()> {
test_pyarrow_integration("struct", 1, "struct", false, false, None)
Expand Down
44 changes: 44 additions & 0 deletions tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,28 @@ fn decimal256_26_required_v1() -> Result<()> {
)
}

#[test]
fn decimal256_39_optional_v1() -> Result<()> {
round_trip(
"decimal256_39",
"nullable",
Version::V1,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn decimal256_39_required_v1() -> Result<()> {
round_trip(
"decimal256_39",
"required",
Version::V1,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn decimal_9_optional_v2() -> Result<()> {
round_trip(
Expand Down Expand Up @@ -737,6 +759,28 @@ fn decimal256_26_required_v2() -> Result<()> {
)
}

#[test]
fn decimal256_39_optional_v2() -> Result<()> {
round_trip(
"decimal256_39",
"nullable",
Version::V2,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn decimal256_39_required_v2() -> Result<()> {
round_trip(
"decimal256_39",
"required",
Version::V2,
CompressionOptions::Uncompressed,
vec![Encoding::Plain],
)
}

#[test]
fn struct_v1() -> Result<()> {
round_trip(
Expand Down

0 comments on commit 2512564

Please sign in to comment.