Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Refactored JSON writing (5-10x) (#709)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 27, 2021
1 parent f33a41f commit f07cc2c
Show file tree
Hide file tree
Showing 15 changed files with 526 additions and 692 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "indexmap"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
Expand Down Expand Up @@ -300,3 +300,7 @@ harness = false
[[bench]]
name = "bitwise"
harness = false

[[bench]]
name = "write_json"
harness = false
58 changes: 58 additions & 0 deletions benches/write_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
use arrow2::error::Result;
use arrow2::io::json::write;
use arrow2::record_batch::RecordBatch;
use arrow2::util::bench_util::*;

/// Serializes a clone of `batch` as a JSON array into an in-memory sink.
///
/// Returns any error reported while serializing or writing.
fn write_batch(batch: &RecordBatch) -> Result<()> {
    let format = write::JsonArray::default();
    let mut sink = vec![];

    // A stream containing exactly one (successful) batch.
    let source = std::iter::once(Ok(batch.clone()));

    // Advancing `blocks` serializes the next batch into its internal buffer (CPU-bound work).
    let blocks = write::Serializer::new(source, vec![], format);

    // Copying the serialized blocks into the sink is the IO-bound part.
    write::write(&mut sink, format, blocks)
}

/// Wraps `array` in a single-column [`RecordBatch`] whose column is named `"a"`.
///
/// Panics if the batch cannot be constructed.
fn make_batch(array: impl Array + 'static) -> RecordBatch {
    let column: Arc<dyn Array> = Arc::new(array);
    RecordBatch::try_from_iter([("a", column)]).unwrap()
}

/// Registers the JSON-write benchmarks for `i32`, utf8 and `f64` columns at
/// sizes 2^10, 2^12, ..., 2^18.
fn add_benchmark(c: &mut Criterion) {
    for log2_size in (10..=18).step_by(2) {
        let size = 2usize.pow(log2_size);

        let ints = make_batch(create_primitive_array::<i32>(size, 0.1));
        c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&ints))
        });

        let strings = make_batch(create_string_array::<i32>(size, 100, 0.1, 42));
        c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&strings))
        });

        let floats = make_batch(create_primitive_array::<f64>(size, 0.1));
        c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&floats))
        });
    }
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
34 changes: 34 additions & 0 deletions examples/json_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::fs::File;
use std::sync::Arc;

use arrow2::{
array::Int32Array,
datatypes::{Field, Schema},
error::Result,
io::json::write,
record_batch::RecordBatch,
};

/// Creates (or truncates) the file at `path` and writes `batches` to it as a
/// single JSON array.
///
/// Returns an error if the file cannot be created or if serializing or
/// writing fails.
fn write_batches(path: &str, batches: &[RecordBatch]) -> Result<()> {
    let format = write::JsonArray::default();
    let mut file = File::create(path)?;

    // The serializer consumes fallible batches, so wrap each clone in `Ok`.
    let source = batches.iter().map(|batch| Ok(batch.clone()));

    // Advancing `blocks` serializes the next batch into its internal buffer (CPU-bound work).
    let blocks = write::Serializer::new(source, vec![], format);

    // Writing the serialized blocks to the file is the IO-bound part.
    write::write(&mut file, format, blocks)
}

/// Builds a one-column batch of nullable `i32` values and writes it twice to
/// `example.json`.
fn main() -> Result<()> {
    let values = Int32Array::from(&[Some(0), None, Some(2), Some(3), Some(4), Some(5), Some(6)]);

    // Single nullable field named "c1", typed after the array.
    let schema = Schema::new(vec![Field::new("c1", values.data_type().clone(), true)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values)])?;

    let batches = [batch.clone(), batch];
    write_batches("example.json", &batches)
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
- [Read Avro](./io/avro_read.md)
- [Write Avro](./io/avro_write.md)
- [Read JSON](./io/json_read.md)
- [Write JSON](./io/json_write.md)
8 changes: 8 additions & 0 deletions guide/src/io/json_write.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Write JSON

When compiled with feature `io_json`, you can use this crate to write JSON files.
The following example writes two record batches to a JSON file:

```rust
{{#include ../../../examples/json_write.rs}}
```
6 changes: 3 additions & 3 deletions src/io/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use streaming_iterator::StreamingIterator;
pub struct BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
F: FnMut(T, &mut Vec<u8>),
{
iterator: I,
f: F,
Expand All @@ -17,7 +17,7 @@ where
impl<I, F, T> BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
F: FnMut(T, &mut Vec<u8>),
{
#[inline]
pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
Expand All @@ -33,7 +33,7 @@ where
impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
F: FnMut(T, &mut Vec<u8>),
{
type Item = [u8];

Expand Down
4 changes: 1 addition & 3 deletions src/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
//! Convert data between the Arrow memory format and JSON line-delimited records.
pub mod read;
mod write;

pub use write::*;
pub mod write;

use crate::error::ArrowError;

Expand Down
77 changes: 77 additions & 0 deletions src/io/json/write/format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::{fmt::Debug, io::Write};

use crate::error::Result;

/// Trait defining how to format a sequence of JSON objects to a byte stream.
///
/// Every hook has a default implementation that writes nothing and returns
/// `Ok(())`, so implementors only override the hooks their layout needs.
pub trait JsonFormat: Debug + Default + Copy {
#[inline]
/// Writes any bytes needed at the start of the stream to the writer.
fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
Ok(())
}

#[inline]
/// Writes any bytes needed at the start of each row.
/// `_is_first_row` tells the implementation whether this is the first row.
fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> {
Ok(())
}

#[inline]
/// Writes any bytes needed at the end of each row.
fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
Ok(())
}

/// Writes any bytes needed at the end of the stream to the writer.
fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
Ok(())
}
}

