Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to write timestamps with timezones for CSV #623

Merged
merged 1 commit into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 141 additions & 26 deletions src/io/csv/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,19 @@ use crate::array::{DictionaryArray, DictionaryKey, Offset};
use std::any::Any;

/// Options to serialize logical types to CSV
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
/// The default is to format times and dates as `chrono` crate formats them.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)]
pub struct SerializeOptions {
/// used for [`DataType::Date32`]
pub date32_format: String,
pub date32_format: Option<String>,
/// used for [`DataType::Date64`]
pub date64_format: String,
pub date64_format: Option<String>,
/// used for [`DataType::Time32`]
pub time32_format: String,
pub time32_format: Option<String>,
/// used for [`DataType::Time64`]
pub time64_format: String,
pub time64_format: Option<String>,
/// used for [`DataType::Timestamp`]
pub timestamp_format: String,
}

impl Default for SerializeOptions {
fn default() -> Self {
Self {
date32_format: "%F".to_string(),
date64_format: "%F".to_string(),
time32_format: "%T".to_string(),
time64_format: "%T".to_string(),
timestamp_format: "%FT%H:%M:%S.%9f".to_string(),
}
}
pub timestamp_format: Option<String>,
}

fn primitive_write<'a, T: NativeType + ToLexical>(
Expand Down Expand Up @@ -68,16 +57,134 @@ macro_rules! dyn_date {
.as_any()
.downcast_ref::<PrimitiveArray<$ty>>()
.unwrap();
Box::new(BufStreamingIterator::new(
if let Some(format) = $format {
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).format(format).to_string().as_bytes())
}
},
vec![],
))
} else {
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).to_string().as_bytes())
}
},
vec![],
))
}
}};
}

fn timestamp_with_tz_default<'a>(
array: &'a PrimitiveArray<i64>,
time_unit: TimeUnit,
tz: &str,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
let timezone = temporal_conversions::parse_offset(tz);
Ok(match timezone {
Ok(timezone) => Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).format($format).to_string().as_bytes())
let data =
temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone)
.to_string();
buf.extend_from_slice(data.as_bytes())
}
},
vec![],
))
}};
)),
#[cfg(feature = "chrono-tz")]
_ => {
let timezone = temporal_conversions::parse_offset_tz(tz)?;
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let data =
temporal_conversions::timestamp_to_datetime(*x, time_unit, &timezone)
.to_string();
buf.extend_from_slice(data.as_bytes())
}
},
vec![],
))
}
#[cfg(not(feature = "chrono-tz"))]
_ => {
return Err(crate::error::ArrowError::InvalidArgumentError(
"Invalid Offset format (must be [-]00:00) or chrono-tz feature not active"
.to_string(),
))
}
})
}

/// Builds a serializer that writes every timestamp of `array`, interpreted in
/// the timezone `tz`, using the caller-supplied `format` string.
///
/// `tz` is first parsed as a fixed offset via `temporal_conversions::parse_offset`.
/// When that fails and the `chrono-tz` feature is enabled, it is parsed via
/// `temporal_conversions::parse_offset_tz` instead (its error is propagated).
/// Null slots leave the output buffer untouched.
///
/// # Errors
/// Without the `chrono-tz` feature, a `tz` that is not a valid fixed offset
/// yields `ArrowError::InvalidArgumentError`.
fn timestamp_with_tz_with_format<'a>(
    array: &'a PrimitiveArray<i64>,
    time_unit: TimeUnit,
    tz: &str,
    format: &'a str,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
    // The explicit annotation lets each arm coerce its concrete iterator type
    // into the boxed trait object.
    let serializer: Box<dyn StreamingIterator<Item = [u8]> + 'a> =
        match temporal_conversions::parse_offset(tz) {
            Ok(offset) => Box::new(BufStreamingIterator::new(
                array.iter(),
                move |value, buf| {
                    if let Some(value) = value {
                        let datetime = temporal_conversions::timestamp_to_datetime(
                            *value, time_unit, &offset,
                        );
                        let text = datetime.format(format).to_string();
                        buf.extend_from_slice(text.as_bytes())
                    }
                },
                vec![],
            )),
            #[cfg(feature = "chrono-tz")]
            Err(_) => {
                // Not a fixed offset: fall back to the `chrono-tz` parser.
                let zone = temporal_conversions::parse_offset_tz(tz)?;
                Box::new(BufStreamingIterator::new(
                    array.iter(),
                    move |value, buf| {
                        if let Some(value) = value {
                            let datetime = temporal_conversions::timestamp_to_datetime(
                                *value, time_unit, &zone,
                            );
                            let text = datetime.format(format).to_string();
                            buf.extend_from_slice(text.as_bytes())
                        }
                    },
                    vec![],
                ))
            }
            #[cfg(not(feature = "chrono-tz"))]
            Err(_) => {
                return Err(crate::error::ArrowError::InvalidArgumentError(
                    "Invalid Offset format (must be [-]00:00) or chrono-tz feature not active"
                        .to_string(),
                ))
            }
        };
    Ok(serializer)
}

