Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Taking care of Timestamp in the most pain-free way I could come up with
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 13, 2023
1 parent 3f75972 commit c1e58ad
Show file tree
Hide file tree
Showing 24 changed files with 250 additions and 112 deletions.
8 changes: 4 additions & 4 deletions src/compute/arithmetics/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_york".to_string())),
/// ));
///
/// let duration = PrimitiveArray::from([Some(10i64), Some(20i64), None, Some(30i64)])
Expand All @@ -96,7 +96,7 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_york".to_string())),
/// ));
///
/// assert_eq!(result, expected);
Expand Down Expand Up @@ -161,7 +161,7 @@ where
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_york".to_string())),
/// ));
///
/// let duration = PrimitiveArray::from([Some(10i64), Some(20i64), None, Some(30i64)])
Expand All @@ -176,7 +176,7 @@ where
/// ])
/// .to(DataType::Timestamp(
/// TimeUnit::Second,
/// Some("America/New_York".to_string()),
/// Some(std::sync::Arc::new("America/New_york".to_string())),
/// ));
///
/// assert_eq!(result, expected);
Expand Down
3 changes: 2 additions & 1 deletion src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::hash::Hash;
use std::sync::Arc;

use num_traits::{AsPrimitive, Float, ToPrimitive};

Expand Down Expand Up @@ -406,7 +407,7 @@ pub fn timestamp_to_timestamp(
from: &PrimitiveArray<i64>,
from_unit: TimeUnit,
to_unit: TimeUnit,
tz: &Option<String>,
tz: &Option<Arc<String>>,
) -> PrimitiveArray<i64> {
let from_size = time_unit_multiple(from_unit);
let to_size = time_unit_multiple(to_unit);
Expand Down
6 changes: 4 additions & 2 deletions src/compute/cast/utf8_to.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use chrono::Datelike;

use crate::{
Expand Down Expand Up @@ -127,7 +129,7 @@ pub fn utf8_to_naive_timestamp_ns<O: Offset>(from: &Utf8Array<O>) -> PrimitiveAr

pub(super) fn utf8_to_timestamp_ns_dyn<O: Offset>(
from: &dyn Array,
timezone: String,
timezone: Arc<String>,
) -> Result<Box<dyn Array>> {
let from = from.as_any().downcast_ref().unwrap();
utf8_to_timestamp_ns::<O>(from, timezone)
Expand All @@ -138,7 +140,7 @@ pub(super) fn utf8_to_timestamp_ns_dyn<O: Offset>(
/// [`crate::temporal_conversions::utf8_to_timestamp_ns`] applied for RFC3339 formatting
pub fn utf8_to_timestamp_ns<O: Offset>(
from: &Utf8Array<O>,
timezone: String,
timezone: Arc<String>,
) -> Result<PrimitiveArray<i64>> {
utf8_to_timestamp_ns_(from, RFC3339, timezone)
}
Expand Down
8 changes: 5 additions & 3 deletions src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub enum DataType {
/// * An absolute time zone offset of the form +XX:XX or -XX:XX, such as +07:30
/// When the timezone is not specified, the timestamp is considered to have no timezone
/// and is represented _as is_
Timestamp(TimeUnit, Option<String>),
Timestamp(TimeUnit, Option<Arc<String>>),
/// An [`i32`] representing the elapsed time since UNIX epoch (1970-01-01)
/// in days.
Date32,
Expand Down Expand Up @@ -218,7 +218,9 @@ impl From<DataType> for arrow_schema::DataType {
DataType::Float16 => Self::Float16,
DataType::Float32 => Self::Float32,
DataType::Float64 => Self::Float64,
DataType::Timestamp(unit, tz) => Self::Timestamp(unit.into(), tz),
DataType::Timestamp(unit, tz) => {
Self::Timestamp(unit.into(), tz.map(Arc::unwrap_or_clone_polyfill))
}
DataType::Date32 => Self::Date32,
DataType::Date64 => Self::Date64,
DataType::Time32(unit) => Self::Time32(unit.into()),
Expand Down Expand Up @@ -280,7 +282,7 @@ impl From<arrow_schema::DataType> for DataType {
DataType::Float16 => Self::Float16,
DataType::Float32 => Self::Float32,
DataType::Float64 => Self::Float64,
DataType::Timestamp(unit, tz) => Self::Timestamp(unit.into(), tz),
DataType::Timestamp(unit, tz) => Self::Timestamp(unit.into(), tz.map(Arc::new)),
DataType::Date32 => Self::Date32,
DataType::Date64 => Self::Date64,
DataType::Time32(unit) => Self::Time32(unit.into()),
Expand Down
28 changes: 21 additions & 7 deletions src/ffi/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,18 @@ unsafe fn to_data_type(schema: &ArrowSchema) -> Result<DataType> {
["tsn", ""] => DataType::Timestamp(TimeUnit::Nanosecond, None),

// Timestamps with timezone
["tss", tz] => DataType::Timestamp(TimeUnit::Second, Some(tz.to_string())),
["tsm", tz] => DataType::Timestamp(TimeUnit::Millisecond, Some(tz.to_string())),
["tsu", tz] => DataType::Timestamp(TimeUnit::Microsecond, Some(tz.to_string())),
["tsn", tz] => DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.to_string())),
["tss", tz] => {
DataType::Timestamp(TimeUnit::Second, Some(Arc::new(tz.to_string())))
}
["tsm", tz] => {
DataType::Timestamp(TimeUnit::Millisecond, Some(Arc::new(tz.to_string())))
}
["tsu", tz] => {
DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::new(tz.to_string())))
}
["tsn", tz] => {
DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::new(tz.to_string())))
}

["w", size_raw] => {
// Example: "w:42" fixed-width binary [42 bytes]
Expand Down Expand Up @@ -433,7 +441,7 @@ fn to_format(data_type: &DataType) -> String {
format!(
"ts{}:{}",
unit,
tz.as_ref().map(|x| x.as_ref()).unwrap_or("")
tz.as_ref().map(|x| x.as_str()).unwrap_or("")
)
}
DataType::Decimal(precision, scale) => format!("d:{precision},{scale}"),
Expand Down Expand Up @@ -576,7 +584,10 @@ mod tests {
true,
),
]),
DataType::Map(std::sync::Arc::new(Field::new("a", DataType::Int64, true)), true),
DataType::Map(
std::sync::Arc::new(Field::new("a", DataType::Int64, true)),
true,
),
DataType::Union(
vec![
Field::new("a", DataType::Int64, true),
Expand Down Expand Up @@ -609,7 +620,10 @@ mod tests {
TimeUnit::Nanosecond,
] {
dts.push(DataType::Timestamp(time_unit, None));
dts.push(DataType::Timestamp(time_unit, Some("00:00".to_string())));
dts.push(DataType::Timestamp(
time_unit,
Some(Arc::new("00:00".to_string())),
));
dts.push(DataType::Duration(time_unit));
}
for interval_type in [
Expand Down
6 changes: 4 additions & 2 deletions src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema};

use crate::datatypes::*;
Expand Down Expand Up @@ -52,10 +54,10 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) ->
Some(logical) => match logical {
avro_schema::schema::LongLogical::Time => DataType::Time64(TimeUnit::Microsecond),
avro_schema::schema::LongLogical::TimestampMillis => {
DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string()))
DataType::Timestamp(TimeUnit::Millisecond, Some(Arc::new("00:00".to_string())))
}
avro_schema::schema::LongLogical::TimestampMicros => {
DataType::Timestamp(TimeUnit::Microsecond, Some("00:00".to_string()))
DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::new("00:00".to_string())))
}
avro_schema::schema::LongLogical::LocalTimestampMillis => {
DataType::Timestamp(TimeUnit::Millisecond, None)
Expand Down
17 changes: 11 additions & 6 deletions src/io/csv/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use crate::datatypes::{DataType, Field, TimeUnit};
use ahash::AHashSet;

Expand Down Expand Up @@ -27,15 +29,18 @@ fn is_naive_datetime(string: &str) -> bool {
string.parse::<chrono::NaiveDateTime>().is_ok()
}

fn is_datetime(string: &str) -> Option<String> {
fn is_datetime(string: &str) -> Option<Arc<String>> {
let mut parsed = chrono::format::Parsed::new();
let fmt = chrono::format::StrftimeItems::new(RFC3339);
if chrono::format::parse(&mut parsed, string, fmt).is_ok() {
parsed.offset.map(|x| {
let hours = x / 60 / 60;
let minutes = x / 60 - hours * 60;
format!("{hours:03}:{minutes:02}")
})
parsed
.offset
.map(|x| {
let hours = x / 60 / 60;
let minutes = x / 60 - hours * 60;
format!("{hours:03}:{minutes:02}")
})
.map(Arc::new)
} else {
None
}
Expand Down
4 changes: 3 additions & 1 deletion src/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use arrow_format::ipc::{
planus::ReadAsRoot, FieldRef, FixedSizeListRef, MapRef, TimeRef, TimestampRef, UnionRef,
};
Expand Down Expand Up @@ -104,7 +106,7 @@ fn deserialize_time(time: TimeRef) -> Result<(DataType, IpcField)> {
}

fn deserialize_timestamp(timestamp: TimestampRef) -> Result<(DataType, IpcField)> {
let timezone = timestamp.timezone()?.map(|tz| tz.to_string());
let timezone = timestamp.timezone()?.map(|tz| Arc::new(tz.to_string()));
let time_unit = deserialize_timeunit(timestamp.unit()?)?;
Ok((
DataType::Timestamp(time_unit, timezone),
Expand Down
2 changes: 1 addition & 1 deletion src/io/ipc/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ fn serialize_type(data_type: &DataType) -> arrow_format::ipc::Type {
})),
Timestamp(unit, tz) => ipc::Type::Timestamp(Box::new(ipc::Timestamp {
unit: serialize_time_unit(unit),
timezone: tz.as_ref().cloned(),
timezone: tz.as_ref().map(|tz| tz.as_str().to_owned()),
})),
Interval(unit) => ipc::Type::Interval(Box::new(ipc::Interval {
unit: match unit {
Expand Down
2 changes: 1 addition & 1 deletion src/io/json_integration/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ fn to_data_type(item: &Value, mut children: Vec<Field>) -> Result<DataType> {
Some(Value::String(tz)) => Ok(Some(tz.clone())),
_ => Err(Error::OutOfSpec("timezone must be a string".to_string())),
}?;
DataType::Timestamp(unit, tz)
DataType::Timestamp(unit, tz.map(Arc::new))
}
"date" => match item.get("unit") {
Some(p) if p == "DAY" => DataType::Date32,
Expand Down
Loading

0 comments on commit c1e58ad

Please sign in to comment.