Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simplified API for writing to JSON #864

Merged
merged 1 commit into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/write_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::util::bench_util::*;

fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
let mut writer = vec![];
let format = write::JsonArray::default();
let format = write::Format::Json;

let batches = vec![Ok(columns.clone())].into_iter();

Expand Down
2 changes: 1 addition & 1 deletion examples/json_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::{

fn write_batches(path: &str, names: Vec<String>, batches: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
let mut writer = File::create(path)?;
let format = write::JsonArray::default();
let format = write::Format::Json;

let batches = batches.iter().cloned().map(Ok);

Expand Down
43 changes: 31 additions & 12 deletions src/io/json/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
//! APIs to write to JSON
mod format;
mod serialize;

pub use fallible_streaming_iterator::*;
pub use format::*;
pub use serialize::serialize;

use crate::{
array::Array,
chunk::Chunk,
error::{ArrowError, Result},
};
use format::*;

/// The supported variations of JSON supported
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Format {
/// JSON
Json,
/// NDJSON (http://ndjson.org/)
NewlineDelimitedJson,
}

/// Writes blocks of JSON-encoded data into `writer`, ensuring that the written
/// JSON has the expected `format`
pub fn write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
fn _write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
where
W: std::io::Write,
F: JsonFormat,
Expand All @@ -30,28 +38,40 @@ where
Ok(())
}

/// Writes blocks of JSON-encoded data into `writer` according to format [`Format`].
/// # Implementation
/// This is IO-bounded
pub fn write<W, I>(writer: &mut W, format: Format, blocks: I) -> Result<()>
where
W: std::io::Write,
I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
{
match format {
Format::Json => _write(writer, JsonArray::default(), blocks),
Format::NewlineDelimitedJson => _write(writer, LineDelimited::default(), blocks),
}
}

/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] to bytes.
/// Advancing it is CPU-bounded
pub struct Serializer<F, A, I>
pub struct Serializer<A, I>
where
F: JsonFormat,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Chunk<A>>>,
{
batches: I,
names: Vec<String>,
buffer: Vec<u8>,
format: F,
format: Format,
}

impl<F, A, I> Serializer<F, A, I>
impl<A, I> Serializer<A, I>
where
F: JsonFormat,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Chunk<A>>>,
{
/// Creates a new [`Serializer`].
pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: F) -> Self {
pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: Format) -> Self {
Self {
batches,
names,
Expand All @@ -61,9 +81,8 @@ where
}
}

impl<F, A, I> FallibleStreamingIterator for Serializer<F, A, I>
impl<A, I> FallibleStreamingIterator for Serializer<A, I>
where
F: JsonFormat,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Chunk<A>>>,
{
Expand Down
20 changes: 18 additions & 2 deletions src/io/json/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use crate::temporal_conversions::{
use crate::util::lexical_to_bytes_mut;
use crate::{array::*, datatypes::DataType, types::NativeType};

use super::{JsonArray, JsonFormat};
use super::format::{JsonArray, JsonFormat, LineDelimited};
use super::Format;

fn boolean_serializer<'a>(
array: &'a BooleanArray,
Expand Down Expand Up @@ -249,7 +250,7 @@ fn serialize_item<F: JsonFormat>(

/// Serializes a (name, array) to a valid JSON to `buffer`
/// This is CPU-bounded
pub fn serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
fn _serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
where
N: AsRef<str>,
A: AsRef<dyn Array>,
Expand Down Expand Up @@ -278,3 +279,18 @@ where
is_first_row = false;
})
}

/// Serializes a (name, array) to a valid JSON to `buffer`
/// This is CPU-bounded
pub fn serialize<N, A>(names: &[N], columns: &Chunk<A>, format: Format, buffer: &mut Vec<u8>)
where
N: AsRef<str>,
A: AsRef<dyn Array>,
{
match format {
Format::Json => _serialize(names, columns, JsonArray::default(), buffer),
Format::NewlineDelimitedJson => {
_serialize(names, columns, LineDelimited::default(), buffer)
}
}
}
6 changes: 3 additions & 3 deletions tests/it/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ fn read_batch(data: String, fields: &[Field]) -> Result<Chunk<Arc<dyn Array>>> {
json_read::deserialize(rows, fields)
}

fn write_batch<F: json_write::JsonFormat, A: AsRef<dyn Array>>(
fn write_batch<A: AsRef<dyn Array>>(
batch: Chunk<A>,
names: Vec<String>,
format: F,
format: json_write::Format,
) -> Result<Vec<u8>> {
let batches = vec![Ok(batch)].into_iter();

Expand All @@ -46,7 +46,7 @@ fn round_trip(data: String) -> Result<()> {
let buf = write_batch(
columns.clone(),
fields.iter().map(|x| x.name.clone()).collect(),
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

let new_chunk = read_batch(String::from_utf8(buf).unwrap(), &fields)?;
Expand Down
22 changes: 11 additions & 11 deletions tests/it/io/json/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn write_simple_rows() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -45,7 +45,7 @@ fn write_simple_rows_array() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::JsonArray::default(),
json_write::Format::Json,
)?;

assert_eq!(
Expand Down Expand Up @@ -88,7 +88,7 @@ fn write_nested_struct_with_validity() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -133,7 +133,7 @@ fn write_nested_structs() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -169,7 +169,7 @@ fn write_struct_with_list_field() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -213,7 +213,7 @@ fn write_nested_list() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -274,7 +274,7 @@ fn write_list_of_struct() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -296,7 +296,7 @@ fn write_escaped_utf8() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -315,7 +315,7 @@ fn write_quotation_marks_in_utf8() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -334,7 +334,7 @@ fn write_date32() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -356,7 +356,7 @@ fn write_timestamp() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down