From bd6b059b451a34707b86809d38a6b137740dc74f Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 7 Aug 2023 11:06:36 -0700 Subject: [PATCH 1/4] Add correct coercion logic when reading int96 timestamps into specified timeunits --- src/io/parquet/read/deserialize/simple.rs | 55 ++++++++++++++++++----- tests/it/io/parquet/read.rs | 49 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index b4b614980e..32335572c6 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -391,6 +391,44 @@ fn unifiy_timestmap_unit( } } +#[inline] +pub fn int96_to_i64_us(value: [u32; 3]) -> i64 { + const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; + const SECONDS_PER_DAY: i64 = 86_400; + const MICROS_PER_SECOND: i64 = 1_000_000; + + let day = value[2] as i64; + let microseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000; + let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + + seconds * MICROS_PER_SECOND + microseconds +} + +#[inline] +pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 { + const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; + const SECONDS_PER_DAY: i64 = 86_400; + const MILLIS_PER_SECOND: i64 = 1_000; + + let day = value[2] as i64; + let milliseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000; + let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + + seconds * MILLIS_PER_SECOND + milliseconds +} + +#[inline] +pub fn int96_to_i64_s(value: [u32; 3]) -> i64 { + const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; + const SECONDS_PER_DAY: i64 = 86_400; + + let day = value[2] as i64; + let seconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000_000; + let day_seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY; + + day_seconds + seconds +} + fn timestamp<'a, I: Pages + 'a>( pages: I, physical_type: &PhysicalType, @@ -401,17 +439,14 @@ fn timestamp<'a, I: Pages + 'a>( time_unit: TimeUnit, ) -> Result> { if physical_type == &PhysicalType::Int96 { - let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns); - let logical_type = PrimitiveLogicalType::Timestamp { - unit: ParquetTimeUnit::Nanoseconds, - is_adjusted_to_utc: false, - }; - let (factor, is_multiplier) = unifiy_timestmap_unit(&Some(logical_type), time_unit); - return match (factor, is_multiplier) { - (1, _) => Ok(dyn_iter(iden(iter))), - (a, true) => Ok(dyn_iter(op(iter, move |x| x * a))), - (a, false) => Ok(dyn_iter(op(iter, move |x| x / a))), + let conversion_op = match time_unit { + TimeUnit::Nanosecond => int96_to_i64_ns, + TimeUnit::Microsecond => int96_to_i64_us, + TimeUnit::Millisecond => int96_to_i64_ms, + TimeUnit::Second => int96_to_i64_s, }; + let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, conversion_op); + return Ok(dyn_iter(iden(iter))) }; if physical_type != &PhysicalType::Int64 { diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 786bdf6f96..95eee74463 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -770,3 +770,52 @@ fn invalid_utf8() -> Result<()> { ); Ok(()) } + +#[test] +fn read_int96_timestamps() -> Result<()> { + use std::collections::BTreeMap; + + let timestamp_data = &[ + 0x50, 0x41, 0x52, 0x31, 0x15, 0x04, 0x15, 0x48, 0x15, 0x3c, 0x4c, 0x15, 0x06, 0x15, 0x00, + 0x12, 0x00, 0x00, 0x24, 0x00, 0x00, 0x0d, 0x01, 0x08, 0x9f, 0xd5, 0x1f, 0x0d, 0x0a, 0x44, + 0x00, 0x00, 0x59, 0x68, 0x25, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, + 0xfb, 0x2a, 0x00, 0x15, 0x00, 0x15, 0x14, 0x15, 0x18, 0x2c, 0x15, 0x06, 0x15, 0x10, 0x15, + 0x06, 0x15, 0x06, 0x1c, 0x00, 0x00, 0x00, 0x0a, 0x24, 0x02, 0x00, 0x00, 0x00, 0x06, 0x01, + 0x02, 0x03, 0x24, 0x00, 0x26, 0x9e, 0x01, 0x1c, 0x15, 0x06, 0x19, 0x35, 0x10, 0x00, 0x06, + 0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x73, 0x15, 0x02, + 0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26, 0x60, 0x26, 0x08, 0x29, 0x2c, 0x15, + 0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15, 0x10, 0x15, 0x02, 0x00, 0x00, 0x00, + 0x15, 0x04, 0x19, 0x2c, 0x35, 0x00, 0x18, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x15, + 0x02, 0x00, 0x15, 0x06, 0x25, 0x02, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x73, 0x00, 0x16, 0x06, 0x19, 0x1c, 0x19, 0x1c, 0x26, 0x9e, 0x01, 0x1c, 0x15, + 0x06, 0x19, 0x35, 0x10, 0x00, 0x06, 0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x73, 0x15, 0x02, 0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26, + 0x60, 0x26, 0x08, 0x29, 0x2c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15, + 0x10, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x9e, 0x01, 0x16, 0x06, 0x26, 0x08, 0x16, 0x96, + 0x01, 0x14, 0x00, 0x00, 0x28, 0x20, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x63, + 0x70, 0x70, 0x2d, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x20, 0x31, 0x32, 0x2e, 0x30, 0x2e, 0x30, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x95, + 0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31 + ]; + + let parse = |time_unit: TimeUnit| { + let mut reader = Cursor::new(timestamp_data); + let metadata = read_metadata(&mut reader)?; + let schema = arrow2::datatypes::Schema{ + fields: vec![arrow2::datatypes::Field::new("timestamps", arrow2::datatypes::DataType::Timestamp(time_unit, None), false)], + metadata: BTreeMap::new(), + }; + let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None); + reader.collect::>>() + }; + + // This data contains int96 timestamps in the year 1000 and 3000, which are out of range for + // Timestamp(TimeUnit::Nanoseconds) and will cause a panic in dev builds/overflow in release builds + // However, the code should work for the Microsecond/Millisecond time units + for time_unit in [arrow2::datatypes::TimeUnit::Microsecond, arrow2::datatypes::TimeUnit::Millisecond, arrow2::datatypes::TimeUnit::Second] { + parse(time_unit).expect("Should not error"); + } + std::panic::catch_unwind(|| parse(arrow2::datatypes::TimeUnit::Nanosecond)).expect_err("Should be a panic error"); + + Ok(()) +} From 65e6dce611d0f66b3f87b076bb360e7644767e0f Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 10:36:03 -0700 Subject: [PATCH 2/4] Refactor to better inline nested function call in Iter::new --- src/io/parquet/read/deserialize/simple.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 32335572c6..fc40625aa6 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -445,7 +445,12 @@ fn timestamp<'a, I: Pages + 'a>( TimeUnit::Millisecond => int96_to_i64_ms, TimeUnit::Second => int96_to_i64_s, }; - let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, conversion_op); + let iter = match time_unit { + TimeUnit::Nanosecond => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns), + TimeUnit::Microsecond => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_us), + TimeUnit::Millisecond => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ms), + TimeUnit::Second => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_s), + }; return Ok(dyn_iter(iden(iter))) }; From 9805b30673cf76ac634ac9c0b920b88113680688 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 9 Aug 2023 11:08:30 -0700 Subject: [PATCH 3/4] Fix static check issues - immediately wrap and return an ArrayIter object --- src/io/parquet/read/deserialize/simple.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index fc40625aa6..b0106f0f85 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -445,13 +445,12 @@ fn timestamp<'a, I: Pages + 'a>( TimeUnit::Millisecond => int96_to_i64_ms, TimeUnit::Second => int96_to_i64_s, }; - let iter = match time_unit { - TimeUnit::Nanosecond => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns), - TimeUnit::Microsecond => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_us), - TimeUnit::Millisecond => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ms), - TimeUnit::Second => primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_s), + return match time_unit { + TimeUnit::Nanosecond => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns)))), + TimeUnit::Microsecond => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_us)))), + TimeUnit::Millisecond => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ms)))), + TimeUnit::Second => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_s)))), }; - return Ok(dyn_iter(iden(iter))) }; if physical_type != &PhysicalType::Int64 { From b7497355eb9e0f8b5a86f5af7df69e7a41d21768 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Sat, 2 Sep 2023 10:57:15 -0700 Subject: [PATCH 4/4] Fix lints --- src/io/parquet/read/deserialize/simple.rs | 38 +++++++++++++++++------ tests/it/io/parquet/read.rs | 21 +++++++++---- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index b0106f0f85..d19296a4b7 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -439,17 +439,35 @@ fn timestamp<'a, I: Pages + 'a>( time_unit: TimeUnit, ) -> Result> { if physical_type == &PhysicalType::Int96 { - let conversion_op = match time_unit { - TimeUnit::Nanosecond => int96_to_i64_ns, - TimeUnit::Microsecond => int96_to_i64_us, - TimeUnit::Millisecond => int96_to_i64_ms, - TimeUnit::Second => int96_to_i64_s, - }; return match time_unit { - TimeUnit::Nanosecond => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns)))), - TimeUnit::Microsecond => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_us)))), - TimeUnit::Millisecond => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ms)))), - TimeUnit::Second => Ok(dyn_iter(iden(primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_s)))), + TimeUnit::Nanosecond => Ok(dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + num_rows, + chunk_size, + int96_to_i64_ns, + )))), + TimeUnit::Microsecond => Ok(dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + num_rows, + chunk_size, + int96_to_i64_us, + )))), + TimeUnit::Millisecond => Ok(dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + num_rows, + chunk_size, + int96_to_i64_ms, + )))), + TimeUnit::Second => Ok(dyn_iter(iden(primitive::Iter::new( + pages, + data_type, + num_rows, + chunk_size, + int96_to_i64_s, + )))), }; }; diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 95eee74463..2e155d5cae 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -791,18 +791,22 @@ fn read_int96_timestamps() -> Result<()> { 0x06, 0x19, 0x35, 0x10, 0x00, 0x06, 0x19, 0x18, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x73, 0x15, 0x02, 0x16, 0x06, 0x16, 0x9e, 0x01, 0x16, 0x96, 0x01, 0x26, 0x60, 0x26, 0x08, 0x29, 0x2c, 0x15, 0x04, 0x15, 0x00, 0x15, 0x02, 0x00, 0x15, 0x00, 0x15, - 0x10, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x9e, 0x01, 0x16, 0x06, 0x26, 0x08, 0x16, 0x96, + 0x10, 0x15, 0x02, 0x00, 0x00, 0x00, 0x16, 0x9e, 0x01, 0x16, 0x06, 0x26, 0x08, 0x16, 0x96, 0x01, 0x14, 0x00, 0x00, 0x28, 0x20, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x2d, 0x63, 0x70, 0x70, 0x2d, 0x61, 0x72, 0x72, 0x6f, 0x77, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x31, 0x32, 0x2e, 0x30, 0x2e, 0x30, 0x19, 0x1c, 0x1c, 0x00, 0x00, 0x00, 0x95, - 0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31 + 0x00, 0x00, 0x00, 0x50, 0x41, 0x52, 0x31, ]; let parse = |time_unit: TimeUnit| { let mut reader = Cursor::new(timestamp_data); let metadata = read_metadata(&mut reader)?; - let schema = arrow2::datatypes::Schema{ - fields: vec![arrow2::datatypes::Field::new("timestamps", arrow2::datatypes::DataType::Timestamp(time_unit, None), false)], + let schema = arrow2::datatypes::Schema { + fields: vec![arrow2::datatypes::Field::new( + "timestamps", + arrow2::datatypes::DataType::Timestamp(time_unit, None), + false, + )], metadata: BTreeMap::new(), }; let reader = FileReader::new(reader, metadata.row_groups, schema, Some(5), None, None); @@ -812,10 +816,15 @@ fn read_int96_timestamps() -> Result<()> { // This data contains int96 timestamps in the year 1000 and 3000, which are out of range for // Timestamp(TimeUnit::Nanoseconds) and will cause a panic in dev builds/overflow in release builds // However, the code should work for the Microsecond/Millisecond time units - for time_unit in [arrow2::datatypes::TimeUnit::Microsecond, arrow2::datatypes::TimeUnit::Millisecond, arrow2::datatypes::TimeUnit::Second] { + for time_unit in [ + arrow2::datatypes::TimeUnit::Microsecond, + arrow2::datatypes::TimeUnit::Millisecond, + arrow2::datatypes::TimeUnit::Second, + ] { parse(time_unit).expect("Should not error"); } - std::panic::catch_unwind(|| parse(arrow2::datatypes::TimeUnit::Nanosecond)).expect_err("Should be a panic error"); + std::panic::catch_unwind(|| parse(arrow2::datatypes::TimeUnit::Nanosecond)) + .expect_err("Should be a panic error"); Ok(()) }