From e91b0af2c6f2ebfebd069607bdab813f44319eaf Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 22 Nov 2021 17:30:41 +0000 Subject: [PATCH] Added support to write timestamps with timezones. --- src/io/csv/write/serialize.rs | 167 +++++++++++++++++++++---- src/temporal_conversions.rs | 14 +-- tests/it/io/csv/write.rs | 227 +++++++++++++++++++++++++++------- 3 files changed, 328 insertions(+), 80 deletions(-) diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index a8577a644cd..959c1f56b1b 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -15,30 +15,19 @@ use crate::array::{DictionaryArray, DictionaryKey, Offset}; use std::any::Any; /// Options to serialize logical types to CSV -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +/// The default is to format times and dates as `chrono` crate formats them. +#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)] pub struct SerializeOptions { /// used for [`DataType::Date32`] - pub date32_format: String, + pub date32_format: Option, /// used for [`DataType::Date64`] - pub date64_format: String, + pub date64_format: Option, /// used for [`DataType::Time32`] - pub time32_format: String, + pub time32_format: Option, /// used for [`DataType::Time64`] - pub time64_format: String, + pub time64_format: Option, /// used for [`DataType::Timestamp`] - pub timestamp_format: String, -} - -impl Default for SerializeOptions { - fn default() -> Self { - Self { - date32_format: "%F".to_string(), - date64_format: "%F".to_string(), - time32_format: "%T".to_string(), - time64_format: "%T".to_string(), - timestamp_format: "%FT%H:%M:%S.%9f".to_string(), - } - } + pub timestamp_format: Option, } fn primitive_write<'a, T: NativeType + ToLexical>( @@ -68,16 +57,134 @@ macro_rules! dyn_date { .as_any() .downcast_ref::>() .unwrap(); - Box::new(BufStreamingIterator::new( + if let Some(format) = $format { + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(($fn)(*x).format(format).to_string().as_bytes()) + } + }, + vec![], + )) + } else { + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(($fn)(*x).to_string().as_bytes()) + } + }, + vec![], + )) + } + }}; +} + +fn timestamp_with_tz_default<'a>( + array: &'a PrimitiveArray, + time_unit: TimeUnit, + tz: &str, +) -> Result + 'a>> { + let timezone = temporal_conversions::parse_offset(tz); + Ok(match timezone { + Ok(timezone) => Box::new(BufStreamingIterator::new( array.iter(), move |x, buf| { if let Some(x) = x { - buf.extend_from_slice(($fn)(*x).format($format).to_string().as_bytes()) + let data = + temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone) + .to_string(); + buf.extend_from_slice(data.as_bytes()) } }, vec![], - )) - }}; + )), + #[cfg(feature = "chrono-tz")] + _ => { + let timezone = temporal_conversions::parse_offset_tz(tz)?; + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + if let Some(x) = x { + let data = + temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone) + .to_string(); + buf.extend_from_slice(data.as_bytes()) + } + }, + vec![], + )) + } + #[cfg(not(feature = "chrono-tz"))] + _ => { + return Err(crate::error::ArrowError::InvalidArgumentError( + "Invalid Offset format (must be [-]00:00) or chrono-tz feature not active" + .to_string(), + )) + } + }) +} + +fn timestamp_with_tz_with_format<'a>( + array: &'a PrimitiveArray, + time_unit: TimeUnit, + tz: &str, + format: &'a str, +) -> Result + 'a>> { + let timezone = temporal_conversions::parse_offset(tz); + Ok(match timezone { + Ok(timezone) => Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + if let Some(x) = x { + let data = + temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone) + .format(format) + .to_string(); + buf.extend_from_slice(data.as_bytes()) + } + }, + vec![], + )), + #[cfg(feature = "chrono-tz")] + _ => { + let timezone = temporal_conversions::parse_offset_tz(tz)?; + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + if let Some(x) = x { + let data = + temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone) + .format(format) + .to_string(); + buf.extend_from_slice(data.as_bytes()) + } + }, + vec![], + )) + } + #[cfg(not(feature = "chrono-tz"))] + _ => { + return Err(crate::error::ArrowError::InvalidArgumentError( + "Invalid Offset format (must be [-]00:00) or chrono-tz feature not active" + .to_string(), + )) + } + }) +} + +fn timestamp_with_tz<'a>( + array: &'a PrimitiveArray, + time_unit: TimeUnit, + tz: &str, + format: Option<&'a str>, +) -> Result + 'a>> { + if let Some(format) = format { + timestamp_with_tz_with_format(array, time_unit, tz, format) + } else { + timestamp_with_tz_default(array, time_unit, tz) + } } /// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`. @@ -136,7 +243,7 @@ pub fn new_serializer<'a>( i32, temporal_conversions::date32_to_datetime, array, - &options.date32_format + options.date32_format.as_ref() ) } DataType::Time32(TimeUnit::Second) => { @@ -144,7 +251,7 @@ pub fn new_serializer<'a>( i32, temporal_conversions::time32s_to_time, array, - &options.time32_format + options.time32_format.as_ref() ) } DataType::Time32(TimeUnit::Millisecond) => { @@ -152,7 +259,7 @@ pub fn new_serializer<'a>( i32, temporal_conversions::time32ms_to_time, array, - &options.time32_format + options.time32_format.as_ref() ) } DataType::Int64 => { @@ -163,7 +270,7 @@ pub fn new_serializer<'a>( i64, temporal_conversions::date64_to_datetime, array, - &options.date64_format + options.date64_format.as_ref() ) } DataType::Time64(TimeUnit::Microsecond) => { @@ -214,6 +321,14 @@ pub fn new_serializer<'a>( &options.timestamp_format ) } + DataType::Timestamp(time_unit, Some(tz)) => { + return timestamp_with_tz( + array.as_any().downcast_ref().unwrap(), + *time_unit, + tz.as_ref(), + options.timestamp_format.as_ref().map(|x| x.as_ref()), + ) + } DataType::Float32 => { dyn_primitive!(f32, array) } diff --git a/src/temporal_conversions.rs b/src/temporal_conversions.rs index d66f068f357..472d979185a 100644 --- a/src/temporal_conversions.rs +++ b/src/temporal_conversions.rs @@ -63,17 +63,15 @@ pub fn time32s_to_time(v: i32) -> NaiveTime { NaiveTime::from_num_seconds_from_midnight(v as u32, 0) } -/// converts a `i32` representing a `time32(ms)` to [`NaiveDateTime`] +/// converts a `i32` representing a `time32(ms)` to [`NaiveTime`] #[inline] pub fn time32ms_to_time(v: i32) -> NaiveTime { let v = v as i64; - NaiveTime::from_num_seconds_from_midnight( - // extract seconds from milliseconds - (v / MILLISECONDS) as u32, - // discard extracted seconds and convert milliseconds to - // nanoseconds - (v % MILLISECONDS * MICROSECONDS) as u32, - ) + let seconds = v / MILLISECONDS; + + let milli_to_nano = 1_000_000; + let nano = (v - seconds * MILLISECONDS) * milli_to_nano; + NaiveTime::from_num_seconds_from_midnight(seconds as u32, nano as u32) } /// converts a `i64` representing a `time64(us)` to [`NaiveDateTime`] diff --git a/tests/it/io/csv/write.rs b/tests/it/io/csv/write.rs index 8bcd10cd829..2291ab69069 100644 --- a/tests/it/io/csv/write.rs +++ b/tests/it/io/csv/write.rs @@ -8,25 +8,7 @@ use arrow2::io::csv::write::*; use arrow2::record_batch::RecordBatch; fn data() -> RecordBatch { - let schema = Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), - Field::new("c2", DataType::Float64, true), - Field::new("c3", DataType::UInt32, false), - Field::new("c4", DataType::Boolean, true), - Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true), - Field::new("c6", DataType::Time32(TimeUnit::Second), false), - Field::new( - "c7", - DataType::Dictionary(u32::KEY_TYPE, Box::new(DataType::Utf8)), - false, - ), - ]); - - let c1 = Utf8Array::::from_slice([ - "Lorem ipsum dolor sit amet", - "consectetur adipiscing elit", - "sed do eiusmod tempor", - ]); + let c1 = Utf8Array::::from_slice(["a b", "c", "d"]); let c2 = Float64Array::from([Some(123.564532), None, Some(-556132.25)]); let c3 = UInt32Array::from_slice(&[3, 2, 1]); let c4 = BooleanArray::from(&[Some(true), Some(false), None]); @@ -37,18 +19,15 @@ fn data() -> RecordBatch { let keys = UInt32Array::from_slice(&[2, 0, 1]); let c7 = DictionaryArray::from_data(keys, Arc::new(c1.clone())); - RecordBatch::try_new( - Arc::new(schema), - vec![ - Arc::new(c1), - Arc::new(c2), - Arc::new(c3), - Arc::new(c4), - Arc::new(c5), - Arc::new(c6), - Arc::new(c7), - ], - ) + RecordBatch::try_from_iter(vec![ + ("c1", Arc::new(c1) as Arc), + ("c2", Arc::new(c2) as Arc), + ("c3", Arc::new(c3) as Arc), + ("c4", Arc::new(c4) as Arc), + ("c5", Arc::new(c5) as Arc), + ("c6", Arc::new(c6) as Arc), + ("c7", Arc::new(c7) as Arc), + ]) .unwrap() } @@ -60,22 +39,16 @@ fn write_csv() -> Result<()> { let mut writer = WriterBuilder::new().from_writer(write); write_header(&mut writer, batch.schema())?; - let batches = vec![&batch, &batch]; let options = SerializeOptions::default(); - batches - .iter() - .try_for_each(|batch| write_batch(&mut writer, batch, &options))?; + write_batch(&mut writer, &batch, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); assert_eq!( r#"c1,c2,c3,c4,c5,c6,c7 -Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,sed do eiusmod tempor -consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,Lorem ipsum dolor sit amet -sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,consectetur adipiscing elit -Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,sed do eiusmod tempor -consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,Lorem ipsum dolor sit amet -sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,consectetur adipiscing elit +a b,123.564532,3,true,,00:20:34,d +c,,2,false,2019-04-18 10:54:47.378,06:51:20,a b +d,-556132.25,1,,2019-04-18 02:45:55.555,23:46:03,c "# .to_string(), String::from_utf8(buffer).unwrap(), @@ -91,8 +64,8 @@ fn write_csv_custom_options() -> Result<()> { let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write); let options = SerializeOptions { - time32_format: "%r".to_string(), - time64_format: "%r".to_string(), + time32_format: Some("%r".to_string()), + time64_format: Some("%r".to_string()), ..Default::default() }; write_batch(&mut writer, &batch, &options)?; @@ -100,12 +73,174 @@ fn write_csv_custom_options() -> Result<()> { // check let buffer = writer.into_inner().unwrap().into_inner(); assert_eq!( - r#"Lorem ipsum dolor sit amet|123.564532|3|true||12:20:34 AM|sed do eiusmod tempor -consectetur adipiscing elit||2|false|2019-04-18T10:54:47.378000000|06:51:20 AM|Lorem ipsum dolor sit amet -sed do eiusmod tempor|-556132.25|1||2019-04-18T02:45:55.555000000|11:46:03 PM|consectetur adipiscing elit + r#"a b|123.564532|3|true||12:20:34 AM|d +c||2|false|2019-04-18 10:54:47.378|06:51:20 AM|a b +d|-556132.25|1||2019-04-18 02:45:55.555|11:46:03 PM|c "# .to_string(), String::from_utf8(buffer).unwrap(), ); Ok(()) } + +fn data_array(column: usize) -> (RecordBatch, Vec<&'static str>) { + let (array, expected) = match column { + 0 => ( + Arc::new(Utf8Array::::from_slice(["a b", "c", "d"])) as Arc, + vec!["a b", "c", "d"], + ), + 1 => ( + Arc::new(BinaryArray::::from_slice(["a b", "c", "d"])) as Arc, + vec!["a b", "c", "d"], + ), + 2 => ( + Arc::new(BinaryArray::::from_slice(["a b", "c", "d"])) as Arc, + vec!["a b", "c", "d"], + ), + 3 => ( + Arc::new(Int8Array::from_slice(&[3, 2, 1])) as Arc, + vec!["3", "2", "1"], + ), + 4 => ( + Arc::new(Int16Array::from_slice(&[3, 2, 1])) as Arc, + vec!["3", "2", "1"], + ), + 5 => ( + Arc::new(Int32Array::from_slice(&[3, 2, 1])) as Arc, + vec!["3", "2", "1"], + ), + 6 => ( + Arc::new(Int64Array::from_slice(&[3, 2, 1])) as Arc, + vec!["3", "2", "1"], + ), + 7 => ( + Arc::new(UInt64Array::from_slice(&[3, 2, 1])) as Arc, + vec!["3", "2", "1"], + ), + 8 => ( + Arc::new(UInt64Array::from_slice(&[3, 2, 1])) as Arc, + vec!["3", "2", "1"], + ), + 9 => { + let array = PrimitiveArray::::from_slice(&[1_234_001, 24_680_001, 85_563_001]) + .to(DataType::Time32(TimeUnit::Millisecond)); + ( + Arc::new(array) as Arc, + vec!["00:20:34.001", "06:51:20.001", "23:46:03.001"], + ) + } + 10 => { + let array = + PrimitiveArray::::from_slice(&[1_234_000_001, 24_680_000_001, 85_563_000_001]) + .to(DataType::Time64(TimeUnit::Microsecond)); + ( + Arc::new(array) as Arc, + vec!["00:20:34.000001", "06:51:20.000001", "23:46:03.000001"], + ) + } + 11 => { + let array = PrimitiveArray::::from_slice(&[ + 1_234_000_000_001, + 24_680_000_000_001, + 85_563_000_000_001, + ]) + .to(DataType::Time64(TimeUnit::Nanosecond)); + ( + Arc::new(array) as Arc, + vec![ + "00:20:34.000000001", + "06:51:20.000000001", + "23:46:03.000000001", + ], + ) + } + 12 => { + let array = PrimitiveArray::::from_slice([ + 1_555_584_887_378_000_001, + 1_555_555_555_555_000_001, + ]) + .to(DataType::Timestamp(TimeUnit::Nanosecond, None)); + ( + Arc::new(array) as Arc, + vec![ + "2019-04-18 10:54:47.378000001", + "2019-04-18 02:45:55.555000001", + ], + ) + } + 13 => { + let array = PrimitiveArray::::from_slice([ + 1_555_584_887_378_000_001, + 1_555_555_555_555_000_001, + ]) + .to(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+01:00".to_string()), + )); + ( + Arc::new(array) as Arc, + vec![ + "2019-04-18 11:54:47.378000001 +01:00", + "2019-04-18 03:45:55.555000001 +01:00", + ], + ) + } + 14 => { + let array = PrimitiveArray::::from_slice([ + 1_555_584_887_378_000_001, + 1_555_555_555_555_000_001, + ]) + .to(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("Europe/Lisbon".to_string()), + )); + ( + Arc::new(array) as Arc, + vec![ + "2019-04-18 11:54:47.378000001 WEST", + "2019-04-18 03:45:55.555000001 WEST", + ], + ) + } + _ => todo!(), + }; + + ( + RecordBatch::try_from_iter(vec![("c1", array)]).unwrap(), + expected, + ) +} + +fn write_single(column: usize) -> Result<()> { + let (batch, data) = data_array(column); + + let write = Cursor::new(Vec::::new()); + let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write); + + write_header(&mut writer, batch.schema())?; + let options = SerializeOptions::default(); + write_batch(&mut writer, &batch, &options)?; + + // check + let buffer = writer.into_inner().unwrap().into_inner(); + + let mut expected = "c1\n".to_owned(); + expected.push_str(&data.join("\n")); + expected.push('\n'); + assert_eq!(expected, String::from_utf8(buffer).unwrap(),); + Ok(()) +} + +#[test] +fn write_each() -> Result<()> { + for i in 0..=13 { + write_single(i)?; + } + Ok(()) +} + +#[test] +#[cfg(feature = "chrono-tz")] +fn write_tz_timezone() -> Result<()> { + write_single(14) +}