From 3f7b937f1954bd6cdc4233a77e493d608b1d0df1 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sat, 5 Feb 2022 10:24:27 +0100 Subject: [PATCH] Added support for parquet timestamps from dictionaries (#809) --- src/io/parquet/read/deserialize/mod.rs | 348 ++++++++++++++++--------- tests/it/io/parquet/read.rs | 5 + 2 files changed, 232 insertions(+), 121 deletions(-) diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index a310980e617..aff31d43998 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -30,6 +30,19 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( ) -> Result> { use DataType::*; + let (physical_type, logical_type) = if let ParquetType::PrimitiveType { + physical_type, + logical_type, + .. + } = type_ + { + (physical_type, logical_type) + } else { + return Err(ArrowError::InvalidArgumentError( + "page_iter_to_arrays can only be called with a parquet primitive type".into(), + )); + }; + Ok(match data_type.to_logical_type() { Null => null::iter_to_arrays(pages, data_type, chunk_size), Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size)), @@ -74,72 +87,73 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( Timestamp(time_unit, None) => { let time_unit = *time_unit; - return timestamp(pages, type_, data_type, chunk_size, time_unit); + return timestamp( + pages, + physical_type, + logical_type, + data_type, + chunk_size, + time_unit, + ); } FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)), - Decimal(_, _) => match type_ { - ParquetType::PrimitiveType { physical_type, .. } => match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i32| x as i128, - ))), - PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( - pages, - data_type, - chunk_size, - read_item, - |x: i64| x as i128, - ))), - &PhysicalType::FixedLenByteArray(n) if n > 16 => { - return Err(ArrowError::NotYetImplemented(format!( - "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", - n - ))) - } - &PhysicalType::FixedLenByteArray(n) => { - let n = n as usize; - - let pages = fixed_size_binary::Iter::new( - pages, - DataType::FixedSizeBinary(n), - chunk_size, - ); - - let pages = pages.map(move |maybe_array| { - let array = maybe_array?; - let values = array - .values() - .chunks_exact(n) - .map(|value: &[u8]| { - // Copy the fixed-size byte value to the start of a 16 byte stack - // allocated buffer, then use an arithmetic right shift to fill in - // MSBs, which accounts for leading 1's in negative (two's complement) - // values. - let mut bytes = [0u8; 16]; - bytes[..n].copy_from_slice(value); - i128::from_be_bytes(bytes) >> (8 * (16 - n)) - }) - .collect::>(); - let validity = array.validity().cloned(); - - Ok(PrimitiveArray::::from_data( - data_type.clone(), - values.into(), - validity, - )) - }); - - let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc)); - - Box::new(arrays) as _ - } - _ => unreachable!(), - }, + Decimal(_, _) => match physical_type { + PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i32| x as i128, + ))), + PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + chunk_size, + read_item, + |x: i64| x as i128, + ))), + &PhysicalType::FixedLenByteArray(n) if n > 16 => { + return Err(ArrowError::NotYetImplemented(format!( + "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", + n + ))) + } + &PhysicalType::FixedLenByteArray(n) => { + let n = n as usize; + + let pages = + fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let values = array + .values() + .chunks_exact(n) + .map(|value: &[u8]| { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. + let mut bytes = [0u8; 16]; + bytes[..n].copy_from_slice(value); + i128::from_be_bytes(bytes) >> (8 * (16 - n)) + }) + .collect::>(); + let validity = array.validity().cloned(); + + Ok(PrimitiveArray::::from_data( + data_type.clone(), + values.into(), + validity, + )) + }); + + let arrays = pages.map(|x| x.map(|x| Arc::new(x) as Arc)); + + Box::new(arrays) as _ + } _ => unreachable!(), }, @@ -185,7 +199,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( Dictionary(key_type, _, _) => { return match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(pages, type_, data_type, chunk_size) + dict_read::<$K, _>(pages, physical_type, logical_type, data_type, chunk_size) }) } @@ -200,22 +214,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( fn timestamp<'a, I: 'a + DataPages>( pages: I, - type_: &ParquetType, + physical_type: &PhysicalType, + logical_type: &Option, data_type: DataType, chunk_size: usize, time_unit: TimeUnit, ) -> Result> { - let (physical_type, logical_type) = if let ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } = type_ - { - (physical_type, logical_type) - } else { - unreachable!() - }; - if physical_type == &PhysicalType::Int96 { if time_unit == TimeUnit::Nanosecond { return Ok(dyn_iter(iden(primitive::Iter::new( @@ -264,9 +268,147 @@ fn timestamp<'a, I: 'a + DataPages>( }) } +fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( + pages: I, + physical_type: &PhysicalType, + logical_type: &Option, + data_type: DataType, + chunk_size: usize, + time_unit: TimeUnit, +) -> Result> { + if physical_type == &PhysicalType::Int96 { + if time_unit == TimeUnit::Nanosecond { + return Ok(dyn_iter(primitive::DictIter::::new( + pages, + DataType::Timestamp(TimeUnit::Nanosecond, None), + chunk_size, + int96_to_i64_ns, + ))); + } else { + return Err(ArrowError::nyi( + "Can't decode int96 to timestamp other than ns", + )); + } + }; + + let unit = if let Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) = logical_type { + unit + } else { + return Ok(dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x, + ))); + }; + + Ok(match (unit, time_unit) { + (ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x / 1_000, + )) + } + (ParquetTimeUnit::MICROS(_), TimeUnit::Second) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x / 1_000_000, + )) + } + (ParquetTimeUnit::NANOS(_), TimeUnit::Second) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x * 1_000_000_000, + )) + } + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x, + )) + } + (ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x / 1_000, + )) + } + (ParquetTimeUnit::NANOS(_), TimeUnit::Millisecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x / 1_000_000, + )) + } + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Microsecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x * 1_000, + )) + } + (ParquetTimeUnit::MICROS(_), TimeUnit::Microsecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x, + )) + } + (ParquetTimeUnit::NANOS(_), TimeUnit::Microsecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x / 1_000, + )) + } + + (ParquetTimeUnit::MILLIS(_), TimeUnit::Nanosecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x * 1_000_000, + )) + } + (ParquetTimeUnit::MICROS(_), TimeUnit::Nanosecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x * 1_000, + )) + } + (ParquetTimeUnit::NANOS(_), TimeUnit::Nanosecond) => { + dyn_iter(primitive::DictIter::::new( + pages, + data_type, + chunk_size, + |x: i64| x, + )) + } + }) +} + fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( iter: I, - type_: &ParquetType, + physical_type: &PhysicalType, + logical_type: &Option, data_type: DataType, chunk_size: usize, ) -> Result> { @@ -314,53 +456,17 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( }), ), - Timestamp(TimeUnit::Nanosecond, None) => match type_ { - ParquetType::PrimitiveType { + Timestamp(time_unit, None) => { + let time_unit = *time_unit; + return timestamp_dict::( + iter, physical_type, logical_type, - .. - } => match (physical_type, logical_type) { - (PhysicalType::Int96, _) => dyn_iter(primitive::DictIter::::new( - iter, - DataType::Timestamp(TimeUnit::Nanosecond, None), - chunk_size, - int96_to_i64_ns, - )), - (_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => match unit { - ParquetTimeUnit::MILLIS(_) => { - dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i64| x * 1_000_000, - )) - } - ParquetTimeUnit::MICROS(_) => { - dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i64| x * 1_000, - )) - } - ParquetTimeUnit::NANOS(_) => { - dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i64| x, - )) - } - }, - _ => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: i64| x, - )), - }, - _ => unreachable!(), - }, + data_type, + chunk_size, + time_unit, + ); + } Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_iter( primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 794b9b24160..25975b3b0b2 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -350,6 +350,11 @@ fn v1_timestamp_s_nullable() -> Result<()> { test_pyarrow_integration(11, 1, "basic", false, false, None) } +#[test] +fn v1_timestamp_s_nullable_dict() -> Result<()> { + test_pyarrow_integration(11, 1, "basic", true, false, None) +} + #[test] fn v2_decimal_26_required() -> Result<()> { test_pyarrow_integration(8, 2, "basic", false, true, None)