diff --git a/Cargo.toml b/Cargo.toml
index 8014b6f4279..13fbf2e05cf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,7 +127,7 @@ io_csv_async = ["io_csv_read_async"]
 io_csv_read = ["csv", "lexical-core"]
 io_csv_read_async = ["csv-async", "lexical-core", "futures"]
 io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
-io_json = ["serde", "serde_json", "indexmap"]
+io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
 io_ipc = ["arrow-format"]
 io_ipc_write_async = ["io_ipc", "futures"]
 io_ipc_compression = ["lz4", "zstd"]
@@ -300,3 +300,7 @@ harness = false
 [[bench]]
 name = "bitwise"
 harness = false
+
+[[bench]]
+name = "write_json"
+harness = false
diff --git a/benches/write_json.rs b/benches/write_json.rs
new file mode 100644
index 00000000000..354ebfab0e0
--- /dev/null
+++ b/benches/write_json.rs
@@ -0,0 +1,58 @@
+use std::sync::Arc;
+
+use criterion::{criterion_group, criterion_main, Criterion};
+
+use arrow2::array::*;
+use arrow2::error::Result;
+use arrow2::io::json::write;
+use arrow2::record_batch::RecordBatch;
+use arrow2::util::bench_util::*;
+
+fn write_batch(batch: &RecordBatch) -> Result<()> {
+    let mut writer = vec![];
+    let format = write::JsonArray::default();
+
+    let batches = vec![Ok(batch.clone())].into_iter();
+
+    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(batches, vec![], format);
+
+    // the operation of writing is IO-bounded.
+    write::write(&mut writer, format, blocks)?;
+
+    Ok(())
+}
+
+fn make_batch(array: impl Array + 'static) -> RecordBatch {
+    RecordBatch::try_from_iter([("a", Arc::new(array) as Arc<dyn Array>)]).unwrap()
+}
+
+fn add_benchmark(c: &mut Criterion) {
+    (10..=18).step_by(2).for_each(|log2_size| {
+        let size = 2usize.pow(log2_size);
+
+        let array = create_primitive_array::<i32>(size, 0.1);
+        let batch = make_batch(array);
+
+        c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
+            b.iter(|| write_batch(&batch))
+        });
+
+        let array = create_string_array::<i32>(size, 100, 0.1, 42);
+        let batch = make_batch(array);
+
+        c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
+            b.iter(|| write_batch(&batch))
+        });
+
+        let array = create_primitive_array::<f64>(size, 0.1);
+        let batch = make_batch(array);
+
+        c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
+            b.iter(|| write_batch(&batch))
+        });
+    });
+}
+
+criterion_group!(benches, add_benchmark);
+criterion_main!(benches);
diff --git a/examples/json_write.rs b/examples/json_write.rs
new file mode 100644
index 00000000000..e0679e6f1ba
--- /dev/null
+++ b/examples/json_write.rs
@@ -0,0 +1,34 @@
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow2::{
+    array::Int32Array,
+    datatypes::{Field, Schema},
+    error::Result,
+    io::json::write,
+    record_batch::RecordBatch,
+};
+
+fn write_batches(path: &str, batches: &[RecordBatch]) -> Result<()> {
+    let mut writer = File::create(path)?;
+    let format = write::JsonArray::default();
+
+    let batches = batches.iter().cloned().map(Ok);
+
+    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(batches, vec![], format);
+
+    // the operation of writing is IO-bounded.
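+    // `write` streams each serialized block to `writer`, adding the `[`, `,` and
+    // `]` delimiters required by the `JsonArray` format.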
+    write::write(&mut writer, format, blocks)?;
+
+    Ok(())
+}
+
+fn main() -> Result<()> {
+    let array = Int32Array::from(&[Some(0), None, Some(2), Some(3), Some(4), Some(5), Some(6)]);
+    let field = Field::new("c1", array.data_type().clone(), true);
+    let schema = Schema::new(vec![field]);
+    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
+
+    write_batches("example.json", &[batch.clone(), batch])
+}
diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md
index d444beac250..a39fd47da8c 100644
--- a/guide/src/SUMMARY.md
+++ b/guide/src/SUMMARY.md
@@ -19,3 +19,4 @@
   - [Read Avro](./io/avro_read.md)
   - [Write Avro](./io/avro_write.md)
   - [Read JSON](./io/json_read.md)
+  - [Write JSON](./io/json_write.md)
diff --git a/guide/src/io/json_write.md b/guide/src/io/json_write.md
new file mode 100644
index 00000000000..b237e014eca
--- /dev/null
+++ b/guide/src/io/json_write.md
@@ -0,0 +1,8 @@
+# Write JSON
+
+When compiled with the feature `io_json`, you can use this crate to write JSON files.
+The following example writes a batch as a JSON file:
+
+```rust
+{{#include ../../../examples/json_write.rs}}
+```
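+
+The same pipeline can produce newline-delimited JSON instead. A minimal sketch,
+assuming the `batches` iterator and `writer` from the example above:
+
+```rust
+let format = write::LineDelimited::default();
+let blocks = write::Serializer::new(batches, vec![], format);
+write::write(&mut writer, format, blocks)?;
+```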
diff --git a/src/io/iterator.rs b/src/io/iterator.rs
index 65e16b4d173..91ec86fc2e0 100644
--- a/src/io/iterator.rs
+++ b/src/io/iterator.rs
@@ -6,7 +6,7 @@ pub use streaming_iterator::StreamingIterator;
 pub struct BufStreamingIterator<I, F, T>
 where
     I: Iterator<Item = T>,
-    F: Fn(T, &mut Vec<u8>),
+    F: FnMut(T, &mut Vec<u8>),
 {
     iterator: I,
     f: F,
@@ -17,7 +17,7 @@ where
 impl<I, F, T> BufStreamingIterator<I, F, T>
 where
     I: Iterator<Item = T>,
-    F: Fn(T, &mut Vec<u8>),
+    F: FnMut(T, &mut Vec<u8>),
 {
     #[inline]
     pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
@@ -33,7 +33,7 @@ where
 impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
 where
     I: Iterator<Item = T>,
-    F: Fn(T, &mut Vec<u8>),
+    F: FnMut(T, &mut Vec<u8>),
 {
     type Item = [u8];
diff --git a/src/io/json/mod.rs b/src/io/json/mod.rs
index 7411038a7d5..d7a1a862046 100644
--- a/src/io/json/mod.rs
+++ b/src/io/json/mod.rs
@@ -3,9 +3,7 @@
 //! Convert data between the Arrow memory format and JSON line-delimited records.
 
 pub mod read;
-mod write;
-
-pub use write::*;
+pub mod write;
 
 use crate::error::ArrowError;
diff --git a/src/io/json/write/format.rs b/src/io/json/write/format.rs
new file mode 100644
index 00000000000..66d85100d3e
--- /dev/null
+++ b/src/io/json/write/format.rs
@@ -0,0 +1,77 @@
+use std::{fmt::Debug, io::Write};
+
+use crate::error::Result;
+
+/// Trait defining how to format a sequence of JSON objects to a byte stream.
+pub trait JsonFormat: Debug + Default + Copy {
+    #[inline]
+    /// write any bytes needed at the start of the file to the writer
+    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
+        Ok(())
+    }
+
+    #[inline]
+    /// write any bytes needed for the start of each row
+    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> {
+        Ok(())
+    }
+
+    #[inline]
+    /// write any bytes needed for the end of each row
+    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
+        Ok(())
+    }
+
+    /// write any bytes needed at the end of the file to the writer
+    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
+        Ok(())
+    }
+}
+
+/// Produces JSON output with one record per line. For example
+///
+/// ```json
+/// {"foo":1}
+/// {"bar":1}
+///
+/// ```
+#[derive(Debug, Default, Clone, Copy)]
+pub struct LineDelimited {}
+
+impl JsonFormat for LineDelimited {
+    #[inline]
+    fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
+        writer.write_all(b"\n")?;
+        Ok(())
+    }
+}
+
+/// Produces JSON output as a single JSON array. For example
+///
+/// ```json
+/// [{"foo":1},{"bar":1}]
+/// ```
+#[derive(Debug, Default, Clone, Copy)]
+pub struct JsonArray {}
+
+impl JsonFormat for JsonArray {
+    #[inline]
+    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
+        writer.write_all(b"[")?;
+        Ok(())
+    }
+
+    #[inline]
+    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> {
+        if !is_first_row {
+            writer.write_all(b",")?;
+        }
+        Ok(())
+    }
+
+    #[inline]
+    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
+        writer.write_all(b"]")?;
+        Ok(())
+    }
+}
diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs
index 7915dad3416..726825ed09f 100644
--- a/src/io/json/write/mod.rs
+++ b/src/io/json/write/mod.rs
@@ -1,21 +1,84 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
+//! APIs to write to JSON
+mod format;
 mod serialize;
-mod writer;
-
-pub use serialize::write_record_batches;
-pub use writer::*;
+
+pub use fallible_streaming_iterator::*;
+pub use format::*;
+pub use serialize::serialize;
+
+use crate::{
+    error::{ArrowError, Result},
+    record_batch::RecordBatch,
+};
+
+/// Writes blocks of JSON-encoded data into `writer`, ensuring that the written
+/// JSON has the expected `format`
+pub fn write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
+where
+    W: std::io::Write,
+    F: JsonFormat,
+    I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
+{
+    format.start_stream(writer)?;
+    let mut is_first_row = true;
+    while let Some(block) = blocks.next()? {
+        format.start_row(writer, is_first_row)?;
+        is_first_row = false;
+        writer.write_all(block)?;
+    }
+    format.end_stream(writer)?;
+    Ok(())
+}
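+
+// Note: each block yielded by `blocks` must already be JSON encoded for `format`;
+// `write` only stitches the blocks together with the format's delimiters.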
+
+/// [`FallibleStreamingIterator`] that serializes a [`RecordBatch`] to bytes.
+/// Advancing it is CPU-bounded
+pub struct Serializer<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> {
+    iter: I,
+    buffer: Vec<u8>,
+    format: F,
+}
+
+impl<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> Serializer<F, I> {
+    /// Creates a new [`Serializer`].
+    pub fn new(iter: I, buffer: Vec<u8>, format: F) -> Self {
+        Self {
+            iter,
+            buffer,
+            format,
+        }
+    }
+}
+
+impl<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> FallibleStreamingIterator
+    for Serializer<F, I>
+{
+    type Item = [u8];
+
+    type Error = ArrowError;
+
+    fn advance(&mut self) -> Result<()> {
+        self.buffer.clear();
+        self.iter
+            .next()
+            .map(|maybe_batch| {
+                maybe_batch.map(|batch| {
+                    let names = batch
+                        .schema()
+                        .fields()
+                        .iter()
+                        .map(|f| f.name().as_str())
+                        .collect::<Vec<_>>();
+                    serialize(&names, batch.columns(), self.format, &mut self.buffer)
+                })
+            })
+            .transpose()?;
+        Ok(())
+    }
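+
+    // An empty buffer marks the end of the stream: this assumes that a
+    // serialized batch is never empty (a zero-row batch would end iteration early).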
+    fn get(&self) -> Option<&Self::Item> {
+        if !self.buffer.is_empty() {
+            Some(&self.buffer)
+        } else {
+            None
+        }
+    }
+}
diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs
index 8aa43ce946e..c195e2f69f3 100644
--- a/src/io/json/write/serialize.rs
+++ b/src/io/json/write/serialize.rs
@@ -1,109 +1,63 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use serde_json::map::Map;
-use serde_json::{Number, Value};
+use lexical_core::ToLexical;
+use serde_json::Value;
+use streaming_iterator::StreamingIterator;
 
 use crate::bitmap::utils::zip_validity;
-use crate::{array::*, datatypes::*, record_batch::RecordBatch, types::NativeType};
-
-trait JsonSerializable {
-    fn into_json_value(self) -> Option<Value>;
-}
-
-impl JsonSerializable for i8 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for i16 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for i32 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for i64 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(Value::Number(Number::from(self)))
-    }
-}
-
-impl JsonSerializable for u8 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for u16 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for u32 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for u64 {
-    fn into_json_value(self) -> Option<Value> {
-        Some(self.into())
-    }
-}
-
-impl JsonSerializable for f32 {
-    fn into_json_value(self) -> Option<Value> {
-        Number::from_f64(f64::round(self as f64 * 1000.0) / 1000.0).map(Value::Number)
-    }
-}
-
-impl JsonSerializable for f64 {
-    fn into_json_value(self) -> Option<Value> {
-        Number::from_f64(self).map(Value::Number)
-    }
-}
-
-#[inline]
-fn to_json<T>(value: Option<&T>) -> Value
-where
-    T: NativeType + JsonSerializable,
-{
-    value
-        .and_then(|x| x.into_json_value())
-        .unwrap_or(Value::Null)
-}
-
-fn primitive_array_to_json<T>(array: &dyn Array) -> Vec<Value>
-where
-    T: NativeType + JsonSerializable,
-{
-    let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-    array.iter().map(to_json).collect()
-}
-
-fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<Map<String, Value>> {
+use crate::io::iterator::BufStreamingIterator;
+use crate::util::lexical_to_bytes_mut;
+use crate::{array::*, datatypes::DataType, types::NativeType};
+
+use super::{JsonArray, JsonFormat};
+
+fn boolean_serializer<'a>(
+    array: &'a BooleanArray,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
+    Box::new(BufStreamingIterator::new(
+        array.iter(),
+        |x, buf| match x {
+            Some(true) => buf.extend_from_slice(b"true"),
+            Some(false) => buf.extend_from_slice(b"false"),
+            None => buf.extend_from_slice(b"null"),
+        },
+        vec![],
+    ))
+}
+
+fn primitive_serializer<'a, T: NativeType + ToLexical>(
+    array: &'a PrimitiveArray<T>,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
+    Box::new(BufStreamingIterator::new(
+        array.iter(),
+        |x, buf| {
+            if let Some(x) = x {
+                lexical_to_bytes_mut(*x, buf)
+            } else {
+                buf.extend(b"null")
+            }
+        },
+        vec![],
+    ))
+}
+
+fn utf8_serializer<'a, O: Offset>(
+    array: &'a Utf8Array<O>,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
+    Box::new(BufStreamingIterator::new(
+        array.iter(),
+        |x, buf| {
+            if let Some(x) = x {
+                utf8_serialize(x, buf)
+            } else {
+                buf.extend_from_slice(b"null")
+            }
+        },
+        vec![],
+    ))
+}
+
+fn struct_serializer<'a>(
+    array: &'a StructArray,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
     // {"a": [1, 2, 3], "b": [a, b, c], "c": {"a": [1, 2, 3]}}
     // [
     //  {"a": 1, "b": a, "c": {"a": 1}},
@@ -111,272 +65,164 @@ fn struct_array_to_jsonmap_array(array: &StructArray, row_count: usize) -> Vec<Map<String, Value>> {
     //  {"a": 2, "b": b, "c": {"a": 2}},
     //  {"a": 3, "b": c, "c": {"a": 3}},
     // ]
-    let fields = array.fields();
-    let mut inner_objs = std::iter::repeat(Map::new())
-        .take(row_count)
-        .collect::<Vec<Map<String, Value>>>();
-
-    array
+    let mut serializers = array
         .values()
         .iter()
-        .enumerate()
-        .for_each(|(j, struct_col)| {
-            set_column_for_json_rows(
-                &mut inner_objs,
-                row_count,
-                struct_col.as_ref(),
-                fields[j].name(),
-            );
-        });
-
-    inner_objs
+        .map(|x| x.as_ref())
+        .map(new_serializer)
+        .collect::<Vec<_>>();
+    let names = array.fields().iter().map(|f| f.name().as_str());
+
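+    // each inner serializer yields one JSON-encoded value per row; null rows
+    // must still advance every inner iterator so the columns stay aligned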
+    Box::new(BufStreamingIterator::new(
+        zip_validity(0..array.len(), array.validity().map(|x| x.iter())),
+        move |maybe, buf| {
+            if maybe.is_some() {
+                let names = names.clone();
+                let mut record: Vec<(&str, &[u8])> = Default::default();
+                serializers
+                    .iter_mut()
+                    .zip(names)
+                    // `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
+                    .for_each(|(iter, name)| {
+                        let item = iter.next().unwrap();
+                        record.push((name, item));
+                    });
+                serialize_item(buf, &record, JsonArray::default(), true);
+            } else {
+                serializers.iter_mut().for_each(|iter| {
+                    let _ = iter.next();
+                });
+                buf.extend(b"null");
+            }
+        },
+        vec![],
+    ))
 }
 
-fn write_array(array: &dyn Array) -> Value {
-    Value::Array(match array.data_type() {
-        DataType::Null => std::iter::repeat(Value::Null).take(array.len()).collect(),
-        DataType::Boolean => array
-            .as_any()
-            .downcast_ref::<BooleanArray>()
-            .unwrap()
-            .iter()
-            .map(|maybe_value| match maybe_value {
-                Some(v) => v.into(),
-                None => Value::Null,
-            })
-            .collect(),
-        DataType::Utf8 => array
-            .as_any()
-            .downcast_ref::<Utf8Array<i32>>()
-            .unwrap()
-            .iter()
-            .map(|maybe_value| match maybe_value {
-                Some(v) => v.into(),
-                None => Value::Null,
-            })
-            .collect(),
-        DataType::LargeUtf8 => array
-            .as_any()
-            .downcast_ref::<Utf8Array<i64>>()
-            .unwrap()
-            .iter()
-            .map(|maybe_value| match maybe_value {
-                Some(v) => v.into(),
-                None => Value::Null,
-            })
-            .collect(),
-        DataType::Int8 => primitive_array_to_json::<i8>(array),
-        DataType::Int16 => primitive_array_to_json::<i16>(array),
-        DataType::Int32 => primitive_array_to_json::<i32>(array),
-        DataType::Int64 => primitive_array_to_json::<i64>(array),
-        DataType::UInt8 => primitive_array_to_json::<u8>(array),
-        DataType::UInt16 => primitive_array_to_json::<u16>(array),
-        DataType::UInt32 => primitive_array_to_json::<u32>(array),
-        DataType::UInt64 => primitive_array_to_json::<u64>(array),
-        DataType::Float32 => primitive_array_to_json::<f32>(array),
-        DataType::Float64 => primitive_array_to_json::<f64>(array),
-        DataType::List(_) => array
-            .as_any()
-            .downcast_ref::<ListArray<i32>>()
-            .unwrap()
-            .iter()
-            .map(|maybe_value| match maybe_value {
-                Some(v) => write_array(v.as_ref()),
-                None => Value::Null,
-            })
-            .collect(),
-        DataType::LargeList(_) => array
-            .as_any()
-            .downcast_ref::<ListArray<i64>>()
-            .unwrap()
-            .iter()
-            .map(|maybe_value| match maybe_value {
-                Some(v) => write_array(v.as_ref()),
-                None => Value::Null,
-            })
-            .collect(),
-        DataType::Struct(_) => {
-            let jsonmaps = struct_array_to_jsonmap_array(
-                array.as_any().downcast_ref::<StructArray>().unwrap(),
-                array.len(),
-            );
-            zip_validity(jsonmaps.into_iter(), array.validity().map(|v| v.iter()))
-                .map(|m| m.map(Value::Object).unwrap_or(Value::Null))
-                .collect()
-        }
-        _ => {
-            panic!(
-                "Unsupported datatype for array conversion: {:#?}",
-                array.data_type()
-            );
-        }
-    })
+fn list_serializer<'a, O: Offset>(
+    array: &'a ListArray<O>,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
+    // [[1, 2], [3]]
+    // [
+    //  [1, 2],
+    //  [3]
+    // ]
+    //
+    let mut serializer = new_serializer(array.values().as_ref());
+
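+    // a single serializer runs over the flattened child values; the offsets
+    // determine how many child items are emitted for each list row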
+    Box::new(BufStreamingIterator::new(
+        zip_validity(
+            array.offsets().windows(2),
+            array.validity().map(|x| x.iter()),
+        ),
+        move |offset, buf| {
+            if let Some(offset) = offset {
+                let length = (offset[1] - offset[0]).to_usize();
+                buf.push(b'[');
+                let mut is_first_row = true;
+                for _ in 0..length {
+                    if !is_first_row {
+                        buf.push(b',');
+                    }
+                    is_first_row = false;
+                    buf.extend(serializer.next().unwrap());
+                }
+                buf.push(b']');
+            } else {
+                buf.extend(b"null");
+            }
+        },
+        vec![],
+    ))
 }
 
-fn set_column_by_primitive_type<T: NativeType + JsonSerializable>(
-    rows: &mut [Map<String, Value>],
-    row_count: usize,
-    array: &dyn Array,
-    col_name: &str,
-) {
-    let primitive_arr = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-
-    rows.iter_mut()
-        .zip(primitive_arr.iter())
-        .take(row_count)
-        .for_each(|(row, value)| {
-            let value = to_json::<T>(value);
-            row.insert(col_name.to_string(), value);
-        });
-}
+#[inline]
+fn utf8_serialize(value: &str, buf: &mut Vec<u8>) {
+    if value.as_bytes().is_ascii() {
+        buf.reserve(value.len() + 2);
+        buf.push(b'"');
+        buf.extend_from_slice(value.as_bytes());
+        buf.push(b'"');
+    } else {
+        // non-ASCII values may contain characters that require escaping: round-trip through serde_json
+        // todo: avoid this roundtrip over serde_json
+        serde_json::to_writer(buf, &Value::String(value.to_string())).unwrap();
+    }
+}
 
+fn new_serializer<'a>(
+    array: &'a dyn Array,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync> {
+    match array.data_type().to_logical_type() {
+        DataType::Boolean => boolean_serializer(array.as_any().downcast_ref().unwrap()),
+        DataType::Int8 => primitive_serializer::<i8>(array.as_any().downcast_ref().unwrap()),
+        DataType::Int16 => primitive_serializer::<i16>(array.as_any().downcast_ref().unwrap()),
+        DataType::Int32 => primitive_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
+        DataType::Int64 => primitive_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
+        DataType::UInt8 => primitive_serializer::<u8>(array.as_any().downcast_ref().unwrap()),
+        DataType::UInt16 => primitive_serializer::<u16>(array.as_any().downcast_ref().unwrap()),
+        DataType::UInt32 => primitive_serializer::<u32>(array.as_any().downcast_ref().unwrap()),
+        DataType::UInt64 => primitive_serializer::<u64>(array.as_any().downcast_ref().unwrap()),
+        DataType::Float32 => primitive_serializer::<f32>(array.as_any().downcast_ref().unwrap()),
+        DataType::Float64 => primitive_serializer::<f64>(array.as_any().downcast_ref().unwrap()),
+        DataType::Utf8 => utf8_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
+        DataType::LargeUtf8 => utf8_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
+        DataType::Struct(_) => struct_serializer(array.as_any().downcast_ref().unwrap()),
+        DataType::List(_) => list_serializer::<i32>(array.as_any().downcast_ref().unwrap()),
+        DataType::LargeList(_) => list_serializer::<i64>(array.as_any().downcast_ref().unwrap()),
+        other => todo!("Writing {:?} to JSON", other),
+    }
+}
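+
+// `new_serializer` is called recursively by `struct_serializer` and
+// `list_serializer`, so nested types are serialized without building
+// intermediate `serde_json::Value`s.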
 
-fn set_column_for_json_rows(
-    rows: &mut [Map<String, Value>],
-    row_count: usize,
-    array: &dyn Array,
-    col_name: &str,
+fn serialize_item<F: JsonFormat>(
+    buffer: &mut Vec<u8>,
+    record: &[(&str, &[u8])],
+    format: F,
+    is_first_row: bool,
 ) {
-    match array.data_type() {
-        DataType::Null => {
-            // when value is null, we simply skip setting the key
-        }
-        DataType::Boolean => {
-            let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
-            rows.iter_mut()
-                .zip(array.iter())
-                .take(row_count)
-                .for_each(|(row, value)| {
-                    row.insert(
-                        col_name.to_string(),
-                        value.map(Value::Bool).unwrap_or(Value::Null),
-                    );
-                });
-        }
-        DataType::Int8 => set_column_by_primitive_type::<i8>(rows, row_count, array, col_name),
-        DataType::Int16 => set_column_by_primitive_type::<i16>(rows, row_count, array, col_name),
-        DataType::Int32 => set_column_by_primitive_type::<i32>(rows, row_count, array, col_name),
-        DataType::Int64 => set_column_by_primitive_type::<i64>(rows, row_count, array, col_name),
-        DataType::UInt8 => set_column_by_primitive_type::<u8>(rows, row_count, array, col_name),
-        DataType::UInt16 => set_column_by_primitive_type::<u16>(rows, row_count, array, col_name),
-        DataType::UInt32 => set_column_by_primitive_type::<u32>(rows, row_count, array, col_name),
-        DataType::UInt64 => set_column_by_primitive_type::<u64>(rows, row_count, array, col_name),
-        DataType::Float32 => set_column_by_primitive_type::<f32>(rows, row_count, array, col_name),
-        DataType::Float64 => set_column_by_primitive_type::<f64>(rows, row_count, array, col_name),
-        DataType::Utf8 => {
-            let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
-            rows.iter_mut()
-                .zip(array.iter())
-                .take(row_count)
-                .for_each(|(row, value)| {
-                    row.insert(
-                        col_name.to_string(),
-                        value
-                            .map(|x| Value::String(x.to_string()))
-                            .unwrap_or(Value::Null),
-                    );
-                });
-        }
-        DataType::LargeUtf8 => {
-            let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
-            rows.iter_mut()
-                .zip(array.iter())
-                .take(row_count)
-                .for_each(|(row, value)| {
-                    row.insert(
-                        col_name.to_string(),
-                        value
-                            .map(|x| Value::String(x.to_string()))
-                            .unwrap_or(Value::Null),
-                    );
-                });
-        }
-        DataType::Struct(_) => {
-            let array = array.as_any().downcast_ref::<StructArray>().unwrap();
-            let inner_objs = struct_array_to_jsonmap_array(array, row_count);
-            rows.iter_mut()
-                .take(row_count)
-                .zip(zip_validity(
-                    inner_objs.into_iter(),
-                    array.validity().map(|v| v.iter()),
-                ))
-                .for_each(|(row, obj)| {
-                    row.insert(
-                        col_name.to_string(),
-                        obj.map(Value::Object).unwrap_or(Value::Null),
-                    );
-                });
-        }
-        DataType::List(_) => {
-            let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
-            rows.iter_mut()
-                .zip(array.iter())
-                .take(row_count)
-                .for_each(|(row, value)| {
-                    row.insert(
-                        col_name.to_string(),
-                        value
-                            .map(|x| write_array(x.as_ref()))
-                            .unwrap_or(Value::Null),
-                    );
-                });
-        }
-        DataType::LargeList(_) => {
-            let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
-            rows.iter_mut()
-                .zip(array.iter())
-                .take(row_count)
-                .for_each(|(row, value)| {
-                    row.insert(
-                        col_name.to_string(),
-                        value
-                            .map(|x| write_array(x.as_ref()))
-                            .unwrap_or(Value::Null),
-                    );
-                });
-        }
-        _ => {
-            panic!("Unsupported datatype: {:#?}", array.data_type());
-        }
-    }
+    format.start_row(buffer, is_first_row).unwrap();
+    buffer.push(b'{');
+    let mut first_item = true;
+    for (key, value) in record {
+        if !first_item {
+            buffer.push(b',');
+        }
+        first_item = false;
+        utf8_serialize(key, buffer);
+        buffer.push(b':');
+        buffer.extend(*value);
+    }
+    buffer.push(b'}');
+    format.end_row(buffer).unwrap();
 }
 
-/// Serializes a [`RecordBatch`] into Json
-/// # Example
-/// ```
-/// use std::sync::Arc;
-/// use arrow2::array::PrimitiveArray;
-/// use arrow2::datatypes::{DataType, Field, Schema};
-/// use arrow2::io::json;
-/// use arrow2::record_batch::RecordBatch;
-///
-/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-/// let a = PrimitiveArray::from_slice([1i32, 2, 3]);
-/// let batch = RecordBatch::try_new(schema, vec![Arc::new(a)]).unwrap();
-///
-/// let json_rows = json::write_record_batches(&[batch]);
-/// assert_eq!(
-///     serde_json::Value::Object(json_rows[1].clone()),
-///     serde_json::json!({"a": 2}),
-/// );
-/// ```
-pub fn write_record_batches(batches: &[RecordBatch]) -> Vec<Map<String, Value>> {
-    let mut rows: Vec<Map<String, Value>> = std::iter::repeat(Map::new())
-        .take(batches.iter().map(|b| b.num_rows()).sum())
+/// Serializes `(name, array)` pairs, row by row, as valid JSON into `buffer`
+/// This is CPU-bounded
+pub fn serialize<N, A, F>(names: &[N], arrays: &[A], format: F, buffer: &mut Vec<u8>)
+where
+    N: AsRef<str>,
+    A: AsRef<dyn Array>,
+    F: JsonFormat,
+{
+    let num_rows = arrays[0].as_ref().len();
+
+    let mut serializers: Vec<_> = arrays
+        .iter()
+        .map(|array| new_serializer(array.as_ref()))
         .collect();
 
-    if !rows.is_empty() {
-        let schema = batches[0].schema();
-        let mut base = 0;
-        batches.iter().for_each(|batch| {
-            let row_count = batch.num_rows();
-            batch.columns().iter().enumerate().for_each(|(j, col)| {
-                let col_name = schema.field(j).name();
-                set_column_for_json_rows(&mut rows[base..], row_count, col.as_ref(), col_name);
+    let mut is_first_row = true;
+    (0..num_rows).for_each(|_| {
+        let mut record: Vec<(&str, &[u8])> = Default::default();
+        serializers
+            .iter_mut()
+            .zip(names.iter())
+            // `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
+            .for_each(|(iter, name)| {
+                let item = iter.next().unwrap();
+                record.push((name.as_ref(), item));
             });
-            base += row_count;
-        });
-    }
-    rows
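+        // `is_first_row` lets `format` decide whether a separator (e.g. `,` for
+        // `JsonArray`) is needed before this row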
+        serialize_item(buffer, &record, format, is_first_row);
+        is_first_row = false;
+    })
 }
diff --git a/src/io/json/write/writer.rs b/src/io/json/write/writer.rs
deleted file mode 100644
index af4aac17a5b..00000000000
--- a/src/io/json/write/writer.rs
+++ /dev/null
@@ -1,266 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! # JSON Writer
-//!
-//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
-//! JSON objects or JSON formatted byte streams.
-//!
-//! ## Writing JSON Objects
-//!
-//! To serialize [`RecordBatch`]es into array of
-//! [JSON](https://docs.serde.rs/serde_json/) objects, use
-//! [`write_record_batches`]:
-//!
-//! ```
-//! use std::sync::Arc;
-//!
-//! use arrow2::array::Int32Array;
-//! use arrow2::datatypes::{DataType, Field, Schema};
-//! use arrow2::io::json;
-//! use arrow2::record_batch::RecordBatch;
-//!
-//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-//! let a = Int32Array::from_slice(&[1, 2, 3]);
-//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
-//!
-//! let json_rows = json::write_record_batches(&[batch]);
-//! assert_eq!(
-//!     serde_json::Value::Object(json_rows[1].clone()),
-//!     serde_json::json!({"a": 2}),
-//! );
-//! ```
-//!
-//! ## Writing JSON formatted byte streams
-//!
-//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
-//! [`LineDelimitedWriter`]:
-//!
-//! ```
-//! use std::sync::Arc;
-//!
-//! use arrow2::array::Int32Array;
-//! use arrow2::datatypes::{DataType, Field, Schema};
-//! use arrow2::io::json;
-//! use arrow2::record_batch::RecordBatch;
-//!
-//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-//! let a = Int32Array::from_slice(&[1, 2, 3]);
-//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
-//!
-//! // Write the record batch out as JSON
-//! let buf = Vec::new();
-//! let mut writer = json::LineDelimitedWriter::new(buf);
-//! writer.write_batches(&vec![batch]).unwrap();
-//! writer.finish().unwrap();
-//!
-//! // Get the underlying buffer back,
-//! let buf = writer.into_inner();
-//! assert_eq!(r#"{"a":1}
-//! {"a":2}
-//! {"a":3}
-//!"#, String::from_utf8(buf).unwrap())
-//! ```
-//!
-//! To serialize [`RecordBatch`]es into a well formed JSON array, use
-//! [`ArrayWriter`]:
-//!
-//! ```
-//! use std::sync::Arc;
-//!
-//! use arrow2::array::Int32Array;
-//! use arrow2::datatypes::{DataType, Field, Schema};
-//! use arrow2::io::json;
-//! use arrow2::record_batch::RecordBatch;
-//!
-//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-//! let a = Int32Array::from_slice(&[1, 2, 3]);
-//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
-//!
-//! // Write the record batch out as a JSON array
-//! let buf = Vec::new();
-//! let mut writer = json::ArrayWriter::new(buf);
-//! writer.write_batches(&vec![batch]).unwrap();
-//! writer.finish().unwrap();
-//!
-//! // Get the underlying buffer back,
-//! let buf = writer.into_inner();
-//! assert_eq!(r#"[{"a":1},{"a":2},{"a":3}]"#, String::from_utf8(buf).unwrap())
-//! ```
-
-use std::{fmt::Debug, io::Write};
-
-use serde_json::Value;
-
-use crate::error::Result;
-use crate::record_batch::RecordBatch;
-
-use super::write_record_batches;
-
-/// This trait defines how to format a sequence of JSON objects to a
-/// byte stream.
-pub trait JsonFormat: Debug + Default {
-    #[inline]
-    /// write any bytes needed at the start of the file to the writer
-    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
-        Ok(())
-    }
-
-    #[inline]
-    /// write any bytes needed for the start of each row
-    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> {
-        Ok(())
-    }
-
-    #[inline]
-    /// write any bytes needed for the end of each row
-    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
-        Ok(())
-    }
-
-    /// write any bytes needed for the start of each row
-    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
-        Ok(())
-    }
-}
-
-/// Produces JSON output with one record per line. For example
-///
-/// ```json
-/// {"foo":1}
-/// {"bar":1}
-///
-/// ```
-#[derive(Debug, Default)]
-pub struct LineDelimited {}
-
-impl JsonFormat for LineDelimited {
-    fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
-        writer.write_all(b"\n")?;
-        Ok(())
-    }
-}
-
-/// Produces JSON output as a single JSON array. For example
-///
-/// ```json
-/// [{"foo":1},{"bar":1}]
-/// ```
-#[derive(Debug, Default)]
-pub struct JsonArray {}
-
-impl JsonFormat for JsonArray {
-    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
-        writer.write_all(b"[")?;
-        Ok(())
-    }
-
-    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> {
-        if !is_first_row {
-            writer.write_all(b",")?;
-        }
-        Ok(())
-    }
-
-    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
-        writer.write_all(b"]")?;
-        Ok(())
-    }
-}
-
-/// A JSON writer which serializes [`RecordBatch`]es to newline delimited JSON objects
-pub type LineDelimitedWriter<W> = Writer<W, LineDelimited>;
-
-/// A JSON writer which serializes [`RecordBatch`]es to JSON arrays
-pub type ArrayWriter<W> = Writer<W, JsonArray>;
-
-/// A JSON writer which serializes [`RecordBatch`]es to a stream of
-/// `u8` encoded JSON objects. See the module level documentation for
-/// detailed usage and examples. The specific format of the stream is
-/// controlled by the [`JsonFormat`] type parameter.
-#[derive(Debug)]
-pub struct Writer<W, F>
-where
-    W: Write,
-    F: JsonFormat,
-{
-    /// Underlying writer to use to write bytes
-    writer: W,
-
-    /// Has the writer output any records yet?
-    started: bool,
-
-    /// Is the writer finished?
-    finished: bool,
-
-    /// Determines how the byte stream is formatted
-    format: F,
-}
-
-impl<W, F> Writer<W, F>
-where
-    W: Write,
-    F: JsonFormat,
-{
-    /// Construct a new writer
-    pub fn new(writer: W) -> Self {
-        Self {
-            writer,
-            started: false,
-            finished: false,
-            format: F::default(),
-        }
-    }
-
-    /// Write a single JSON row to the output writer
-    pub fn write_row(&mut self, row: &Value) -> Result<()> {
-        let is_first_row = !self.started;
-        if !self.started {
-            self.format.start_stream(&mut self.writer)?;
-            self.started = true;
-        }
-
-        self.format.start_row(&mut self.writer, is_first_row)?;
-        self.writer.write_all(&serde_json::to_vec(row)?)?;
-        self.format.end_row(&mut self.writer)?;
-        Ok(())
-    }
-
-    /// Convert the [`RecordBatch`] into JSON rows, and write them to the output
-    pub fn write_batches(&mut self, batches: &[RecordBatch]) -> Result<()> {
-        for row in write_record_batches(batches) {
-            self.write_row(&Value::Object(row))?;
-        }
-        Ok(())
-    }
-
-    /// Finishes the output stream. This function must be called after
-    /// all record batches have been produced. (e.g. producing the final `']'` if writing
-    /// arrays.
-    pub fn finish(&mut self) -> Result<()> {
-        if self.started && !self.finished {
-            self.format.end_stream(&mut self.writer)?;
-            self.finished = true;
-        }
-        Ok(())
-    }
-
-    /// Unwraps this `Writer`, returning the underlying writer
-    pub fn into_inner(self) -> W {
-        self.writer
-    }
-}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index e406de9847c..7591b655200 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -43,5 +43,5 @@ pub mod avro;
 #[cfg_attr(docsrs, doc(cfg(feature = "io_print")))]
 pub mod print;
 
-#[cfg(any(feature = "io_csv_write", feature = "io_avro"))]
+#[cfg(any(feature = "io_csv_write", feature = "io_avro", feature = "io_json"))]
 mod iterator;
diff --git a/src/util/bench_util.rs b/src/util/bench_util.rs
index d463f764314..3ba63ce37be 100644
--- a/src/util/bench_util.rs
+++ b/src/util/bench_util.rs
@@ -3,7 +3,6 @@
 use rand::distributions::{Alphanumeric, Distribution, Standard};
 use rand::{rngs::StdRng, Rng, SeedableRng};
 
-use crate::types::NaturalDataType;
 use crate::{array::*, types::NativeType};
 
 /// Returns fixed seedable RNG
@@ -14,7 +13,7 @@ pub fn seedable_rng() -> StdRng {
 /// Creates an random (but fixed-seeded) array of a given size and null density
 pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
 where
-    T: NativeType + NaturalDataType,
+    T: NativeType,
     Standard: Distribution<T>,
 {
     let mut rng = seedable_rng();
@@ -37,7 +36,7 @@ pub fn create_primitive_array_with_seed(
     seed: u64,
 ) -> PrimitiveArray<T>
 where
-    T: NativeType + NaturalDataType,
+    T: NativeType,
     Standard: Distribution<T>,
 {
     let mut rng = StdRng::seed_from_u64(seed);
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 6d94ce547b0..c88c76e6da4 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -4,6 +4,7 @@
     feature = "compute",
     feature = "io_csv_write",
     feature = "io_csv_read",
+    feature = "io_json",
     feature = "compute_cast"
 ))]
 mod lexical;
@@ -11,6 +12,7 @@ mod lexical;
     feature = "compute",
     feature = "io_csv_write",
     feature = "io_csv_read",
+    feature = "io_json",
     feature = "compute_cast"
 ))]
 pub use lexical::*;
diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs
index a4988bdb6f3..0ebb6d9fbff 100644
--- a/tests/it/io/json/mod.rs
+++ b/tests/it/io/json/mod.rs
@@ -10,7 +10,7 @@ use arrow2::array::*;
 use arrow2::datatypes::*;
 use arrow2::error::Result;
 use arrow2::io::json::read as json_read;
-use arrow2::io::json::LineDelimitedWriter;
+use arrow2::io::json::write as json_write;
 use arrow2::record_batch::RecordBatch;
 
 fn read_batch(data: String, fields: Vec<Field>) -> Result<RecordBatch> {
@@ -22,6 +22,18 @@ fn read_batch(data: String, fields: Vec<Field>) -> Result<RecordBatch> {
     json_read::deserialize(rows, fields)
 }
 
+fn write_batch(batch: RecordBatch) -> Result<Vec<u8>> {
+    let format = json_write::LineDelimited::default();
+
+    let batches = vec![Ok(batch)].into_iter();
+
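+    // the tests use the line-delimited format so that each written row can be
+    // compared line by line against the input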
+    let blocks = json_write::Serializer::new(batches, vec![], format);
+
+    let mut buf = Vec::new();
+    json_write::write(&mut buf, format, blocks)?;
+    Ok(buf)
+}
+
 fn round_trip(data: String) -> Result<()> {
     let mut reader = Cursor::new(data);
     let fields = json_read::infer(&mut reader, None)?;
@@ -29,13 +41,10 @@ fn round_trip(data: String) -> Result<()> {
 
     let batch = read_batch(data.clone(), fields)?;
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     let result = String::from_utf8(buf).unwrap();
+    println!("{}", result);
     for (r, e) in result.lines().zip(data.lines()) {
         let mut result_json = serde_json::from_str::<Value>(r).unwrap();
         let expected_json = serde_json::from_str::<Value>(e).unwrap();
diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs
index 6baeee60f54..9bbec6dd155 100644
--- a/tests/it/io/json/write.rs
+++ b/tests/it/io/json/write.rs
@@ -5,12 +5,14 @@ use arrow2::{
     bitmap::Bitmap,
     buffer::Buffer,
     datatypes::{DataType, Field, Schema},
-    io::json::LineDelimitedWriter,
+    error::Result,
     record_batch::RecordBatch,
 };
 
+use super::*;
+
 #[test]
-fn write_simple_rows() {
+fn write_simple_rows() -> Result<()> {
     let schema = Schema::new(vec![
         Field::new("c1", DataType::Int32, false),
         Field::new("c2", DataType::Utf8, false),
@@ -21,11 +23,7 @@ fn write_simple_rows() {
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     assert_eq!(
         String::from_utf8(buf).unwrap(),
@@ -36,10 +34,11 @@ fn write_simple_rows() {
         {"c1":5,"c2":null}
 "#
     );
+    Ok(())
 }
 
 #[test]
-fn write_nested_struct_with_validity() {
+fn write_nested_struct_with_validity() -> Result<()> {
     let inner = vec![
         Field::new("c121", DataType::Utf8, false),
         Field::new("c122", DataType::Int32, false),
@@ -72,11 +71,7 @@ fn write_nested_struct_with_validity() {
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     assert_eq!(
         String::from_utf8(buf).unwrap(),
@@ -85,10 +80,11 @@ fn write_nested_struct_with_validity() {
         {"c1":null,"c2":"c"}
 "#
     );
+    Ok(())
 }
 
 #[test]
-fn write_nested_structs() {
+fn write_nested_structs() -> Result<()> {
     let c121 = Field::new("c121", DataType::Utf8, false);
     let fields = vec![
         Field::new("c11", DataType::Int32, false),
@@ -120,11 +116,7 @@ fn write_nested_structs() {
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     assert_eq!(
         String::from_utf8(buf).unwrap(),
@@ -133,10 +125,11 @@ fn write_nested_structs() {
         {"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
 "#
     );
+    Ok(())
 }
 
 #[test]
-fn write_struct_with_list_field() {
+fn write_struct_with_list_field() -> Result<()> {
     let list_datatype = DataType::List(Box::new(Field::new("c_list", DataType::Utf8, false)));
     let field_c1 = Field::new("c1", list_datatype, false);
     let field_c2 = Field::new("c2", DataType::Int32, false);
@@ -160,11 +153,7 @@ fn write_struct_with_list_field() {
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap();
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     assert_eq!(
         String::from_utf8(buf).unwrap(),
@@ -175,10 +164,11 @@ fn write_struct_with_list_field() {
         {"c1":["e"],"c2":5}
 "#
     );
+    Ok(())
 }
 
 #[test]
-fn write_nested_list() {
+fn write_nested_list() -> Result<()> {
     let list_inner = DataType::List(Box::new(Field::new("b", DataType::Int32, false)));
     let list_datatype = DataType::List(Box::new(Field::new("a", list_inner, false)));
     let field_c1 = Field::new("c1", list_datatype, true);
@@ -209,11 +199,7 @@ fn write_nested_list() {
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     assert_eq!(
         String::from_utf8(buf).unwrap(),
@@ -222,10 +208,11 @@ fn write_nested_list() {
         {"c1":[[4,5,6]],"c2":null}
 "#
     );
+    Ok(())
 }
 
 #[test]
-fn write_list_of_struct() {
+fn write_list_of_struct() -> Result<()> {
     let inner = vec![Field::new("c121", DataType::Utf8, false)];
     let fields = vec![
         Field::new("c11", DataType::Int32, false),
         Field::new("c12", DataType::Struct(inner.clone()), false),
@@ -272,11 +259,7 @@ fn write_list_of_struct() {
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
 
-    let mut buf = Vec::new();
-    {
-        let mut writer = LineDelimitedWriter::new(&mut buf);
-        writer.write_batches(&[batch]).unwrap();
-    }
+    let buf = write_batch(batch)?;
 
     assert_eq!(
         String::from_utf8(buf).unwrap(),
@@ -285,4 +268,21 @@ fn write_list_of_struct() {
         {"c1":[null],"c2":3}
 "#
     );
+    Ok(())
+}
+
+#[test]
+fn write_escaped_utf8() -> Result<()> {
+    let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
+    let a = Utf8Array::<i32>::from(&vec![Some("a\na"), None]);
+
+    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+    let buf = write_batch(batch)?;
+
+    // note: the ASCII fast path in `utf8_serialize` copies bytes verbatim, so the
+    // newline in "a\na" is written unescaped; this assertion documents that behavior
+    assert_eq!(
+        String::from_utf8(buf).unwrap().as_bytes(),
+        b"{\"c1\":\"a\na\"}\n{\"c1\":null}\n"
+    );
+    Ok(())
+}