Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed casting of utf8 <> Timestamp with and without timezone #376

Merged
merged 2 commits into from
Sep 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ bench = false

[dependencies]
num-traits = "0.2"
chrono = "^0.4"
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.5", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }
# We need to Hash values before sending them to an hasher. This
Expand Down Expand Up @@ -84,6 +85,8 @@ full = [
"merge_sort",
"ahash",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",
]
merge_sort = ["itertools"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core"]
Expand Down
38 changes: 30 additions & 8 deletions src/compute/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ mod binary_to;
mod boolean_to;
mod dictionary_to;
mod primitive_to;
mod timestamps;
mod utf8_to;

pub use binary_to::*;
pub use boolean_to::*;
pub use dictionary_to::*;
pub use primitive_to::*;
pub use timestamps::*;
pub use utf8_to::*;

/// options defining how Cast kernels behave
Expand Down Expand Up @@ -126,14 +124,16 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {

(Utf8, Date32) => true,
(Utf8, Date64) => true,
(Utf8, Timestamp(TimeUnit::Nanosecond, None)) => true,
(Utf8, Timestamp(TimeUnit::Nanosecond, _)) => true,
(Utf8, LargeUtf8) => true,
(Utf8, _) => is_numeric(to_type),
(LargeUtf8, Date32) => true,
(LargeUtf8, Date64) => true,
(LargeUtf8, Timestamp(TimeUnit::Nanosecond, None)) => true,
(LargeUtf8, Timestamp(TimeUnit::Nanosecond, _)) => true,
(LargeUtf8, Utf8) => true,
(LargeUtf8, _) => is_numeric(to_type),
(Timestamp(_, _), Utf8) => true,
(Timestamp(_, _), LargeUtf8) => true,
(_, Utf8) => is_numeric(from_type) || from_type == &Binary,
(_, LargeUtf8) => is_numeric(from_type) || from_type == &Binary,

Expand Down Expand Up @@ -487,7 +487,10 @@ fn cast_with_options(
LargeUtf8 => Ok(Box::new(utf8_to_large_utf8(
array.as_any().downcast_ref().unwrap(),
))),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_timestamp_ns_dyn::<i32>(array),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_naive_timestamp_ns_dyn::<i32>(array),
Timestamp(TimeUnit::Nanosecond, Some(tz)) => {
utf8_to_timestamp_ns_dyn::<i32>(array, tz.clone())
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand All @@ -508,7 +511,10 @@ fn cast_with_options(
Date64 => utf8_to_date64_dyn::<i64>(array),
Utf8 => utf8_large_to_utf8(array.as_any().downcast_ref().unwrap())
.map(|x| Box::new(x) as Box<dyn Array>),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_timestamp_ns_dyn::<i64>(array),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_naive_timestamp_ns_dyn::<i64>(array),
Timestamp(TimeUnit::Nanosecond, Some(tz)) => {
utf8_to_timestamp_ns_dyn::<i64>(array, tz.clone())
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -537,6 +543,14 @@ fn cast_with_options(
let array = Utf8Array::<i32>::from_trusted_len_iter(iter);
Ok(Box::new(array))
}
Timestamp(from_unit, Some(tz)) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(timestamp_to_utf8::<i32>(from, *from_unit, tz)?))
}
Timestamp(from_unit, None) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(naive_timestamp_to_utf8::<i32>(from, *from_unit)))
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -565,6 +579,14 @@ fn cast_with_options(
let array = Utf8Array::<i64>::from_trusted_len_iter(iter);
Ok(Box::new(array))
}
Timestamp(from_unit, Some(tz)) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(timestamp_to_utf8::<i64>(from, *from_unit, tz)?))
}
Timestamp(from_unit, None) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(naive_timestamp_to_utf8::<i64>(from, *from_unit)))
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -793,8 +815,8 @@ fn cast_with_options(
}
(Timestamp(_, _), Int64) => primitive_to_same_primitive_dyn::<i64>(array, to_type),
(Int64, Timestamp(_, _)) => primitive_to_same_primitive_dyn::<i64>(array, to_type),
(Timestamp(from_unit, tz1), Timestamp(to_unit, tz2)) if tz1 == tz2 => {
primitive_dyn!(array, timestamp_to_timestamp, *from_unit, *to_unit, tz2)
(Timestamp(from_unit, _), Timestamp(to_unit, tz)) => {
primitive_dyn!(array, timestamp_to_timestamp, *from_unit, *to_unit, tz)
}
(Timestamp(from_unit, _), Date32) => primitive_dyn!(array, timestamp_to_date32, *from_unit),
(Timestamp(from_unit, _), Date64) => primitive_dyn!(array, timestamp_to_date64, *from_unit),
Expand Down
142 changes: 142 additions & 0 deletions src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
bitmap::Bitmap,
compute::arity::unary,
datatypes::{DataType, TimeUnit},
error::ArrowError,
temporal_conversions::*,
types::NativeType,
};
Expand Down Expand Up @@ -268,3 +269,144 @@ pub fn timestamp_to_timestamp(
unary(from, |x| (x * (to_size / from_size)), to_type)
}
}

fn timestamp_to_utf8_impl<O: Offset, T: chrono::TimeZone>(
from: &PrimitiveArray<i64>,
time_unit: TimeUnit,
timezone: T,
) -> Utf8Array<O>
where
T::Offset: std::fmt::Display,
{
match time_unit {
TimeUnit::Nanosecond => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_ns_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Microsecond => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_us_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Millisecond => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_ms_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Second => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_s_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
}
}

#[cfg(feature = "chrono-tz")]
fn chrono_tz_timestamp_to_utf8<O: Offset>(
from: &PrimitiveArray<i64>,
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(timestamp_to_utf8_impl::<O, chrono_tz::Tz>(
from, time_unit, timezone,
))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
}

#[cfg(not(feature = "chrono-tz"))]
fn chrono_tz_timestamp_to_utf8<O: Offset>(
_: &PrimitiveArray<i64>,
_: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
timezone_str
)))
}

/// Returns a [`Utf8Array`] where every element is the utf8 representation of the timestamp in the rfc3339 format.
pub fn timestamp_to_utf8<O: Offset>(
from: &PrimitiveArray<i64>,
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
let timezone = parse_offset(timezone_str);

if let Ok(timezone) = timezone {
Ok(timestamp_to_utf8_impl::<O, chrono::FixedOffset>(
from, time_unit, timezone,
))
} else {
chrono_tz_timestamp_to_utf8(from, time_unit, timezone_str)
}
}

/// Returns a [`Utf8Array`] where every element is the utf8 representation of the timestamp in the rfc3339 format.
pub fn naive_timestamp_to_utf8<O: Offset>(
from: &PrimitiveArray<i64>,
time_unit: TimeUnit,
) -> Utf8Array<O> {
match time_unit {
TimeUnit::Nanosecond => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_ns_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Microsecond => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_us_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Millisecond => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_ms_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Second => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_s_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
}
}
Loading