Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
add json serialization of timestamp/date32/date64 (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Feb 6, 2022
1 parent eb4e588 commit f02da8a
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/io/json/write/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use chrono::{NaiveDate, NaiveDateTime};
use lexical_core::ToLexical;
use std::io::Write;
use streaming_iterator::StreamingIterator;

use crate::bitmap::utils::zip_validity;
use crate::chunk::Chunk;
use crate::datatypes::TimeUnit;
use crate::io::iterator::BufStreamingIterator;
use crate::temporal_conversions::{
date32_to_date, date64_to_date, timestamp_ms_to_datetime, timestamp_ns_to_datetime,
timestamp_s_to_datetime, timestamp_us_to_datetime,
};
use crate::util::lexical_to_bytes_mut;
use crate::{array::*, datatypes::DataType, types::NativeType};

Expand Down Expand Up @@ -136,6 +143,49 @@ fn list_serializer<'a, O: Offset>(
))
}

fn date_serializer<'a, T, F>(
array: &'a PrimitiveArray<T>,
convert: F,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
T: NativeType,
F: Fn(T) -> NaiveDate + 'static + Send + Sync,
{
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let nd = convert(*x);
write!(buf, "{}", nd).unwrap();
} else {
buf.extend_from_slice(b"null")
}
},
vec![],
))
}

fn timestamp_serializer<'a, F>(
array: &'a PrimitiveArray<i64>,
convert: F,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>
where
F: Fn(i64) -> NaiveDateTime + 'static + Send + Sync,
{
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
let ndt = convert(*x);
write!(buf, "{}", ndt).unwrap();
} else {
buf.extend_from_slice(b"null")
}
},
vec![],
))
}

fn new_serializer<'a>(
array: &'a dyn Array,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
Expand All @@ -156,6 +206,21 @@ fn new_serializer<'a>(
DataType::Struct(_) => struct_serializer(array.as_any().downcast_ref().unwrap()),
DataType::List(_) => list_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
DataType::LargeList(_) => list_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
DataType::Date32 => date_serializer(array.as_any().downcast_ref().unwrap(), date32_to_date),
DataType::Date64 => date_serializer(array.as_any().downcast_ref().unwrap(), date64_to_date),
DataType::Timestamp(tu, tz) => {
if tz.is_some() {
todo!("still have to implement timezone")
} else {
let convert = match tu {
TimeUnit::Nanosecond => timestamp_ns_to_datetime,
TimeUnit::Microsecond => timestamp_us_to_datetime,
TimeUnit::Millisecond => timestamp_ms_to_datetime,
TimeUnit::Second => timestamp_s_to_datetime,
};
timestamp_serializer(array.as_any().downcast_ref().unwrap(), convert)
}
}
other => todo!("Writing {:?} to JSON", other),
}
}
Expand Down
41 changes: 41 additions & 0 deletions tests/it/io/json/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,44 @@ fn write_quotation_marks_in_utf8() -> Result<()> {
);
Ok(())
}

#[test]
fn write_date32() -> Result<()> {
let a = PrimitiveArray::from_data(DataType::Date32, vec![1000i32, 8000, 10000].into(), None);

let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap();

let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
)?;

assert_eq!(
String::from_utf8(buf).unwrap().as_bytes(),
b"{\"c1\":1972-09-27}\n{\"c1\":1991-11-27}\n{\"c1\":1997-05-19}\n"
);
Ok(())
}
#[test]
fn write_timestamp() -> Result<()> {
let a = PrimitiveArray::from_data(
DataType::Timestamp(TimeUnit::Second, None),
vec![10i64, 1 << 32, 1 << 33].into(),
None,
);

let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap();

let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
)?;

assert_eq!(
String::from_utf8(buf).unwrap().as_bytes(),
b"{\"c1\":1970-01-01 00:00:10}\n{\"c1\":2106-02-07 06:28:16}\n{\"c1\":2242-03-16 12:56:32}\n"
);
Ok(())
}

0 comments on commit f02da8a

Please sign in to comment.