/// Returns a serializer for timezone-aware timestamps.
///
/// Dispatches to `timestamp_with_tz_with_format` when a `format` is given and
/// to `timestamp_with_tz_default` otherwise; any error from the chosen
/// function is returned unchanged.
fn timestamp_with_tz<'a>(
    array: &'a PrimitiveArray<i64>,
    time_unit: TimeUnit,
    tz: &str,
    format: Option<&'a str>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
    match format {
        Some(pattern) => timestamp_with_tz_with_format(array, time_unit, tz, pattern),
        None => timestamp_with_tz_default(array, time_unit, tz),
    }
}

/// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`.
Expand Down Expand Up @@ -136,23 +243,23 @@ pub fn new_serializer<'a>(
i32,
temporal_conversions::date32_to_datetime,
array,
&options.date32_format
options.date32_format.as_ref()
)
}
DataType::Time32(TimeUnit::Second) => {
dyn_date!(
i32,
temporal_conversions::time32s_to_time,
array,
&options.time32_format
options.time32_format.as_ref()
)
}
DataType::Time32(TimeUnit::Millisecond) => {
dyn_date!(
i32,
temporal_conversions::time32ms_to_time,
array,
&options.time32_format
options.time32_format.as_ref()
)
}
DataType::Int64 => {
Expand All @@ -163,7 +270,7 @@ pub fn new_serializer<'a>(
i64,
temporal_conversions::date64_to_datetime,
array,
&options.date64_format
options.date64_format.as_ref()
)
}
DataType::Time64(TimeUnit::Microsecond) => {
Expand Down Expand Up @@ -214,6 +321,14 @@ pub fn new_serializer<'a>(
&options.timestamp_format
)
}
DataType::Timestamp(time_unit, Some(tz)) => {
return timestamp_with_tz(
array.as_any().downcast_ref().unwrap(),
*time_unit,
tz.as_ref(),
options.timestamp_format.as_ref().map(|x| x.as_ref()),
)
}
DataType::Float32 => {
dyn_primitive!(f32, array)
}
Expand Down
14 changes: 6 additions & 8 deletions src/temporal_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,15 @@ pub fn time32s_to_time(v: i32) -> NaiveTime {
NaiveTime::from_num_seconds_from_midnight(v as u32, 0)
}

/// converts a `i32` representing a `time32(ms)` to [`NaiveDateTime`]
/// converts a `i32` representing a `time32(ms)` to [`NaiveTime`]
#[inline]
pub fn time32ms_to_time(v: i32) -> NaiveTime {
let v = v as i64;
NaiveTime::from_num_seconds_from_midnight(
// extract seconds from milliseconds
(v / MILLISECONDS) as u32,
// discard extracted seconds and convert milliseconds to
// nanoseconds
(v % MILLISECONDS * MICROSECONDS) as u32,
)
let seconds = v / MILLISECONDS;

let milli_to_nano = 1_000_000;
let nano = (v - seconds * MILLISECONDS) * milli_to_nano;
NaiveTime::from_num_seconds_from_midnight(seconds as u32, nano as u32)
}

/// converts a `i64` representing a `time64(us)` to [`NaiveDateTime`]
Expand Down
Loading