/// Line-delimited JSON: every record is followed by a newline. For example
///
/// ```json
/// {"foo":1}
/// {"bar":1}
///
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct LineDelimited {}

impl JsonFormat for LineDelimited {
    /// Terminates the row with a single `\n`.
    #[inline]
    fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
        Ok(writer.write_all(b"\n")?)
    }
}

/// A single JSON array: records are wrapped in `[` ... `]` and separated by
/// commas. For example
///
/// ```json
/// [{"foo":1},{"bar":1}]
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct JsonArray {}

impl JsonFormat for JsonArray {
    /// Opens the array.
    #[inline]
    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
        Ok(writer.write_all(b"[")?)
    }

    /// Emits the `,` separator before every row except the first.
    #[inline]
    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> {
        if is_first_row {
            return Ok(());
        }
        Ok(writer.write_all(b",")?)
    }

    /// Closes the array.
    #[inline]
    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
        Ok(writer.write_all(b"]")?)
    }
}
103 changes: 83 additions & 20 deletions src/io/json/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! APIs to write to JSON
mod format;
mod serialize;
mod writer;
pub use serialize::write_record_batches;
pub use writer::*;
pub use fallible_streaming_iterator::*;
pub use format::*;
pub use serialize::serialize;

use crate::{
error::{ArrowError, Result},
record_batch::RecordBatch,
};

/// Drains `blocks` into `writer`, framing the output according to `format`.
///
/// The stream is opened with `format.start_stream`, each block is preceded by
/// `format.start_row` (flagged as first only for the first block) and the
/// stream is closed with `format.end_stream`.
///
/// NOTE(review): `format.end_row` is not called here; presumably the code
/// that produces the blocks emits row terminators itself — confirm against
/// `serialize`.
///
/// # Errors
/// Returns the first error produced by `blocks`, by `format` or by `writer`.
pub fn write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
where
    W: std::io::Write,
    F: JsonFormat,
    I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
{
    format.start_stream(writer)?;

    let mut is_first_block = true;
    loop {
        let block = match blocks.next()? {
            Some(bytes) => bytes,
            None => break,
        };
        format.start_row(writer, is_first_block)?;
        is_first_block = false;
        writer.write_all(block)?;
    }

    format.end_stream(writer)
}

/// [`FallibleStreamingIterator`] that serializes a [`RecordBatch`] to bytes.
/// Advancing it is CPU-bound.
pub struct Serializer<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> {
// Source of the (fallible) record batches to serialize.
iter: I,
// Reused buffer holding the bytes of the most recently serialized batch.
buffer: Vec<u8>,
// Format handed to `serialize` for every batch.
format: F,
}

impl<F, I> Serializer<F, I>
where
    F: JsonFormat,
    I: Iterator<Item = Result<RecordBatch>>,
{
    /// Creates a new [`Serializer`] that reads batches from `iter`, serializes
    /// them into `buffer` and uses `format` for serialization.
    pub fn new(iter: I, buffer: Vec<u8>, format: F) -> Self {
        Self {
            iter,
            buffer,
            format,
        }
    }
}

impl<F, I> FallibleStreamingIterator for Serializer<F, I>
where
    F: JsonFormat,
    I: Iterator<Item = Result<RecordBatch>>,
{
    type Item = [u8];

    type Error = ArrowError;

    /// Clears the buffer and serializes the next batch (if any) into it.
    ///
    /// An error yielded by the underlying iterator is returned as-is; the
    /// buffer is left empty in that case.
    fn advance(&mut self) -> Result<()> {
        self.buffer.clear();

        let batch = match self.iter.next() {
            Some(maybe_batch) => maybe_batch?,
            // Exhausted: leave the buffer empty so that `get` returns `None`.
            None => return Ok(()),
        };

        let names = batch
            .schema()
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect::<Vec<_>>();
        serialize(&names, batch.columns(), self.format, &mut self.buffer);
        Ok(())
    }

    /// Returns the bytes of the last serialized batch, or `None` when the
    /// buffer is empty.
    ///
    /// NOTE(review): an empty buffer is the only end-of-stream signal, so a
    /// batch that serializes to zero bytes also yields `None` here.
    fn get(&self) -> Option<&Self::Item> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(&self.buffer)
        }
    }
}
Loading

0 comments on commit f07cc2c

Please sign in to comment.