Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to add an interval to a timestamp #417

Merged
merged 1 commit into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 22 additions & 47 deletions src/array/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,56 +66,31 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
dyn_primitive!(array, i64, temporal_conversions::time64ns_to_time)
}
Time64(_) => unreachable!(), // remaining are not valid
Timestamp(TimeUnit::Second, tz) => {
Timestamp(time_unit, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_s_to_datetime(x),
offset,
)
})
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_s_to_datetime)
}
}
Timestamp(TimeUnit::Millisecond, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_ms_to_datetime(x),
offset,
)
})
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_ms_to_datetime)
}
}
Timestamp(TimeUnit::Microsecond, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_us_to_datetime(x),
offset,
)
})
let timezone = temporal_conversions::parse_offset(tz);
match timezone {
Ok(timezone) => {
dyn_primitive!(array, i64, |time| {
temporal_conversions::timestamp_to_datetime(time, *time_unit, &timezone)
})
}
#[cfg(feature = "chrono-tz")]
Err(_) => {
let timezone = temporal_conversions::parse_offset_tz(tz).unwrap();
dyn_primitive!(array, i64, |time| {
temporal_conversions::timestamp_to_datetime(time, *time_unit, &timezone)
})
}
#[cfg(not(feature = "chrono-tz"))]
_ => panic!(
"Invalid Offset format (must be [-]00:00) or chrono-tz feature not active"
),
}
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_us_to_datetime)
}
}
Timestamp(TimeUnit::Nanosecond, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_ns_to_datetime(x),
offset,
)
dyn_primitive!(array, i64, |time| {
temporal_conversions::timestamp_to_naive_datetime(time, *time_unit)
})
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_ns_to_datetime)
}
}
Interval(IntervalUnit::YearMonth) => {
Expand Down
9 changes: 8 additions & 1 deletion src/compute/arithmetics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use std::ops::{Add, Div, Mul, Neg, Rem, Sub};

use num_traits::{NumCast, Zero};

use crate::datatypes::{DataType, TimeUnit};
use crate::datatypes::{DataType, IntervalUnit, TimeUnit};
use crate::error::{ArrowError, Result};
use crate::types::NativeType;
use crate::{array::*, bitmap::Bitmap};
Expand Down Expand Up @@ -145,6 +145,11 @@ pub fn arithmetic(lhs: &dyn Array, op: Operator, rhs: &dyn Array) -> Result<Box<
let rhs = rhs.as_any().downcast_ref().unwrap();
time::add_duration::<i64>(lhs, rhs).map(|x| Box::new(x) as Box<dyn Array>)
}
(Timestamp(_, _), Add, Interval(IntervalUnit::MonthDayNano)) => {
let lhs = lhs.as_any().downcast_ref().unwrap();
let rhs = rhs.as_any().downcast_ref().unwrap();
time::add_interval(lhs, rhs).map(|x| Box::new(x) as Box<dyn Array>)
}
(Time64(TimeUnit::Microsecond), Subtract, Duration(_))
| (Time64(TimeUnit::Nanosecond), Subtract, Duration(_))
| (Date64, Subtract, Duration(_))
Expand Down Expand Up @@ -214,6 +219,7 @@ pub fn can_arithmetic(lhs: &DataType, op: Operator, rhs: &DataType) -> bool {
| (Time64(TimeUnit::Nanosecond), Add, Duration(_))
| (Timestamp(_, _), Subtract, Duration(_))
| (Timestamp(_, _), Add, Duration(_))
| (Timestamp(_, _), Add, Interval(IntervalUnit::MonthDayNano))
| (Timestamp(_, None), Subtract, Timestamp(_, None))
)
}
Expand Down Expand Up @@ -462,6 +468,7 @@ mod tests {
Duration(TimeUnit::Millisecond),
Duration(TimeUnit::Microsecond),
Duration(TimeUnit::Nanosecond),
Interval(IntervalUnit::MonthDayNano),
];
let operators = vec![
Operator::Add,
Expand Down
71 changes: 65 additions & 6 deletions src/compute/arithmetics/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::{
compute::arity::binary,
datatypes::{DataType, TimeUnit},
error::{ArrowError, Result},
temporal_conversions::{timeunit_scale, SECONDS_IN_DAY},
types::NativeType,
temporal_conversions,
types::{months_days_ns, NativeType},
};

/// Creates the scale required to add or subtract a Duration to a time array
Expand All @@ -36,20 +36,21 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
| (DataType::Time32(timeunit_a), DataType::Duration(timeunit_b))
| (DataType::Time64(timeunit_a), DataType::Duration(timeunit_b)) => {
// The scale is based on the TimeUnit that each of the numbers have.
timeunit_scale(*timeunit_a, *timeunit_b)
temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b)
}
(DataType::Date32, DataType::Duration(timeunit)) => {
// Date32 represents the time elapsed time since UNIX epoch
// (1970-01-01) in days (32 bits). The duration value has to be
// scaled to days to be able to add the value to the Date.
timeunit_scale(TimeUnit::Second, *timeunit) / SECONDS_IN_DAY as f64
temporal_conversions::timeunit_scale(TimeUnit::Second, *timeunit)
/ temporal_conversions::SECONDS_IN_DAY as f64
}
(DataType::Date64, DataType::Duration(timeunit)) => {
// Date64 represents the time elapsed time since UNIX epoch
// (1970-01-01) in milliseconds (64 bits). The duration value has
// to be scaled to milliseconds to be able to add the value to the
// Date.
timeunit_scale(TimeUnit::Millisecond, *timeunit)
temporal_conversions::timeunit_scale(TimeUnit::Millisecond, *timeunit)
}
_ => {
return Err(ArrowError::InvalidArgumentError(
Expand Down Expand Up @@ -216,7 +217,7 @@ pub fn subtract_timestamps(
(DataType::Timestamp(timeunit_a, None), DataType::Timestamp(timeunit_b, None)) => {
// Closure for the binary operation. The closure contains the scale
// required to calculate the difference between the timestamps.
let scale = timeunit_scale(*timeunit_a, *timeunit_b);
let scale = temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b);
let op = move |a, b| a - (b as f64 * scale) as i64;

binary(lhs, rhs, DataType::Duration(*timeunit_a), op)
Expand All @@ -227,6 +228,64 @@ pub fn subtract_timestamps(
}
}

/// Adds an interval to a [`DataType::Timestamp`].
pub fn add_interval(
timestamp: &PrimitiveArray<i64>,
interval: &PrimitiveArray<months_days_ns>,
) -> Result<PrimitiveArray<i64>> {
match timestamp.data_type().to_logical_type() {
DataType::Timestamp(time_unit, Some(timezone_str)) => {
let time_unit = *time_unit;
let timezone = temporal_conversions::parse_offset(timezone_str);
match timezone {
Ok(timezone) => binary(
timestamp,
interval,
timestamp.data_type().clone(),
|timestamp, interval| {
temporal_conversions::add_interval(
timestamp, time_unit, interval, &timezone,
)
},
),
#[cfg(feature = "chrono-tz")]
Err(_) => {
let timezone = temporal_conversions::parse_offset_tz(timezone_str)?;
binary(
timestamp,
interval,
timestamp.data_type().clone(),
|timestamp, interval| {
temporal_conversions::add_interval(
timestamp, time_unit, interval, &timezone,
)
},
)
}
#[cfg(not(feature = "chrono-tz"))]
_ => Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
timezone_str
))),
}
}
DataType::Timestamp(time_unit, None) => {
let time_unit = *time_unit;
binary(
timestamp,
interval,
timestamp.data_type().clone(),
|timestamp, interval| {
temporal_conversions::add_naive_interval(timestamp, time_unit, interval)
},
)
}
_ => Err(ArrowError::InvalidArgumentError(
"Adding an interval is only supported for `DataType::Timestamp`".to_string(),
)),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
17 changes: 5 additions & 12 deletions src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
bitmap::Bitmap,
compute::arity::unary,
datatypes::{DataType, TimeUnit},
error::ArrowError,
temporal_conversions::*,
types::NativeType,
};
Expand Down Expand Up @@ -329,17 +328,10 @@ fn chrono_tz_timestamp_to_utf8<O: Offset>(
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(timestamp_to_utf8_impl::<O, chrono_tz::Tz>(
from, time_unit, timezone,
))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
let timezone = parse_offset_tz(timezone_str)?;
Ok(timestamp_to_utf8_impl::<O, chrono_tz::Tz>(
from, time_unit, timezone,
))
}

#[cfg(not(feature = "chrono-tz"))]
Expand All @@ -348,6 +340,7 @@ fn chrono_tz_timestamp_to_utf8<O: Offset>(
_: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
use crate::error::ArrowError;
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
timezone_str
Expand Down
22 changes: 4 additions & 18 deletions src/compute/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,8 @@ fn chrono_tz_hour(
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<PrimitiveArray<u32>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(extract_impl(array, time_unit, timezone, |x| x.hour()))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
let timezone = parse_offset_tz(timezone_str)?;
Ok(extract_impl(array, time_unit, timezone, |x| x.hour()))
}

#[cfg(not(feature = "chrono-tz"))]
Expand All @@ -112,15 +105,8 @@ fn chrono_tz_year(
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<PrimitiveArray<i32>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(extract_impl(array, time_unit, timezone, |x| x.year()))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
let timezone = parse_offset_tz(timezone_str)?;
Ok(extract_impl(array, time_unit, timezone, |x| x.year()))
}

#[cfg(not(feature = "chrono-tz"))]
Expand Down
Loading