diff --git a/benches/write_json.rs b/benches/write_json.rs index 8f2df8411bd..9f52140303c 100644 --- a/benches/write_json.rs +++ b/benches/write_json.rs @@ -1,55 +1,44 @@ -use std::sync::Arc; - use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; -use arrow2::chunk::Chunk; -use arrow2::error::Result; +use arrow2::error::ArrowError; use arrow2::io::json::write; use arrow2::util::bench_util::*; -fn write_batch(columns: &Chunk>) -> Result<()> { +fn write_array(array: Box) -> Result<(), ArrowError> { let mut writer = vec![]; - let format = write::Format::Json; - let batches = vec![Ok(columns.clone())].into_iter(); + let arrays = vec![Ok(array)].into_iter(); - // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded) - let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format); + // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded) + let blocks = write::Serializer::new(arrays, vec![]); // the operation of writing is IO-bounded. - write::write(&mut writer, format, blocks)?; + write::write(&mut writer, blocks)?; Ok(()) } -fn make_chunk(array: impl Array + 'static) -> Chunk> { - Chunk::new(vec![Arc::new(array) as Arc]) -} - fn add_benchmark(c: &mut Criterion) { (10..=18).step_by(2).for_each(|log2_size| { let size = 2usize.pow(log2_size); let array = create_primitive_array::(size, 0.1); - let columns = make_chunk(array); c.bench_function(&format!("json write i32 2^{}", log2_size), |b| { - b.iter(|| write_batch(&columns)) + b.iter(|| write_array(Box::new(array.clone()))) }); let array = create_string_array::(size, 100, 0.1, 42); - let columns = make_chunk(array); c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| { - b.iter(|| write_batch(&columns)) + b.iter(|| write_array(Box::new(array.clone()))) }); let array = create_primitive_array::(size, 0.1); - let columns = make_chunk(array); c.bench_function(&format!("json write f64 2^{}", log2_size), |b| { - b.iter(|| write_batch(&columns)) + b.iter(|| write_array(Box::new(array.clone()))) }); }); } diff --git a/examples/json_read.rs b/examples/json_read.rs index b6a25a74ef6..1630aa65047 100644 --- a/examples/json_read.rs +++ b/examples/json_read.rs @@ -3,23 +3,17 @@ use std::io::BufReader; use std::sync::Arc; use arrow2::array::Array; -use arrow2::error::{ArrowError, Result}; +use arrow2::error::Result; use arrow2::io::json::read; fn read_path(path: &str) -> Result> { // Example of reading a JSON file. 
let reader = BufReader::new(File::open(path)?); - let data = serde_json::from_reader(reader)?; + let json = serde_json::from_reader(reader)?; - let values = if let serde_json::Value::Array(values) = data { - Ok(values) - } else { - Err(ArrowError::InvalidArgumentError("".to_string())) - }?; + let data_type = read::infer(&json)?; - let data_type = read::infer_rows(&values)?; - - Ok(read::deserialize_json(&values, data_type)) + read::deserialize(&json, data_type) } fn main() -> Result<()> { diff --git a/examples/json_write.rs b/examples/json_write.rs index 086d18022c3..fc5191de51a 100644 --- a/examples/json_write.rs +++ b/examples/json_write.rs @@ -1,42 +1,32 @@ use std::fs::File; -use std::sync::Arc; use arrow2::{ array::{Array, Int32Array}, - chunk::Chunk, - error::Result, + error::ArrowError, io::json::write, }; -fn write_batches(path: &str, names: Vec, batches: &[Chunk>]) -> Result<()> { +fn write_array(path: &str, array: Box) -> Result<(), ArrowError> { let mut writer = File::create(path)?; - let format = write::Format::Json; - let batches = batches.iter().cloned().map(Ok); + let arrays = vec![Ok(array)].into_iter(); - // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded) - let blocks = write::Serializer::new(batches, names, vec![], format); + // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded) + let blocks = write::Serializer::new(arrays, vec![]); // the operation of writing is IO-bounded. - write::write(&mut writer, format, blocks)?; + write::write(&mut writer, blocks)?; Ok(()) } -fn main() -> Result<()> { - let array = Arc::new(Int32Array::from(&[ - Some(0), - None, - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - ])) as Arc; - - write_batches( - "example.json", - vec!["c1".to_string()], - &[Chunk::new(vec![array.clone()]), Chunk::new(vec![array])], - ) +fn main() -> Result<(), ArrowError> { + use std::env; + let args: Vec = env::args().collect(); + + let file_path = &args[1]; + + let array = Int32Array::from(&[Some(0), None, Some(2), Some(3), Some(4), Some(5), Some(6)]); + + write_array(file_path, Box::new(array)) } diff --git a/examples/ndjson_read.rs b/examples/ndjson_read.rs index 1df6e2b6e59..0490b23e2f1 100644 --- a/examples/ndjson_read.rs +++ b/examples/ndjson_read.rs @@ -1,48 +1,40 @@ use std::fs::File; -use std::io::BufReader; +use std::io::{BufReader, Seek}; use std::sync::Arc; use arrow2::array::Array; -use arrow2::chunk::Chunk; use arrow2::error::Result; -use arrow2::io::json::read; +use arrow2::io::ndjson::read; +use arrow2::io::ndjson::read::FallibleStreamingIterator; -fn read_path(path: &str, projection: Option>) -> Result>> { - // Example of reading a NDJSON file. +fn read_path(path: &str) -> Result>> { + let batch_size = 1024; // number of rows per array let mut reader = BufReader::new(File::open(path)?); - let fields = read::infer_and_reset(&mut reader, None)?; - - let fields = if let Some(projection) = projection { - fields - .into_iter() - .filter(|field| projection.contains(&field.name.as_ref())) - .collect() - } else { - fields - }; - - // at most 1024 rows. This container can be re-used across batches. - let mut rows = vec![String::default(); 1024]; - - // Reads up to 1024 rows. - // this is IO-intensive and performs minimal CPU work. In particular, - // no deserialization is performed. - let read = read::read_rows(&mut reader, &mut rows)?; - let rows = &rows[..read]; - - // deserialize `rows` into `Chunk`. 
This is CPU-intensive, has no IO,
-    // and can be performed on a different thread pool via a channel.
-    read::deserialize(rows, &fields)
+    let data_type = read::infer(&mut reader, None)?;
+    reader.rewind()?;
+
+    let mut reader = read::FileReader::new(reader, vec!["".to_string(); batch_size], None);
+
+    let mut arrays = vec![];
+    // `next` is IO-bounded
+    while let Some(rows) = reader.next()? {
+        // `deserialize` is CPU-bounded
+        let array = read::deserialize(rows, data_type.clone())?;
+        arrays.push(array);
+    }
+
+    Ok(arrays)
 }
 
 fn main() -> Result<()> {
+    // Example of reading an NDJSON file from a path
     use std::env;
     let args: Vec<String> = env::args().collect();
 
     let file_path = &args[1];
 
-    let batch = read_path(file_path, None)?;
-    println!("{:#?}", batch);
+    let arrays = read_path(file_path)?;
+    println!("{:#?}", arrays);
     Ok(())
 }
diff --git a/examples/ndjson_write.rs b/examples/ndjson_write.rs
new file mode 100644
index 00000000000..91a0e1a9ed7
--- /dev/null
+++ b/examples/ndjson_write.rs
@@ -0,0 +1,35 @@
+use std::fs::File;
+
+use arrow2::array::{Array, Int32Array};
+use arrow2::error::Result;
+use arrow2::io::ndjson::write;
+
+fn write_path(path: &str, array: Box<dyn Array>) -> Result<()> {
+    let writer = File::create(path)?;
+
+    let serializer = write::Serializer::new(vec![Ok(array)].into_iter(), vec![]);
+
+    let mut writer = write::FileWriter::new(writer, serializer);
+    writer.by_ref().collect::<Result<()>>()
+}
+
+fn main() -> Result<()> {
+    // Example of writing an NDJSON file to a path
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let file_path = &args[1];
+
+    let array = Box::new(Int32Array::from(&[
+        Some(0),
+        None,
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]));
+
+    write_path(file_path, array)?;
+    Ok(())
+}
diff --git a/guide/src/README.md b/guide/src/README.md
index 3042526a2e8..de2cd418bea 100644
--- a/guide/src/README.md
+++ b/guide/src/README.md
@@ -5,8 +5,10 @@ interoperability with the arrow format.
 The typical use-case for this library is to perform CPU and memory-intensive analytics in a format that supports heterogeneous data structures, null values, and IPC and FFI interfaces across languages.
 
-Arrow2 is divided into three main parts:
+Arrow2 is divided into five main parts:
 
 * a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions;
 * a [high-level API](./high_level.md) to operate with arrow arrays;
-* a [metadata API](./metadata.md) to declare and operate with logical types and metadata.
+* a [metadata API](./metadata.md) to declare and operate with logical types and metadata;
+* a [compute API](./compute.md) with operators to operate over arrays;
+* an [IO API](./io/README.md) with interfaces to read from, and write to, other formats.
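(Editorial sketch, not a hunk of this patch: the pieces above compose into a JSON round trip. The code below is written against the signatures introduced in this diff — `io::json::write::{Serializer, write}` and `io::json::read::{infer, deserialize}` — and the function name `json_round_trip` is illustrative only.)

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int64Array};
use arrow2::error::Result;
use arrow2::io::json::{read, write};

fn json_round_trip() -> Result<()> {
    // `Int64` survives the round trip because JSON integers are inferred as `DataType::Int64`
    let array = Int64Array::from(&[Some(1), None, Some(3)]);

    // serializing each array is CPU-bounded; writing the produced blocks is IO-bounded
    let arrays = vec![Ok(Box::new(array.clone()) as Box<dyn Array>)].into_iter();
    let blocks = write::Serializer::new(arrays, vec![]);
    let mut buf = vec![];
    write::write(&mut buf, blocks)?;

    // read back: parse the bytes, infer the logical type, deserialize into an array
    let json: serde_json::Value = serde_json::from_slice(&buf)?;
    let data_type = read::infer(&json)?;
    let result: Arc<dyn Array> = read::deserialize(&json, data_type)?;
    assert_eq!(result.as_ref(), &array as &dyn Array);
    Ok(())
}
```

The same shape holds for `io::ndjson`, where `FileWriter` and `FileReader` replace the bracketed-array framing with one JSON document per line.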
diff --git a/guide/src/io/json_read.md b/guide/src/io/json_read.md
index cd3a19f4b22..41c7a71d77d 100644
--- a/guide/src/io/json_read.md
+++ b/guide/src/io/json_read.md
@@ -14,3 +14,16 @@ This crate also supports reading JSON, at the expense of being unable to read th
 ```rust
 {{#include ../../../examples/json_read.rs}}
 ```
+
+## Metadata and inference
+
+This crate uses the following mapping between JSON values and Arrow's `DataType`:
+
+| `JSON` | `DataType` |
+| ------ | ---------- |
+| Bool   | Boolean    |
+| Int    | Int64      |
+| Float  | Float64    |
+| String | Utf8       |
+| List   | List       |
+| Object | Struct     |
diff --git a/guide/src/io/json_write.md b/guide/src/io/json_write.md
index b237e014eca..2483a056d00 100644
--- a/guide/src/io/json_write.md
+++ b/guide/src/io/json_write.md
@@ -1,8 +1,14 @@
 # Write JSON
 
-When compiled with feature `io_json`, you can use this crate to write JSON files.
-The following example writes a batch as a JSON file:
+When compiled with feature `io_json`, you can use this crate to write JSON.
+The following example writes an array to JSON:
 
 ```rust
 {{#include ../../../examples/json_write.rs}}
 ```
+
+Likewise, you can also use it to write NDJSON:
+
+```rust
+{{#include ../../../examples/ndjson_write.rs}}
+```
diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs
index 4a1b2255b29..7f5844ab1a3 100644
--- a/src/io/json/read/deserialize.rs
+++ b/src/io/json/read/deserialize.rs
@@ -10,8 +10,7 @@ use serde_json::Value;
 use crate::{
     array::*,
     bitmap::MutableBitmap,
-    chunk::Chunk,
-    datatypes::{DataType, Field, IntervalUnit},
+    datatypes::{DataType, IntervalUnit},
     error::ArrowError,
     types::NativeType,
 };
@@ -203,7 +202,7 @@ fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
     DictionaryArray::<K>::from_data(keys, values)
 }
 
-fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
+pub(crate) fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
     match &data_type {
         DataType::Null => Arc::new(NullArray::from_data(data_type, rows.len())),
         DataType::Boolean => Arc::new(deserialize_boolean(rows)),
@@ -251,30 +250,20 @@ fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn A
-pub fn deserialize<A: AsRef<str>>(
-    rows: &[A],
-    fields: &[Field],
-) -> Result<Chunk<Arc<dyn Array>>, ArrowError> {
-    let data_type = DataType::Struct(fields.to_vec());
-
-    // convert rows to `Value`
-    let rows = rows
-        .iter()
-        .map(|row| {
-            let row: Value = serde_json::from_str(row.as_ref()).map_err(ArrowError::from)?;
-            Ok(row)
-        })
-        .collect::<Result<Vec<Value>, ArrowError>>()?;
-
-    let (_, columns, _) = deserialize_struct(&rows, data_type).into_data();
-    Ok(Chunk::new(columns))
-}
-
-/// Deserializes a slice of [`Value`] to an Array of logical type [`DataType`].
-///
-/// This function allows consuming deserialized JSON to Arrow.
-pub fn deserialize_json(rows: &[Value], data_type: DataType) -> Arc { - _deserialize(rows, data_type) +/// # Error +/// This function errors iff either: +/// * `json` is not a [`Value::Array`] +/// * `data_type` is neither [`DataType::List`] nor [`DataType::LargeList`] +pub fn deserialize(json: &Value, data_type: DataType) -> Result, ArrowError> { + match json { + Value::Array(rows) => match data_type { + DataType::List(inner) | DataType::LargeList(inner) => { + Ok(_deserialize(rows, inner.data_type)) + } + _ => Err(ArrowError::nyi("read an Array from a non-Array data type")), + }, + _ => Err(ArrowError::nyi("read an Array from a non-Array JSON")), + } } diff --git a/src/io/json/read/infer_schema.rs b/src/io/json/read/infer_schema.rs index b8fb01038c9..0bfbaeb31c1 100644 --- a/src/io/json/read/infer_schema.rs +++ b/src/io/json/read/infer_schema.rs @@ -1,143 +1,64 @@ use std::borrow::Borrow; -use std::io::{BufRead, Seek, SeekFrom}; use indexmap::map::IndexMap as HashMap; use indexmap::set::IndexSet as HashSet; use serde_json::Value; use crate::datatypes::*; -use crate::error::{ArrowError, Result}; - -use super::iterator::ValueIter; - -type Tracker = HashMap>; +use crate::error::Result; const ITEM_NAME: &str = "item"; -/// Infers the fields of a JSON file by reading the first `number_of_rows` rows. -/// # Examples -/// ``` -/// use std::io::Cursor; -/// use arrow2::io::json::read::infer; -/// -/// let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} -/// {"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null} -/// {"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"} -/// {"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]} -/// "#; -/// -/// // file's cursor's offset at 0 -/// let mut reader = Cursor::new(data); -/// let fields = infer(&mut reader, None).unwrap(); -/// ``` -pub fn infer(reader: &mut R, number_of_rows: Option) -> Result> { - infer_iterator(ValueIter::new(reader, number_of_rows)) -} - -/// Infer [`Field`]s from an iterator of [`Value`]. -pub fn infer_iterator(value_iter: I) -> Result> -where - I: Iterator>, - A: Borrow, -{ - let mut values: Tracker = Tracker::new(); - - for record in value_iter { - match record?.borrow() { - Value::Object(map) => map.iter().try_for_each(|(k, v)| { - let data_type = infer_value(v)?; - add_or_insert(&mut values, k, data_type); - Result::Ok(()) - }), - value => Err(ArrowError::ExternalFormat(format!( - "Expected JSON record to be an object, found {:?}", - value - ))), - }?; - } - - Ok(resolve_fields(values)) -} - -/// Infer the fields of a JSON file from `number_of_rows` in `reader`. -/// -/// This function seeks back to the start of the `reader`. -/// -/// # Examples -/// ``` -/// use std::fs::File; -/// use std::io::Cursor; -/// use arrow2::io::json::read::infer_and_reset; -/// -/// let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} -/// {"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null} -/// {"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"} -/// {"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]} -/// "#; -/// let mut reader = Cursor::new(data); -/// let fields = infer_and_reset(&mut reader, None).unwrap(); -/// // cursor's position automatically set at 0 -/// ``` -pub fn infer_and_reset( - reader: &mut R, - number_of_rows: Option, -) -> Result> { - let fields = infer(reader, number_of_rows); - reader.seek(SeekFrom::Start(0))?; - fields -} - -fn infer_value(value: &Value) -> Result { - Ok(match value { +/// Infers [`DataType`] from [`Value`]. 
+pub fn infer(json: &Value) -> Result { + Ok(match json { Value::Bool(_) => DataType::Boolean, Value::Array(array) => infer_array(array)?, Value::Null => DataType::Null, Value::Number(number) => infer_number(number), Value::String(_) => DataType::Utf8, - Value::Object(inner) => { - let fields = inner - .iter() - .map(|(key, value)| infer_value(value).map(|dt| Field::new(key, dt, true))) - .collect::>>()?; - DataType::Struct(fields) - } + Value::Object(inner) => infer_object(inner)?, }) } -/// Infers a [`DataType`] from a list of JSON values -pub fn infer_rows(rows: &[Value]) -> Result { - let types = rows.iter().map(|a| { - Ok(match a { - Value::Null => None, - Value::Number(n) => Some(infer_number(n)), - Value::Bool(_) => Some(DataType::Boolean), - Value::String(_) => Some(DataType::Utf8), - Value::Array(array) => Some(infer_array(array)?), - Value::Object(inner) => { - let fields = inner - .iter() - .map(|(key, value)| infer_value(value).map(|dt| Field::new(key, dt, true))) - .collect::>>()?; - Some(DataType::Struct(fields)) - } +fn filter_map_nulls(dt: DataType) -> Option { + if dt == DataType::Null { + None + } else { + Some(dt) + } +} + +fn infer_object(inner: &serde_json::Map) -> Result { + let fields = inner + .iter() + .filter_map(|(key, value)| { + infer(value) + .map(|dt| filter_map_nulls(dt).map(|dt| (key, dt))) + .transpose() + }) + .map(|maybe_dt| { + let (key, dt) = maybe_dt?; + Ok(Field::new(key, dt, true)) }) - }); - // discard None values and deduplicate entries - let types = types - .into_iter() - .filter_map(|x| x.transpose()) + .collect::>>()?; + Ok(DataType::Struct(fields)) +} + +fn infer_array(values: &[Value]) -> Result { + let types = values + .iter() + .map(infer) + .filter_map(|x| x.map(filter_map_nulls).transpose()) + // deduplicate entries .collect::>>()?; - Ok(if !types.is_empty() { + let dt = if !types.is_empty() { let types = types.into_iter().collect::>(); coerce_data_type(&types) } else { DataType::Null - }) -} - -fn infer_array(values: &[Value]) -> Result { - let dt = infer_rows(values)?; + }; // if a record contains only nulls, it is not // added to values @@ -156,36 +77,12 @@ fn infer_number(n: &serde_json::Number) -> DataType { } } -fn add_or_insert(values: &mut Tracker, key: &str, data_type: DataType) { - if data_type == DataType::Null { - return; - } - if values.contains_key(key) { - let x = values.get_mut(key).unwrap(); - x.insert(data_type); - } else { - // create hashset and add value type - let mut hs = HashSet::new(); - hs.insert(data_type); - values.insert(key.to_string(), hs); - } -} - -fn resolve_fields(spec: HashMap>) -> Vec { - spec.iter() - .map(|(k, hs)| { - let v: Vec<&DataType> = hs.iter().collect(); - Field::new(k, coerce_data_type(&v), true) - }) - .collect() -} - /// Coerce an heterogeneous set of [`DataType`] into a single one. 
Rules: /// * `Int64` and `Float64` are `Float64` /// * Lists and scalars are coerced to a list of a compatible scalar /// * Structs contain the union of all fields /// * All other types are coerced to `Utf8` -fn coerce_data_type>(datatypes: &[A]) -> DataType { +pub(crate) fn coerce_data_type>(datatypes: &[A]) -> DataType { use DataType::*; let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow()); @@ -206,14 +103,16 @@ fn coerce_data_type>(datatypes: &[A]) -> DataType { }); // group fields by unique let fields = fields.iter().fold( - HashMap::<&String, Vec<&DataType>>::new(), + HashMap::<&String, HashSet<&DataType>>::new(), |mut acc, field| { match acc.entry(&field.name) { indexmap::map::Entry::Occupied(mut v) => { - v.get_mut().push(&field.data_type); + v.get_mut().insert(&field.data_type); } indexmap::map::Entry::Vacant(v) => { - v.insert(vec![&field.data_type]); + let mut a = HashSet::new(); + a.insert(&field.data_type); + v.insert(a); } } acc @@ -222,7 +121,10 @@ fn coerce_data_type>(datatypes: &[A]) -> DataType { // and finally, coerce each of the fields within the same name let fields = fields .into_iter() - .map(|(name, dts)| Field::new(name, coerce_data_type(&dts), true)) + .map(|(name, dts)| { + let dts = dts.into_iter().collect::>(); + Field::new(name, coerce_data_type(&dts), true) + }) .collect(); return Struct(fields); } else if datatypes.len() > 2 { diff --git a/src/io/json/read/iterator.rs b/src/io/json/read/iterator.rs deleted file mode 100644 index 7670fa32e8f..00000000000 --- a/src/io/json/read/iterator.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::io::BufRead; - -use serde_json::Value; - -use crate::error::{ArrowError, Result}; - -#[derive(Debug)] -pub struct ValueIter<'a, R: BufRead> { - reader: &'a mut R, - remaining: usize, - // reuse line buffer to avoid allocation on each record - line_buf: String, -} - -impl<'a, R: BufRead> ValueIter<'a, R> { - pub fn new(reader: &'a mut R, number_of_rows: Option) -> Self { - Self { - reader, - remaining: number_of_rows.unwrap_or(usize::MAX), - line_buf: String::new(), - } - } -} - -impl<'a, R: BufRead> Iterator for ValueIter<'a, R> { - type Item = Result; - - fn next(&mut self) -> Option { - if self.remaining == 0 { - return None; - } - - loop { - self.line_buf.truncate(0); - match self.reader.read_line(&mut self.line_buf) { - Ok(0) => { - // read_line returns 0 when stream reached EOF - return None; - } - Err(e) => { - return Some(Err(ArrowError::from(e))); - } - _ => { - let trimmed_s = self.line_buf.trim(); - if trimmed_s.is_empty() { - // ignore empty lines - continue; - } - - self.remaining -= 1; - return Some(serde_json::from_str(trimmed_s).map_err(ArrowError::from)); - } - } - } - } -} diff --git a/src/io/json/read/mod.rs b/src/io/json/read/mod.rs index 741f375480d..908da67b51f 100644 --- a/src/io/json/read/mod.rs +++ b/src/io/json/read/mod.rs @@ -1,34 +1,8 @@ //! APIs to read and deserialize from JSON mod deserialize; mod infer_schema; -mod iterator; -use crate::error::{ArrowError, Result}; - -pub use deserialize::{deserialize, deserialize_json}; -pub use infer_schema::*; - -/// Reads rows from `reader` into `rows`. Returns the number of read items. -/// IO-bounded. 
-pub fn read_rows(reader: &mut R, rows: &mut [String]) -> Result { - let mut row_number = 0; - for row in rows.iter_mut() { - loop { - row.truncate(0); - let _ = reader.read_line(row).map_err(|e| { - ArrowError::External(format!(" at line {}", row_number), Box::new(e)) - })?; - if row.is_empty() { - break; - } - if !row.trim().is_empty() { - break; - } - } - if row.is_empty() { - break; - } - row_number += 1; - } - Ok(row_number) -} +pub(crate) use deserialize::_deserialize; +pub use deserialize::deserialize; +pub(crate) use infer_schema::coerce_data_type; +pub use infer_schema::infer; diff --git a/src/io/json/write/format.rs b/src/io/json/write/format.rs deleted file mode 100644 index 66d85100d3e..00000000000 --- a/src/io/json/write/format.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::{fmt::Debug, io::Write}; - -use crate::error::Result; - -/// Trait defining how to format a sequence of JSON objects to a byte stream. -pub trait JsonFormat: Debug + Default + Copy { - #[inline] - /// write any bytes needed at the start of the file to the writer - fn start_stream(&self, _writer: &mut W) -> Result<()> { - Ok(()) - } - - #[inline] - /// write any bytes needed for the start of each row - fn start_row(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> { - Ok(()) - } - - #[inline] - /// write any bytes needed for the end of each row - fn end_row(&self, _writer: &mut W) -> Result<()> { - Ok(()) - } - - /// write any bytes needed for the start of each row - fn end_stream(&self, _writer: &mut W) -> Result<()> { - Ok(()) - } -} - -/// Produces JSON output with one record per line. For example -/// -/// ```json -/// {"foo":1} -/// {"bar":1} -/// -/// ``` -#[derive(Debug, Default, Clone, Copy)] -pub struct LineDelimited {} - -impl JsonFormat for LineDelimited { - #[inline] - fn end_row(&self, writer: &mut W) -> Result<()> { - writer.write_all(b"\n")?; - Ok(()) - } -} - -/// Produces JSON output as a single JSON array. For example -/// -/// ```json -/// [{"foo":1},{"bar":1}] -/// ``` -#[derive(Debug, Default, Clone, Copy)] -pub struct JsonArray {} - -impl JsonFormat for JsonArray { - #[inline] - fn start_stream(&self, writer: &mut W) -> Result<()> { - writer.write_all(b"[")?; - Ok(()) - } - - #[inline] - fn start_row(&self, writer: &mut W, is_first_row: bool) -> Result<()> { - if !is_first_row { - writer.write_all(b",")?; - } - Ok(()) - } - - #[inline] - fn end_stream(&self, writer: &mut W) -> Result<()> { - writer.write_all(b"]")?; - Ok(()) - } -} diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs index b1ac14ee536..ca941a76ebe 100644 --- a/src/io/json/write/mod.rs +++ b/src/io/json/write/mod.rs @@ -1,103 +1,50 @@ //! APIs to write to JSON -mod format; mod serialize; pub use fallible_streaming_iterator::*; -pub use serialize::serialize; +pub(crate) use serialize::new_serializer; +use serialize::serialize; -use crate::{ - array::Array, - chunk::Chunk, - error::{ArrowError, Result}, -}; -use format::*; +use crate::{array::Array, error::ArrowError}; -/// The supported variations of JSON supported -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] -pub enum Format { - /// JSON - Json, - /// NDJSON (http://ndjson.org/) - NewlineDelimitedJson, -} - -fn _write(writer: &mut W, format: F, mut blocks: I) -> Result<()> -where - W: std::io::Write, - F: JsonFormat, - I: FallibleStreamingIterator, -{ - format.start_stream(writer)?; - let mut is_first_row = true; - while let Some(block) = blocks.next()? 
{
-        format.start_row(writer, is_first_row)?;
-        is_first_row = false;
-        writer.write_all(block)?;
-    }
-    format.end_stream(writer)?;
-    Ok(())
-}
-
-/// Writes blocks of JSON-encoded data into `writer` according to format [`Format`].
+/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid JSON
 /// # Implementation
-/// This is IO-bounded
-pub fn write<W, I>(writer: &mut W, format: Format, blocks: I) -> Result<()>
-where
-    W: std::io::Write,
-    I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
-{
-    match format {
-        Format::Json => _write(writer, JsonArray::default(), blocks),
-        Format::NewlineDelimitedJson => _write(writer, LineDelimited::default(), blocks),
-    }
-}
-
-/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] to bytes.
-/// Advancing it is CPU-bounded
+/// Advancing this iterator is CPU-bounded
+#[derive(Debug, Clone)]
 pub struct Serializer<A, I>
 where
     A: AsRef<dyn Array>,
-    I: Iterator<Item = Result<Chunk<A>>>,
+    I: Iterator<Item = Result<A, ArrowError>>,
 {
-    batches: I,
-    names: Vec<String>,
+    arrays: I,
     buffer: Vec<u8>,
-    format: Format,
 }
 
 impl<A, I> Serializer<A, I>
 where
     A: AsRef<dyn Array>,
-    I: Iterator<Item = Result<Chunk<A>>>,
+    I: Iterator<Item = Result<A, ArrowError>>,
 {
     /// Creates a new [`Serializer`].
-    pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: Format) -> Self {
-        Self {
-            batches,
-            names,
-            buffer,
-            format,
-        }
+    pub fn new(arrays: I, buffer: Vec<u8>) -> Self {
+        Self { arrays, buffer }
     }
 }
 
 impl<A, I> FallibleStreamingIterator for Serializer<A, I>
 where
     A: AsRef<dyn Array>,
-    I: Iterator<Item = Result<Chunk<A>>>,
+    I: Iterator<Item = Result<A, ArrowError>>,
 {
     type Item = [u8];
 
     type Error = ArrowError;
 
-    fn advance(&mut self) -> Result<()> {
+    fn advance(&mut self) -> Result<(), ArrowError> {
         self.buffer.clear();
-        self.batches
+        self.arrays
             .next()
-            .map(|maybe_chunk| {
-                maybe_chunk
-                    .map(|columns| serialize(&self.names, &columns, self.format, &mut self.buffer))
-            })
+            .map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))
             .transpose()?;
         Ok(())
     }
@@ -110,3 +57,22 @@ where
         }
     }
 }
+
+/// Writes valid JSON from an iterator of (assumed JSON-encoded) bytes to `writer`
+pub fn write<W, I>(writer: &mut W, mut blocks: I) -> Result<(), ArrowError>
+where
+    W: std::io::Write,
+    I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
+{
+    writer.write_all(&[b'['])?;
+    let mut is_first_row = true;
+    while let Some(block) = blocks.next()? {
+        if !is_first_row {
+            writer.write_all(&[b','])?;
+        }
+        is_first_row = false;
+        writer.write_all(block)?;
+    }
+    writer.write_all(&[b']'])?;
+    Ok(())
+}
diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs
index f9d195f036f..3b1f9e087b5 100644
--- a/src/io/json/write/serialize.rs
+++ b/src/io/json/write/serialize.rs
@@ -4,7 +4,6 @@ use std::io::Write;
 use streaming_iterator::StreamingIterator;
 
 use crate::bitmap::utils::zip_validity;
-use crate::chunk::Chunk;
 use crate::datatypes::TimeUnit;
 use crate::io::iterator::BufStreamingIterator;
 use crate::temporal_conversions::{
@@ -14,9 +13,6 @@ use crate::temporal_conversions::{
 use crate::util::lexical_to_bytes_mut;
 use crate::{array::*, datatypes::DataType, types::NativeType};
 
-use super::format::{JsonArray, JsonFormat, LineDelimited};
-use super::Format;
-
 fn boolean_serializer<'a>(
     array: &'a BooleanArray,
 ) -> Box<dyn StreamingIterator<Item = Vec<u8>> + 'a + Send + Sync> {
@@ -95,7 +91,7 @@ fn struct_serializer<'a>(
                 let item = iter.next().unwrap();
                 record.push((name, item));
             });
-            serialize_item(buf, &record, JsonArray::default(), true);
+            serialize_item(buf, &record, true);
         } else {
             serializers.iter_mut().for_each(|iter| {
                 let _ = iter.next();
@@ -187,7 +183,7 @@ where
     ))
 }
 
-fn new_serializer<'a>(
+pub(crate) fn new_serializer<'a>(
     array: &'a dyn Array,
 ) -> Box<dyn StreamingIterator<Item = Vec<u8>> + 'a + Send + Sync> {
     match array.data_type().to_logical_type() {
@@ -226,13 +222,10 @@ fn new_serializer<'a>(
     }
 }
 
-fn serialize_item<F: JsonFormat>(
-    buffer: &mut Vec<u8>,
-    record: &[(&str, &[u8])],
-    format: F,
-    is_first_row: bool,
-) {
-    format.start_row(buffer, is_first_row).unwrap();
+fn serialize_item(buffer: &mut Vec<u8>, record: &[(&str, &[u8])], is_first_row: bool) {
+    if !is_first_row {
+        buffer.push(b',');
+    }
     buffer.push(b'{');
     let mut first_item = true;
     for (key, value) in record {
@@ -245,52 +238,18 @@ fn serialize_item(
         buffer.extend(*value);
     }
     buffer.push(b'}');
-    format.end_row(buffer).unwrap();
 }
 
-/// Serializes a (name, array) to a valid JSON to `buffer`
-/// This is CPU-bounded
-fn _serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
-where
-    N: AsRef<str>,
-    A: AsRef<dyn Array>,
-    F: JsonFormat,
-{
-    let num_rows = columns.len();
-
-    let mut serializers: Vec<_> = columns
-        .arrays()
-        .iter()
-        .map(|array| new_serializer(array.as_ref()))
-        .collect();
-
-    let mut is_first_row = true;
-    (0..num_rows).for_each(|_| {
-        let mut record: Vec<(&str, &[u8])> = Default::default();
-        serializers
-            .iter_mut()
-            .zip(names.iter())
-            // `unwrap` is infalible because `array.len()` equals `len` on `Chunk`
-            .for_each(|(iter, name)| {
-                let item = iter.next().unwrap();
-                record.push((name.as_ref(), item));
-            });
-        serialize_item(buffer, &record, format, is_first_row);
-        is_first_row = false;
-    })
-}
+/// Serializes `array` to valid JSON into `buffer`
+/// # Implementation
+/// This operation is CPU-bounded
+pub(crate) fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
+    let mut serializer = new_serializer(array);
 
-/// Serializes a (name, array) to a valid JSON to `buffer`
-/// This is CPU-bounded
-pub fn serialize<N, A>(names: &[N], columns: &Chunk<A>, format: Format, buffer: &mut Vec<u8>)
-where
-    N: AsRef<str>,
-    A: AsRef<dyn Array>,
-{
-    match format {
-        Format::Json => _serialize(names, columns, JsonArray::default(), buffer),
-        Format::NewlineDelimitedJson => {
-            _serialize(names, columns, LineDelimited::default(), buffer)
+    (0..array.len()).for_each(|i| {
+        if i != 0 {
+            buffer.push(b',');
         }
-    }
+        buffer.extend_from_slice(serializer.next().unwrap());
+    });
 }
diff --git a/src/io/mod.rs b/src/io/mod.rs
index cb4ff4bd00a..1b5c39ed7c8 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -19,6 +19,9 @@ pub mod csv;
 #[cfg(feature = "io_json")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_json")))]
 pub mod json;
+#[cfg(feature = "io_json")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_json")))]
+pub mod ndjson;
 
 #[cfg(feature = "io_ipc")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
diff --git a/src/io/ndjson/mod.rs b/src/io/ndjson/mod.rs
new file mode 100644
index 00000000000..1448a5af6ef
--- /dev/null
+++ b/src/io/ndjson/mod.rs
@@ -0,0 +1,3 @@
+//! APIs to read from and write to NDJSON
+pub mod read;
+pub mod write;
diff --git a/src/io/ndjson/read/deserialize.rs b/src/io/ndjson/read/deserialize.rs
new file mode 100644
index 00000000000..8364fec15db
--- /dev/null
+++ b/src/io/ndjson/read/deserialize.rs
@@ -0,0 +1,26 @@
+use std::sync::Arc;
+
+use serde_json::Value;
+
+use crate::array::Array;
+use crate::datatypes::DataType;
+use crate::error::ArrowError;
+
+use super::super::super::json::read::_deserialize;
+
+/// Deserializes rows into an [`Array`] of [`DataType`].
+/// # Implementation
+/// This function is CPU-bounded.
+/// This function is guaranteed to return an array of length equal to `rows.len()`.
+/// # Errors
+/// This function errors iff any of the rows is not valid JSON (i.e. the format is not valid NDJSON).
+pub fn deserialize(rows: &[String], data_type: DataType) -> Result<Arc<dyn Array>, ArrowError> {
+    // deserialize strings to `Value`s
+    let rows = rows
+        .iter()
+        .map(|row| serde_json::from_str(row.as_ref()).map_err(ArrowError::from))
+        .collect::<Result<Vec<Value>, ArrowError>>()?;
+
+    // deserialize &[Value] to Array
+    Ok(_deserialize(&rows, data_type))
+}
diff --git a/src/io/ndjson/read/file.rs b/src/io/ndjson/read/file.rs
new file mode 100644
index 00000000000..9ec9a73a8c6
--- /dev/null
+++ b/src/io/ndjson/read/file.rs
@@ -0,0 +1,121 @@
+use std::io::BufRead;
+
+use fallible_streaming_iterator::FallibleStreamingIterator;
+use indexmap::set::IndexSet as HashSet;
+use serde_json;
+use serde_json::Value;
+
+use crate::{
+    datatypes::DataType,
+    error::{ArrowError, Result},
+};
+
+use super::super::super::json::read::{coerce_data_type, infer as infer_json};
+
+/// Reads up to `limit` lines from `reader` into `rows`, skipping empty lines.
+fn read_rows<R: BufRead>(reader: &mut R, rows: &mut [String], limit: usize) -> Result<usize> {
+    if limit == 0 {
+        return Ok(0);
+    }
+    let mut row_number = 0;
+    for row in rows.iter_mut() {
+        loop {
+            row.clear();
+            let _ = reader.read_line(row).map_err(|e| {
+                ArrowError::External(format!(" at line {}", row_number), Box::new(e))
+            })?;
+            if row.is_empty() {
+                break;
+            }
+            if !row.trim().is_empty() {
+                break;
+            }
+        }
+        if row.is_empty() {
+            break;
+        }
+        row_number += 1;
+        if row_number == limit {
+            break;
+        }
+    }
+    Ok(row_number)
+}
+
+/// A [`FallibleStreamingIterator`] of NDJSON rows.
+///
+/// This iterator is used to read an NDJSON file in batches of rows; every yielded batch contains at least one row.
+/// # Implementation
+/// Advancing this iterator is IO-bounded, but it does require parsing each byte to find line endings.
+/// # Error
+/// Advancing this iterator errors iff the reader errors.
+pub struct FileReader<R: BufRead> {
+    reader: R,
+    rows: Vec<String>,
+    number_of_rows: usize,
+    remaining: usize,
+}
+
+impl<R: BufRead> FileReader<R> {
+    /// Creates a new [`FileReader`] from a reader and `rows`.
+    ///
+    /// The number of items in `rows` denotes the batch size.
+    pub fn new(reader: R, rows: Vec<String>, limit: Option<usize>) -> Self {
+        Self {
+            reader,
+            rows,
+            remaining: limit.unwrap_or(usize::MAX),
+            number_of_rows: 0,
+        }
+    }
+
+    /// Deconstructs [`FileReader`] into the reader and the internal buffer.
+    pub fn into_inner(self) -> (R, Vec<String>) {
+        (self.reader, self.rows)
+    }
+}
+
+impl<R: BufRead> FallibleStreamingIterator for FileReader<R> {
+    type Error = ArrowError;
+    type Item = [String];
+
+    fn advance(&mut self) -> Result<()> {
+        self.number_of_rows = read_rows(&mut self.reader, &mut self.rows, self.remaining)?;
+        self.remaining -= self.number_of_rows;
+        Ok(())
+    }
+
+    fn get(&self) -> Option<&Self::Item> {
+        if self.number_of_rows > 0 {
+            Some(&self.rows[..self.number_of_rows])
+        } else {
+            None
+        }
+    }
+}
+
+/// Infers the [`DataType`] from an NDJSON file, optionally only using `number_of_rows` rows.
+///
+/// # Implementation
+/// This implementation reads the file line by line and infers the type of each line.
+/// It performs both IO and CPU-bounded operations in `O(N)`, where `N` is the number of rows.
+pub fn infer<R: BufRead>(
+    reader: &mut R,
+    number_of_rows: Option<usize>,
+) -> Result<DataType> {
+    let rows = vec!["".to_string(); 1]; // 1 <=> read row by row
+    let mut reader = FileReader::new(reader, rows, number_of_rows);
+
+    let mut data_types = HashSet::new();
+    while let Some(rows) = reader.next()? {
+        let value: Value = serde_json::from_str(&rows[0])?; // 0 because it is row by row
+        let data_type = infer_json(&value)?;
+        if data_type != DataType::Null {
+            data_types.insert(data_type);
+        }
+    }
+
+    let v: Vec<&DataType> = data_types.iter().collect();
+    Ok(coerce_data_type(&v))
+}
diff --git a/src/io/ndjson/read/mod.rs b/src/io/ndjson/read/mod.rs
new file mode 100644
index 00000000000..5c52bd183fc
--- /dev/null
+++ b/src/io/ndjson/read/mod.rs
@@ -0,0 +1,8 @@
+//! APIs to read and deserialize [NDJSON](http://ndjson.org/).
+
+pub use fallible_streaming_iterator::FallibleStreamingIterator;
+
+mod deserialize;
+mod file;
+pub use deserialize::deserialize;
+pub use file::{infer, FileReader};
diff --git a/src/io/ndjson/write/mod.rs b/src/io/ndjson/write/mod.rs
new file mode 100644
index 00000000000..456e492de2a
--- /dev/null
+++ b/src/io/ndjson/write/mod.rs
@@ -0,0 +1,119 @@
+//! APIs to serialize and write to [NDJSON](http://ndjson.org/).
+use std::io::Write;
+
+pub use fallible_streaming_iterator::FallibleStreamingIterator;
+
+use crate::array::Array;
+use crate::error::ArrowError;
+
+use super::super::json::write::new_serializer;
+
+fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
+    let mut serializer = new_serializer(array);
+    (0..array.len()).for_each(|_| {
+        buffer.extend_from_slice(serializer.next().unwrap());
+        buffer.push(b'\n');
+    });
+}
+
+/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid NDJSON
+/// where every line is an element of the array.
+/// # Implementation
+/// Advancing this iterator is CPU-bounded
+#[derive(Debug, Clone)]
+pub struct Serializer<A, I>
+where
+    A: AsRef<dyn Array>,
+    I: Iterator<Item = Result<A, ArrowError>>,
+{
+    arrays: I,
+    buffer: Vec<u8>,
+}
+
+impl<A, I> Serializer<A, I>
+where
+    A: AsRef<dyn Array>,
+    I: Iterator<Item = Result<A, ArrowError>>,
+{
+    /// Creates a new [`Serializer`].
+ pub fn new(arrays: I, buffer: Vec) -> Self { + Self { arrays, buffer } + } +} + +impl FallibleStreamingIterator for Serializer +where + A: AsRef, + I: Iterator>, +{ + type Item = [u8]; + + type Error = ArrowError; + + fn advance(&mut self) -> Result<(), ArrowError> { + self.buffer.clear(); + self.arrays + .next() + .map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer))) + .transpose()?; + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + if !self.buffer.is_empty() { + Some(&self.buffer) + } else { + None + } + } +} + +/// An iterator adapter that receives an implementer of [`Write`] and +/// an implementer of [`FallibleStreamingIterator`] (such as [`Serializer`]) +/// and writes a valid NDJSON +/// # Implementation +/// Advancing this iterator mixes CPU-bounded (serializing arrays) tasks and IO-bounded (write to the writer). +pub struct FileWriter +where + W: Write, + I: FallibleStreamingIterator, +{ + writer: W, + iterator: I, +} + +impl FileWriter +where + W: Write, + I: FallibleStreamingIterator, +{ + /// Creates a new [`FileWriter`]. + pub fn new(writer: W, iterator: I) -> Self { + Self { writer, iterator } + } + + /// Returns the inner content of this iterator + /// + /// There are two use-cases for this function: + /// * to continue writing to its writer + /// * to re-use an internal buffer of its iterator + pub fn into_inner(self) -> (W, I) { + (self.writer, self.iterator) + } +} + +impl Iterator for FileWriter +where + W: Write, + I: FallibleStreamingIterator, +{ + type Item = Result<(), ArrowError>; + + fn next(&mut self) -> Option { + let item = self.iterator.next().transpose()?; + Some(item.and_then(|x| { + self.writer.write_all(x)?; + Ok(()) + })) + } +} diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs index ecd1e93530e..785789ae0e3 100644 --- a/tests/it/io/json/mod.rs +++ b/tests/it/io/json/mod.rs @@ -1,310 +1,16 @@ mod read; mod write; -use std::io::Cursor; use std::sync::Arc; use arrow2::array::*; -use arrow2::bitmap::Bitmap; -use arrow2::buffer::Buffer; -use arrow2::chunk::Chunk; -use arrow2::datatypes::*; use arrow2::error::Result; -use arrow2::io::json::read as json_read; use arrow2::io::json::write as json_write; -fn read_batch(data: String, fields: &[Field]) -> Result>> { - let mut reader = Cursor::new(data); +fn write_batch(array: Box) -> Result> { + let mut serializer = json_write::Serializer::new(vec![Ok(array)].into_iter(), vec![]); - let mut rows = vec![String::default(); 1024]; - let read = json_read::read_rows(&mut reader, &mut rows)?; - let rows = &rows[..read]; - json_read::deserialize(rows, fields) -} - -fn write_batch>( - batch: Chunk, - names: Vec, - format: json_write::Format, -) -> Result> { - let batches = vec![Ok(batch)].into_iter(); - - let blocks = json_write::Serializer::new(batches, names, vec![], format); - - let mut buf = Vec::new(); - json_write::write(&mut buf, format, blocks)?; + let mut buf = vec![]; + json_write::write(&mut buf, &mut serializer)?; Ok(buf) } - -fn round_trip(data: String) -> Result<()> { - let mut reader = Cursor::new(data); - let fields = json_read::infer(&mut reader, None)?; - let data = reader.into_inner(); - - let columns = read_batch(data, &fields)?; - - let buf = write_batch( - columns.clone(), - fields.iter().map(|x| x.name.clone()).collect(), - json_write::Format::NewlineDelimitedJson, - )?; - - let new_chunk = read_batch(String::from_utf8(buf).unwrap(), &fields)?; - - assert_eq!(columns, new_chunk); - Ok(()) -} - -#[test] -fn round_trip_basics() -> Result<()> { - 
let (data, _, _) = case_basics(); - round_trip(data) -} - -#[test] -fn round_trip_list() -> Result<()> { - let (data, _, _) = case_list(); - round_trip(data) -} - -fn case_list() -> (String, Vec, Vec>) { - let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"} - {"a":-10, "b":null, "c":[true, true]} - {"a":null, "b":[2.1, null, -6.2], "c":[false, null], "d":"text"} - "# - .to_string(); - - let fields = vec![ - Field::new("a", DataType::Int64, true), - Field::new( - "b", - DataType::List(Box::new(Field::new("item", DataType::Float64, true))), - true, - ), - Field::new( - "c", - DataType::List(Box::new(Field::new("item", DataType::Boolean, true))), - true, - ), - Field::new("d", DataType::Utf8, true), - ]; - - let a = Int64Array::from(&[Some(1), Some(-10), None]); - let mut b = MutableListArray::>::new(); - b.try_extend(vec![ - Some(vec![Some(2.0), Some(1.3), Some(-6.1)]), - None, - Some(vec![Some(2.1), None, Some(-6.2)]), - ]) - .unwrap(); - let b: ListArray = b.into(); - - let mut c = MutableListArray::::new(); - c.try_extend(vec![ - Some(vec![Some(false), Some(true)]), - Some(vec![Some(true), Some(true)]), - Some(vec![Some(false), None]), - ]) - .unwrap(); - let c: ListArray = c.into(); - - let d = Utf8Array::::from(&[Some("4"), None, Some("text")]); - - let columns = vec![ - Box::new(a) as Box, - Box::new(b), - Box::new(c), - Box::new(d), - ]; - - (data, fields, columns) -} - -fn case_dict() -> (String, Vec, Vec>) { - let data = r#"{"machine": "a", "events": [null, "Elect Leader", "Do Ballot"]} - {"machine": "b", "events": ["Do Ballot", null, "Send Data", "Elect Leader"]} - {"machine": "c", "events": ["Send Data"]} - {"machine": "c"} - {"machine": "c", "events": null} - "# - .to_string(); - - let data_type = DataType::List(Box::new(Field::new( - "item", - DataType::Dictionary(u64::KEY_TYPE, Box::new(DataType::Utf8), false), - true, - ))); - - let fields = vec![Field::new("events", data_type, true)]; - - type A = MutableDictionaryArray>; - - let mut array = MutableListArray::::new(); - array - .try_extend(vec![ - Some(vec![None, Some("Elect Leader"), Some("Do Ballot")]), - Some(vec![ - Some("Do Ballot"), - None, - Some("Send Data"), - Some("Elect Leader"), - ]), - Some(vec![Some("Send Data")]), - None, - None, - ]) - .unwrap(); - - let array: ListArray = array.into(); - - (data, fields, vec![Box::new(array) as Box]) -} - -fn case_basics() -> (String, Vec, Vec>) { - let data = r#"{"a":1, "b":2.0, "c":false, "d":"4"} - {"a":-10, "b":-3.5, "c":true, "d":null} - {"a":100000000, "b":0.6, "d":"text"}"# - .to_string(); - let fields = vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Float64, true), - Field::new("c", DataType::Boolean, true), - Field::new("d", DataType::Utf8, true), - ]; - let columns = vec![ - Box::new(Int64Array::from_slice(&[1, -10, 100000000])) as Box, - Box::new(Float64Array::from_slice(&[2.0, -3.5, 0.6])), - Box::new(BooleanArray::from(&[Some(false), Some(true), None])), - Box::new(Utf8Array::::from(&[Some("4"), None, Some("text")])), - ]; - (data, fields, columns) -} - -fn case_projection() -> (String, Vec, Vec>) { - let data = r#"{"a":1, "b":2.0, "c":false, "d":"4", "e":"4"} - {"a":10, "b":-3.5, "c":true, "d":null, "e":"text"} - {"a":100000000, "b":0.6, "d":"text"}"# - .to_string(); - let fields = vec![ - Field::new("a", DataType::UInt32, true), - Field::new("b", DataType::Float32, true), - Field::new("c", DataType::Boolean, true), - // note how "d" is not here - Field::new("e", DataType::Binary, true), - ]; - let columns = vec![ 
- Box::new(UInt32Array::from_slice(&[1, 10, 100000000])) as Box, - Box::new(Float32Array::from_slice(&[2.0, -3.5, 0.6])), - Box::new(BooleanArray::from(&[Some(false), Some(true), None])), - Box::new(BinaryArray::::from(&[ - Some(b"4".as_ref()), - Some(b"text".as_ref()), - None, - ])), - ]; - (data, fields, columns) -} - -fn case_struct() -> (String, Vec, Vec>) { - let data = r#"{"a": {"b": true, "c": {"d": "text"}}} - {"a": {"b": false, "c": null}} - {"a": {"b": true, "c": {"d": "text"}}} - {"a": 1}"# - .to_string(); - - let d_field = Field::new("d", DataType::Utf8, true); - let c_field = Field::new("c", DataType::Struct(vec![d_field.clone()]), true); - let a_field = Field::new( - "a", - DataType::Struct(vec![ - Field::new("b", DataType::Boolean, true), - c_field.clone(), - ]), - true, - ); - let fields = vec![a_field]; - - // build expected output - let d = Utf8Array::::from(&vec![Some("text"), None, Some("text"), None]); - let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None); - - let b = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]); - let expected = StructArray::from_data( - DataType::Struct(vec![Field::new("b", DataType::Boolean, true), c_field]), - vec![Arc::new(b), Arc::new(c)], - None, - ); - - (data, fields, vec![Box::new(expected) as Box]) -} - -fn case_nested_list() -> (String, Vec, Vec>) { - let d_field = Field::new("d", DataType::Utf8, true); - let c_field = Field::new("c", DataType::Struct(vec![d_field.clone()]), true); - let b_field = Field::new("b", DataType::Boolean, true); - let a_struct_field = Field::new( - "a", - DataType::Struct(vec![b_field.clone(), c_field.clone()]), - true, - ); - let a_list_data_type = DataType::List(Box::new(a_struct_field)); - let a_field = Field::new("a", a_list_data_type.clone(), true); - - let data = r#" - {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]} - {"a": [{"b": false, "c": null}]} - {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]} - {"a": null} - {"a": []} - "#.to_string(); - - // build expected output - let d = Utf8Array::::from(&vec![ - Some("a_text"), - Some("b_text"), - None, - Some("c_text"), - Some("d_text"), - None, - ]); - - let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None); - - let b = BooleanArray::from(vec![ - Some(true), - Some(false), - Some(false), - Some(true), - None, - Some(true), - ]); - let a_struct = StructArray::from_data( - DataType::Struct(vec![b_field, c_field]), - vec![Arc::new(b) as Arc, Arc::new(c) as Arc], - None, - ); - let expected = ListArray::from_data( - a_list_data_type, - Buffer::from_slice([0i32, 2, 3, 6, 6, 6]), - Arc::new(a_struct) as Arc, - Some(Bitmap::from_u8_slice([0b00010111], 5)), - ); - - ( - data, - vec![a_field], - vec![Box::new(expected) as Box], - ) -} - -fn case(case: &str) -> (String, Vec, Vec>) { - match case { - "basics" => case_basics(), - "projection" => case_projection(), - "list" => case_list(), - "dict" => case_dict(), - "struct" => case_struct(), - "nested_list" => case_nested_list(), - _ => todo!(), - } -} diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index 66caf83a302..db1b067bb42 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -1,178 +1,10 @@ -use std::io::Cursor; - use arrow2::array::*; use arrow2::datatypes::*; -use arrow2::error::ArrowError; use arrow2::error::Result; use arrow2::io::json::read; use super::*; -fn test_case(case_: &str) -> 
Result<()> { - let (data, fields, columns) = case(case_); - - let batch = read_batch(data, &fields)?; - - columns - .iter() - .zip(batch.columns()) - .for_each(|(expected, result)| assert_eq!(expected.as_ref(), result.as_ref())); - Ok(()) -} - -#[test] -fn basic() -> Result<()> { - test_case("basics") -} - -#[test] -fn projection() -> Result<()> { - test_case("projection") -} - -#[test] -fn dictionary() -> Result<()> { - test_case("dict") -} - -#[test] -fn list() -> Result<()> { - test_case("list") -} - -#[test] -fn nested_struct() -> Result<()> { - test_case("struct") -} - -#[test] -fn nested_list() -> Result<()> { - test_case("nested_list") -} - -#[test] -fn line_break_in_values() -> Result<()> { - let data = r#" - {"a":"aa\n\n"} - {"a":"aa\n"} - {"a":null} - "#; - - let batch = read_batch(data.to_string(), &[Field::new("a", DataType::Utf8, true)])?; - - let expected = Utf8Array::::from(&[Some("aa\n\n"), Some("aa\n"), None]); - - assert_eq!(expected, batch.columns()[0].as_ref()); - Ok(()) -} - -#[test] -fn invalid_infer_schema() -> Result<()> { - let re = read::infer(&mut Cursor::new("city,lat,lng"), None); - assert_eq!( - re.err().unwrap().to_string(), - "External error: expected value at line 1 column 1", - ); - Ok(()) -} - -#[test] -fn invalid_read_record() -> Result<()> { - let fields = vec![Field::new( - "a", - DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]), - true, - )]; - let batch = read_batch("city,lat,lng".to_string(), &fields); - - assert_eq!( - batch.err().unwrap().to_string(), - "External error: expected value at line 1 column 1", - ); - Ok(()) -} - -#[test] -fn skip_empty_lines() { - let data = " - {\"a\": 1} - - {\"a\": 2} - - {\"a\": 3}"; - - let batch = read_batch(data.to_string(), &[Field::new("a", DataType::Int64, true)]).unwrap(); - - assert_eq!(1, batch.arrays().len()); - assert_eq!(3, batch.len()); -} - -#[test] -fn row_type_validation() { - let data = " - [1, \"hello\"] - \"world\""; - - let batch = read::infer(&mut Cursor::new(data.to_string()), None); - assert_eq!( - batch.err().unwrap().to_string(), - r#"External format error: Expected JSON record to be an object, found Array([Number(1), String("hello")])"#, - ); -} - -#[test] -fn infer_schema_mixed_list() -> Result<()> { - let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} - {"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null} - {"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"} - {"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]} - "#; - - let fields = vec![ - Field::new("a", DataType::Int64, true), - Field::new( - "b", - DataType::List(Box::new(Field::new("item", DataType::Float64, true))), - true, - ), - Field::new( - "c", - DataType::List(Box::new(Field::new("item", DataType::Boolean, true))), - true, - ), - Field::new("d", DataType::Utf8, true), - ]; - - let result = read::infer(&mut Cursor::new(data), None)?; - - assert_eq!(result, fields); - Ok(()) -} - -#[test] -fn infer_nested_struct() -> Result<()> { - let data = r#"{"a": {"a": 2.0, "b": 2}} - {"a": {"b": 2}} - {"a": {"a": 2.0, "b": 2, "c": true}} - {"a": {"a": 2.0, "b": 2}} - "#; - - let fields = vec![Field::new( - "a", - DataType::Struct(vec![ - Field::new("a", DataType::Float64, true), - Field::new("b", DataType::Int64, true), - Field::new("c", DataType::Boolean, true), - ]), - true, - )]; - - let result = read::infer(&mut Cursor::new(data), None)?; - - assert_eq!(result, fields); - Ok(()) -} - #[test] fn read_json() -> Result<()> { let data = r#"[ @@ -187,17 +19,11 @@ fn read_json() -> Result<()> 
{ } ]"#; - let data = serde_json::from_slice(data.as_bytes())?; - - let values = if let serde_json::Value::Array(values) = data { - Ok(values) - } else { - Err(ArrowError::InvalidArgumentError("".to_string())) - }?; + let json = serde_json::from_slice(data.as_bytes())?; - let data_type = read::infer_rows(&values)?; + let data_type = read::infer(&json)?; - let result = read::deserialize_json(&values, data_type); + let result = read::deserialize(&json, data_type)?; let expected = StructArray::from_data( DataType::Struct(vec![Field::new("a", DataType::Int64, true)]), diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index 3a9af6165cf..69e4bfa9d94 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -4,59 +4,57 @@ use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, TimeUnit}, error::Result, }; use super::*; +macro_rules! test { + ($array:expr, $expected:expr) => {{ + let buf = write_batch(Box::new($array))?; + assert_eq!(String::from_utf8(buf).unwrap(), $expected); + Ok(()) + }}; +} + #[test] -fn write_simple_rows() -> Result<()> { - let a = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); - let b = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); - - let batch = Chunk::try_new(vec![&a as &dyn Array, &b])?; - - let buf = write_batch( - batch, - vec!["c1".to_string(), "c2".to_string()], - json_write::Format::NewlineDelimitedJson, - )?; - - assert_eq!( - String::from_utf8(buf).unwrap(), - r#"{"c1":1,"c2":"a"} -{"c1":2,"c2":"b"} -{"c1":3,"c2":"c"} -{"c1":null,"c2":"d"} -{"c1":5,"c2":null} -"# - ); - Ok(()) +fn int32() -> Result<()> { + let array = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); + //let b = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); + + let expected = r#"[1,2,3,null,5]"#; + + test!(array, expected) } #[test] -fn write_simple_rows_array() -> Result<()> { - let a = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); - let b = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); +fn utf8() -> Result<()> { + let array = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); - let batch = Chunk::try_new(vec![&a as &dyn Array, &b]).unwrap(); + let expected = r#"["a","b","c","d",null]"#; - let buf = write_batch( - batch, - vec!["c1".to_string(), "c2".to_string()], - json_write::Format::Json, - )?; + test!(array, expected) +} - assert_eq!( - String::from_utf8(buf).unwrap(), - r#"[{"c1":1,"c2":"a"},{"c1":2,"c2":"b"},{"c1":3,"c2":"c"},{"c1":null,"c2":"d"},{"c1":5,"c2":null}]"# - ); - Ok(()) +#[test] +fn struct_() -> Result<()> { + let c1 = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); + let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); + + let data_type = DataType::Struct(vec![ + Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ]); + let array = StructArray::from_data(data_type, vec![Arc::new(c1) as _, Arc::new(c2)], None); + + let expected = r#"[{"c1":1,"c2":"a"},{"c1":2,"c2":"b"},{"c1":3,"c2":"c"},{"c1":null,"c2":"d"},{"c1":5,"c2":null}]"#; + + test!(array, expected) } #[test] -fn write_nested_struct_with_validity() -> Result<()> { +fn nested_struct_with_validity() -> Result<()> { let inner = vec![ Field::new("c121", DataType::Utf8, false), Field::new("c122", DataType::Int32, false), @@ -83,26 +81,19 @@ fn write_nested_struct_with_validity() -> 
Result<()> { ); let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); - let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); + let data_type = DataType::Struct(vec![ + Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ]); + let array = StructArray::from_data(data_type, vec![Arc::new(c1) as _, Arc::new(c2)], None); - let buf = write_batch( - batch, - vec!["c1".to_string(), "c2".to_string()], - json_write::Format::NewlineDelimitedJson, - )?; + let expected = r#"[{"c1":{"c11":1,"c12":null},"c2":"a"},{"c1":{"c11":null,"c12":{"c121":"f","c122":null}},"c2":"b"},{"c1":null,"c2":"c"}]"#; - assert_eq!( - String::from_utf8(buf).unwrap(), - r#"{"c1":{"c11":1,"c12":null},"c2":"a"} -{"c1":{"c11":null,"c12":{"c121":"f","c122":null}},"c2":"b"} -{"c1":null,"c2":"c"} -"# - ); - Ok(()) + test!(array, expected) } #[test] -fn write_nested_structs() -> Result<()> { +fn nested_struct() -> Result<()> { let c121 = Field::new("c121", DataType::Utf8, false); let fields = vec![ Field::new("c11", DataType::Int32, false), @@ -128,26 +119,19 @@ fn write_nested_structs() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); - let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); + let data_type = DataType::Struct(vec![ + Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ]); + let array = StructArray::from_data(data_type, vec![Arc::new(c1) as _, Arc::new(c2)], None); - let buf = write_batch( - batch, - vec!["c1".to_string(), "c2".to_string()], - json_write::Format::NewlineDelimitedJson, - )?; + let expected = r#"[{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"},{"c1":{"c11":null,"c12":{"c121":"f"}},"c2":"b"},{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}]"#; - assert_eq!( - String::from_utf8(buf).unwrap(), - r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"} -{"c1":{"c11":null,"c12":{"c121":"f"}},"c2":"b"} -{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"} -"# - ); - Ok(()) + test!(array, expected) } #[test] -fn write_struct_with_list_field() -> Result<()> { +fn struct_with_list_field() -> Result<()> { let iter = vec![vec!["a", "a1"], vec!["b"], vec!["c"], vec!["d"], vec!["e"]]; let iter = iter @@ -160,32 +144,23 @@ fn write_struct_with_list_field() -> Result<()> { false, ); a.try_extend(iter).unwrap(); - let a: ListArray = a.into(); - - let b = PrimitiveArray::from_slice([1, 2, 3, 4, 5]); - - let batch = Chunk::try_new(vec![&a as &dyn Array, &b]).unwrap(); - - let buf = write_batch( - batch, - vec!["c1".to_string(), "c2".to_string()], - json_write::Format::NewlineDelimitedJson, - )?; - - assert_eq!( - String::from_utf8(buf).unwrap(), - r#"{"c1":["a","a1"],"c2":1} -{"c1":["b"],"c2":2} -{"c1":["c"],"c2":3} -{"c1":["d"],"c2":4} -{"c1":["e"],"c2":5} -"# - ); - Ok(()) + let c1: ListArray = a.into(); + + let c2 = PrimitiveArray::from_slice([1, 2, 3, 4, 5]); + + let data_type = DataType::Struct(vec![ + Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ]); + let array = StructArray::from_data(data_type, vec![Arc::new(c1) as _, Arc::new(c2)], None); + + let expected = r#"[{"c1":["a","a1"],"c2":1},{"c1":["b"],"c2":2},{"c1":["c"],"c2":3},{"c1":["d"],"c2":4},{"c1":["e"],"c2":5}]"#; + + test!(array, expected) } #[test] -fn write_nested_list() -> Result<()> { +fn nested_list() -> Result<()> { let iter = vec![ vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])], vec![], @@ -208,26 +183,20 @@ fn write_nested_list() -> Result<()> 
 
     let c2 = Utf8Array::<i32>::from(&vec![Some("foo"), Some("bar"), None]);
 
-    let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap();
+    let data_type = DataType::Struct(vec![
+        Field::new("c1", c1.data_type().clone(), true),
+        Field::new("c2", c2.data_type().clone(), true),
+    ]);
+    let array = StructArray::from_data(data_type, vec![Arc::new(c1) as _, Arc::new(c2)], None);
 
-    let buf = write_batch(
-        batch,
-        vec!["c1".to_string(), "c2".to_string()],
-        json_write::Format::NewlineDelimitedJson,
-    )?;
+    let expected =
+        r#"[{"c1":[[1,2],[3]],"c2":"foo"},{"c1":[],"c2":"bar"},{"c1":[[4,5,6]],"c2":null}]"#;
 
-    assert_eq!(
-        String::from_utf8(buf).unwrap(),
-        r#"{"c1":[[1,2],[3]],"c2":"foo"}
-{"c1":[],"c2":"bar"}
-{"c1":[[4,5,6]],"c2":null}
-"#
-    );
-    Ok(())
+    test!(array, expected)
 }
 
 #[test]
-fn write_list_of_struct() -> Result<()> {
+fn list_of_struct() -> Result<()> {
     let inner = vec![Field::new("c121", DataType::Utf8, false)];
     let fields = vec![
         Field::new("c11", DataType::Int32, false),
@@ -269,106 +238,54 @@ fn write_list_of_struct() -> Result<()> {
 
     let c2 = Int32Array::from_slice(&[1, 2, 3]);
 
-    let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap();
+    let data_type = DataType::Struct(vec![
+        Field::new("c1", c1.data_type().clone(), true),
+        Field::new("c2", c2.data_type().clone(), true),
+    ]);
+    let array = StructArray::from_data(data_type, vec![Arc::new(c1) as _, Arc::new(c2)], None);
 
-    let buf = write_batch(
-        batch,
-        vec!["c1".to_string(), "c2".to_string()],
-        json_write::Format::NewlineDelimitedJson,
-    )?;
+    let expected = r#"[{"c1":[{"c11":1,"c12":null},{"c11":null,"c12":{"c121":"f"}}],"c2":1},{"c1":null,"c2":2},{"c1":[null],"c2":3}]"#;
 
-    assert_eq!(
-        String::from_utf8(buf).unwrap(),
-        r#"{"c1":[{"c11":1,"c12":null},{"c11":null,"c12":{"c121":"f"}}],"c2":1}
-{"c1":null,"c2":2}
-{"c1":[null],"c2":3}
-"#
-    );
-    Ok(())
+    test!(array, expected)
 }
 
 #[test]
-fn write_escaped_utf8() -> Result<()> {
-    let a = Utf8Array::<i32>::from(&vec![Some("a\na"), None]);
-
-    let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap();
+fn escaped_end_of_line_in_utf8() -> Result<()> {
+    let array = Utf8Array::<i32>::from(&vec![Some("a\na"), None]);
 
-    let buf = write_batch(
-        batch,
-        vec!["c1".to_string()],
-        json_write::Format::NewlineDelimitedJson,
-    )?;
+    let expected = r#"["a\na",null]"#;
 
-    assert_eq!(
-        String::from_utf8(buf).unwrap().as_bytes(),
-        b"{\"c1\":\"a\\na\"}\n{\"c1\":null}\n"
-    );
-    Ok(())
+    test!(array, expected)
 }
 
 #[test]
-fn write_quotation_marks_in_utf8() -> Result<()> {
-    let a = Utf8Array::<i32>::from(&vec![Some("a\"a"), None]);
+fn escaped_quotation_marks_in_utf8() -> Result<()> {
+    let array = Utf8Array::<i32>::from(&vec![Some("a\"a"), None]);
 
-    let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap();
+    let expected = r#"["a\"a",null]"#;
 
-    let buf = write_batch(
-        batch,
-        vec!["c1".to_string()],
-        json_write::Format::NewlineDelimitedJson,
-    )?;
-
-    assert_eq!(
-        String::from_utf8(buf).unwrap().as_bytes(),
-        b"{\"c1\":\"a\\\"a\"}\n{\"c1\":null}\n"
-    );
-    Ok(())
+    test!(array, expected)
 }
 
 #[test]
 fn write_date32() -> Result<()> {
-    let a = PrimitiveArray::from_data(DataType::Date32, vec![1000i32, 8000, 10000].into(), None);
-
-    let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap();
-
-    let buf = write_batch(
-        batch,
-        vec!["c1".to_string()],
-        json_write::Format::NewlineDelimitedJson,
-    )?;
-
-    assert_eq!(
-        std::str::from_utf8(&buf).unwrap(),
-        r#"{"c1":"1972-09-27"}
-{"c1":"1991-11-27"}
-{"c1":"1997-05-19"}
-"#
-    );
-    Ok(())
+    let array =
+        PrimitiveArray::from_data(DataType::Date32, vec![1000i32, 8000, 10000].into(), None);
+
+    let expected = r#"["1972-09-27","1991-11-27","1997-05-19"]"#;
+
+    test!(array, expected)
 }
 
 #[test]
 fn write_timestamp() -> Result<()> {
-    let a = PrimitiveArray::from_data(
+    let array = PrimitiveArray::from_data(
         DataType::Timestamp(TimeUnit::Second, None),
         vec![10i64, 1 << 32, 1 << 33].into(),
         None,
     );
 
-    let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap();
-
-    let buf = write_batch(
-        batch,
-        vec!["c1".to_string()],
-        json_write::Format::NewlineDelimitedJson,
-    )?;
+    let expected = r#"["1970-01-01 00:00:10","2106-02-07 06:28:16","2242-03-16 12:56:32"]"#;
 
-    assert_eq!(
-        std::str::from_utf8(&buf).unwrap(),
-        r#"{"c1":"1970-01-01 00:00:10"}
-{"c1":"2106-02-07 06:28:16"}
-{"c1":"2242-03-16 12:56:32"}
-"#
-    );
-    Ok(())
+    test!(array, expected)
 }
diff --git a/tests/it/io/mod.rs b/tests/it/io/mod.rs
index 75296ecf766..cfd49263a20 100644
--- a/tests/it/io/mod.rs
+++ b/tests/it/io/mod.rs
@@ -4,6 +4,9 @@ mod print;
 #[cfg(feature = "io_json")]
 mod json;
 
+#[cfg(feature = "io_json")]
+mod ndjson;
+
 #[cfg(feature = "io_ipc")]
 mod ipc;
 
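The rewritten `write` tests above all reduce to the same pattern: build one array, serialize it, and compare against a JSON-array literal. The `test!` macro they invoke is defined earlier in `tests/it/io/json/write.rs` and is not part of these hunks, so the sketch below reconstructs the equivalent logic as a plain function; the helper name `serialize_to_string` and the exact `null` rendering are assumptions, while the `Serializer::new(arrays, vec![])` / `write` calls are the ones this diff introduces.

```rust
use arrow2::array::{Array, Int32Array};
use arrow2::error::ArrowError;
use arrow2::io::json::write as json_write;

/// Hypothetical helper mirroring what each rewritten test does.
fn serialize_to_string(array: Box<dyn Array>) -> Result<String, ArrowError> {
    let mut buf = vec![];
    // Advancing the serializer is CPU-bounded; `write` performs the IO.
    let blocks = json_write::Serializer::new(vec![Ok(array)].into_iter(), vec![]);
    json_write::write(&mut buf, blocks)?;
    Ok(String::from_utf8(buf).unwrap())
}

fn main() -> Result<(), ArrowError> {
    let array = Int32Array::from(&[Some(1), None, Some(3)]);
    // Nulls are expected to serialize as JSON `null` (assumed rendering).
    assert_eq!(serialize_to_string(Box::new(array))?, "[1,null,3]");
    Ok(())
}
```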
diff --git a/tests/it/io/ndjson/mod.rs b/tests/it/io/ndjson/mod.rs
new file mode 100644
index 00000000000..9f9cd0ab6b6
--- /dev/null
+++ b/tests/it/io/ndjson/mod.rs
@@ -0,0 +1,312 @@
+mod read;
+
+use std::sync::Arc;
+
+use arrow2::array::*;
+use arrow2::bitmap::Bitmap;
+use arrow2::buffer::Buffer;
+use arrow2::datatypes::*;
+use arrow2::error::Result;
+use arrow2::io::ndjson::write as ndjson_write;
+
+use read::{infer, read_and_deserialize};
+
+fn round_trip(ndjson: String) -> Result<()> {
+    let data_type = infer(&ndjson)?;
+
+    let expected = read_and_deserialize(&ndjson, &data_type, 1000)?;
+
+    let arrays = expected.clone().into_iter().map(Ok);
+
+    let serializer = ndjson_write::Serializer::new(arrays, vec![]);
+
+    let mut writer = ndjson_write::FileWriter::new(vec![], serializer);
+    writer.by_ref().collect::<Result<()>>()?; // write
+    let buf = writer.into_inner().0;
+
+    let new_chunk = read_and_deserialize(std::str::from_utf8(&buf).unwrap(), &data_type, 1000)?;
+
+    assert_eq!(expected, new_chunk);
+    Ok(())
+}
+
+#[test]
+fn round_trip_basics() -> Result<()> {
+    let (data, _) = case_basics();
+    round_trip(data)
+}
+
+#[test]
+fn round_trip_list() -> Result<()> {
+    let (data, _) = case_list();
+    round_trip(data)
+}
+
+fn case_list() -> (String, Arc<dyn Array>) {
+    let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":"4"}
+    {"a":-10, "b":null, "c":[true, true]}
+    {"a":null, "b":[2.1, null, -6.2], "c":[false, null], "d":"text"}
+    "#
+    .to_string();
+
+    let data_type = DataType::Struct(vec![
+        Field::new("a", DataType::Int64, true),
+        Field::new(
+            "b",
+            DataType::List(Box::new(Field::new("item", DataType::Float64, true))),
+            true,
+        ),
+        Field::new(
+            "c",
+            DataType::List(Box::new(Field::new("item", DataType::Boolean, true))),
+            true,
+        ),
+        Field::new("d", DataType::Utf8, true),
+    ]);
+
+    let a = Int64Array::from(&[Some(1), Some(-10), None]);
+    let mut b = MutableListArray::<i32, MutablePrimitiveArray<f64>>::new();
+    b.try_extend(vec![
+        Some(vec![Some(2.0), Some(1.3), Some(-6.1)]),
+        None,
+        Some(vec![Some(2.1), None, Some(-6.2)]),
+    ])
+    .unwrap();
+    let b: ListArray<i32> = b.into();
+
+    let mut c = MutableListArray::<i32, MutableBooleanArray>::new();
+    c.try_extend(vec![
+        Some(vec![Some(false), Some(true)]),
+        Some(vec![Some(true), Some(true)]),
+        Some(vec![Some(false), None]),
+    ])
+    .unwrap();
+    let c: ListArray<i32> = c.into();
+
+    let d = Utf8Array::<i32>::from(&[Some("4"), None, Some("text")]);
+
+    let array = StructArray::from_data(
+        data_type,
+        vec![
+            Arc::new(a) as Arc<dyn Array>,
+            Arc::new(b),
+            Arc::new(c),
+            Arc::new(d),
+        ],
+        None,
+    );
+
+    (data, Arc::new(array))
+}
+
+fn case_dict() -> (String, Arc<dyn Array>) {
+    let data = r#"{"machine": "a", "events": [null, "Elect Leader", "Do Ballot"]}
+    {"machine": "b", "events": ["Do Ballot", null, "Send Data", "Elect Leader"]}
+    {"machine": "c", "events": ["Send Data"]}
+    {"machine": "c"}
+    {"machine": "c", "events": null}
+    "#
+    .to_string();
+
+    let data_type = DataType::List(Box::new(Field::new(
+        "item",
+        DataType::Dictionary(u64::KEY_TYPE, Box::new(DataType::Utf8), false),
+        true,
+    )));
+
+    let fields = vec![Field::new("events", data_type, true)];
+
+    type A = MutableDictionaryArray<u64, MutableUtf8Array<i32>>;
+
+    let mut array = MutableListArray::<i32, A>::new();
+    array
+        .try_extend(vec![
+            Some(vec![None, Some("Elect Leader"), Some("Do Ballot")]),
+            Some(vec![
+                Some("Do Ballot"),
+                None,
+                Some("Send Data"),
+                Some("Elect Leader"),
+            ]),
+            Some(vec![Some("Send Data")]),
+            None,
+            None,
+        ])
+        .unwrap();
+
+    let array: ListArray<i32> = array.into();
+
+    (
+        data,
+        Arc::new(StructArray::from_data(
+            DataType::Struct(fields),
+            vec![Arc::new(array) as Arc<dyn Array>],
+            None,
+        )),
+    )
+}
+
+fn case_basics() -> (String, Arc<dyn Array>) {
+    let data = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
+    {"a":-10, "b":-3.5, "c":true, "d":null}
+    {"a":100000000, "b":0.6, "d":"text"}"#
+        .to_string();
+    let data_type = DataType::Struct(vec![
+        Field::new("a", DataType::Int64, true),
+        Field::new("b", DataType::Float64, true),
+        Field::new("c", DataType::Boolean, true),
+        Field::new("d", DataType::Utf8, true),
+    ]);
+    let array = StructArray::from_data(
+        data_type,
+        vec![
+            Arc::new(Int64Array::from_slice(&[1, -10, 100000000])) as Arc<dyn Array>,
+            Arc::new(Float64Array::from_slice(&[2.0, -3.5, 0.6])),
+            Arc::new(BooleanArray::from(&[Some(false), Some(true), None])),
+            Arc::new(Utf8Array::<i32>::from(&[Some("4"), None, Some("text")])),
+        ],
+        None,
+    );
+    (data, Arc::new(array))
+}
+
+fn case_projection() -> (String, Arc<dyn Array>) {
+    let data = r#"{"a":1, "b":2.0, "c":false, "d":"4", "e":"4"}
+    {"a":10, "b":-3.5, "c":true, "d":null, "e":"text"}
+    {"a":100000000, "b":0.6, "d":"text"}"#
+        .to_string();
+    let data_type = DataType::Struct(vec![
+        Field::new("a", DataType::UInt32, true),
+        Field::new("b", DataType::Float32, true),
+        Field::new("c", DataType::Boolean, true),
+        // note how "d" is not here
+        Field::new("e", DataType::Binary, true),
+    ]);
+    let array = StructArray::from_data(
+        data_type,
+        vec![
+            Arc::new(UInt32Array::from_slice(&[1, 10, 100000000])) as Arc<dyn Array>,
+            Arc::new(Float32Array::from_slice(&[2.0, -3.5, 0.6])),
+            Arc::new(BooleanArray::from(&[Some(false), Some(true), None])),
+            Arc::new(BinaryArray::<i32>::from(&[
+                Some(b"4".as_ref()),
+                Some(b"text".as_ref()),
+                None,
+            ])),
+        ],
+        None,
+    );
+    (data, Arc::new(array))
+}
+
+fn case_struct() -> (String, Arc<dyn Array>) {
+    let data = r#"{"a": {"b": true, "c": {"d": "text"}}}
+    {"a": {"b": false, "c": null}}
+    {"a": {"b": true, "c": {"d": "text"}}}
+    {"a": 1}"#
+        .to_string();
+
+    let d_field = Field::new("d", DataType::Utf8, true);
+    let c_field = Field::new("c", DataType::Struct(vec![d_field.clone()]), true);
+    let a_field = Field::new(
+        "a",
+        DataType::Struct(vec![
+            Field::new("b", DataType::Boolean, true),
+            c_field.clone(),
+        ]),
+        true,
+    );
+    let fields = vec![a_field];
+
+    // build expected output
+    let d = Utf8Array::<i32>::from(&vec![Some("text"), None, Some("text"), None]);
+    let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None);
+
+    let b = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]);
+    let inner = DataType::Struct(vec![Field::new("b", DataType::Boolean, true), c_field]);
+    let expected = StructArray::from_data(inner, vec![Arc::new(b), Arc::new(c)], None);
+
+    let data_type = DataType::Struct(fields);
+
+    (
+        data,
+        Arc::new(StructArray::from_data(
+            data_type,
+            vec![Arc::new(expected) as Arc<dyn Array>],
+            None,
+        )),
+    )
+}
+
+fn case_nested_list() -> (String, Arc<dyn Array>) {
+    let d_field = Field::new("d", DataType::Utf8, true);
+    let c_field = Field::new("c", DataType::Struct(vec![d_field.clone()]), true);
+    let b_field = Field::new("b", DataType::Boolean, true);
+    let a_struct_field = Field::new(
+        "a",
+        DataType::Struct(vec![b_field.clone(), c_field.clone()]),
+        true,
+    );
+    let a_list_data_type = DataType::List(Box::new(a_struct_field));
+    let a_field = Field::new("a", a_list_data_type.clone(), true);
+
+    let data = r#"
+    {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
+    {"a": [{"b": false, "c": null}]}
+    {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
+    {"a": null}
+    {"a": []}
+    "#.to_string();
+
+    // build expected output
+    let d = Utf8Array::<i32>::from(&vec![
+        Some("a_text"),
+        Some("b_text"),
+        None,
+        Some("c_text"),
+        Some("d_text"),
+        None,
+    ]);
+
+    let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None);
+
+    let b = BooleanArray::from(vec![
+        Some(true),
+        Some(false),
+        Some(false),
+        Some(true),
+        None,
+        Some(true),
+    ]);
+    let a_struct = StructArray::from_data(
+        DataType::Struct(vec![b_field, c_field]),
+        vec![Arc::new(b) as Arc<dyn Array>, Arc::new(c) as Arc<dyn Array>],
+        None,
+    );
+    let expected = ListArray::from_data(
+        a_list_data_type,
+        Buffer::from_slice([0i32, 2, 3, 6, 6, 6]),
+        Arc::new(a_struct) as Arc<dyn Array>,
+        Some(Bitmap::from_u8_slice([0b00010111], 5)),
+    );
+
+    let array = Arc::new(StructArray::from_data(
+        DataType::Struct(vec![a_field]),
+        vec![Arc::new(expected)],
+        None,
+    ));
+
+    (data, array)
+}
+
+fn case(case: &str) -> (String, Arc<dyn Array>) {
+    match case {
+        "basics" => case_basics(),
+        "projection" => case_projection(),
+        "list" => case_list(),
+        "dict" => case_dict(),
+        "struct" => case_struct(),
+        "nested_list" => case_nested_list(),
+        _ => todo!(),
+    }
+}
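The `round_trip` helper in the new `tests/it/io/ndjson/mod.rs` above is the first user of the write half of the new `io::ndjson` module: a `Serializer` yields one NDJSON line per row and a `FileWriter` drains it into the underlying writer. A minimal sketch of that write path in isolation, using only the calls that appear in `round_trip` (`Serializer::new`, `FileWriter::new`, `by_ref().collect()`, `into_inner`); the exact textual rendering of each line is an assumption, so the assertion only counts lines.

```rust
use std::sync::Arc;

use arrow2::array::{Array, Utf8Array};
use arrow2::error::Result;
use arrow2::io::ndjson::write as ndjson_write;

fn main() -> Result<()> {
    // Two rows in should produce two NDJSON lines out (one JSON value per line).
    let array = Arc::new(Utf8Array::<i32>::from(&[Some("a"), None])) as Arc<dyn Array>;

    let serializer = ndjson_write::Serializer::new(vec![Ok(array)].into_iter(), vec![]);
    let mut writer = ndjson_write::FileWriter::new(vec![], serializer);

    // Driving the iterator performs the writes, exactly as in `round_trip` above;
    // `into_inner` returns the wrapped writer (here a Vec<u8>) and the serializer.
    writer.by_ref().collect::<Result<()>>()?;
    let (buf, _) = writer.into_inner();

    assert_eq!(String::from_utf8(buf).unwrap().lines().count(), 2);
    Ok(())
}
```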
diff --git a/tests/it/io/ndjson/read.rs b/tests/it/io/ndjson/read.rs
new file mode 100644
index 00000000000..489fd548503
--- /dev/null
+++ b/tests/it/io/ndjson/read.rs
@@ -0,0 +1,246 @@
+use std::io::Cursor;
+use std::sync::Arc;
+
+use arrow2::array::*;
+use arrow2::datatypes::{DataType, Field};
+use arrow2::error::Result;
+use arrow2::io::ndjson::read as ndjson_read;
+use arrow2::io::ndjson::read::FallibleStreamingIterator;
+
+use super::*;
+
+fn test_case(case_: &str) -> Result<()> {
+    let (ndjson, expected) = case(case_);
+
+    let data_type = expected.data_type().clone();
+
+    let mut arrays = read_and_deserialize(&ndjson, &data_type, 1000)?;
+
+    assert_eq!(arrays.len(), 1);
+    assert_eq!(expected, arrays.pop().unwrap());
+    Ok(())
+}
+
+pub fn infer(ndjson: &str) -> Result<DataType> {
+    ndjson_read::infer(&mut Cursor::new(ndjson), None)
+}
+
+pub fn read_and_deserialize(
+    ndjson: &str,
+    data_type: &DataType,
+    batch_size: usize,
+) -> Result<Vec<Arc<dyn Array>>> {
+    let reader = Cursor::new(ndjson);
+
+    let mut reader = ndjson_read::FileReader::new(reader, vec!["".to_string(); batch_size], None);
+
+    let mut chunks = vec![];
+    while let Some(rows) = reader.next()? {
+        chunks.push(ndjson_read::deserialize(rows, data_type.clone())?);
+    }
+
+    Ok(chunks)
+}
+
+#[test]
+fn infer_nullable() -> Result<()> {
+    let ndjson = r#"true
+    false
+    null
+    true
+    "#;
+    let expected = DataType::Boolean;
+
+    let result = infer(ndjson)?;
+
+    assert_eq!(result, expected);
+    Ok(())
+}
+
+fn case_nested_struct() -> (String, Arc<dyn Array>) {
+    let ndjson = r#"{"a": {"a": 2.0, "b": 2}}
+    {"a": {"b": 2}}
+    {"a": {"a": 2.0, "b": 2, "c": true}}
+    {"a": {"a": 2.0, "b": 2}}
+    "#;
+
+    let inner = DataType::Struct(vec![
+        Field::new("a", DataType::Float64, true),
+        Field::new("b", DataType::Int64, true),
+        Field::new("c", DataType::Boolean, true),
+    ]);
+
+    let data_type = DataType::Struct(vec![Field::new("a", inner.clone(), true)]);
+
+    let values = vec![
+        Arc::new(Float64Array::from([Some(2.0), None, Some(2.0), Some(2.0)])) as Arc<dyn Array>,
+        Arc::new(Int64Array::from([Some(2), Some(2), Some(2), Some(2)])),
+        Arc::new(BooleanArray::from([None, None, Some(true), None])),
+    ];
+
+    let values = vec![Arc::new(StructArray::from_data(inner, values, None)) as Arc<dyn Array>];
+
+    let array = Arc::new(StructArray::from_data(data_type, values, None));
+
+    (ndjson.to_string(), array)
+}
+
+#[test]
+fn infer_nested_struct() -> Result<()> {
+    let (ndjson, array) = case_nested_struct();
+
+    let result = infer(&ndjson)?;
+
+    assert_eq!(&result, array.data_type());
+    Ok(())
+}
+
+#[test]
+fn read_nested_struct() -> Result<()> {
+    let (ndjson, expected) = case_nested_struct();
+
+    let data_type = infer(&ndjson)?;
+
+    let result = read_and_deserialize(&ndjson, &data_type, 100)?;
+
+    assert_eq!(result, vec![expected]);
+    Ok(())
+}
+
+#[test]
+fn read_nested_struct_batched() -> Result<()> {
+    let (ndjson, expected) = case_nested_struct();
+    let batch_size = 2;
+
+    // create a chunked array by batch_size from the (un-chunked) expected
+    let expected: Vec<Arc<dyn Array>> = (0..(expected.len() + batch_size - 1) / batch_size)
+        .map(|offset| expected.slice(offset * batch_size, batch_size).into())
+        .collect();
+
+    let data_type = infer(&ndjson)?;
+
+    let result = read_and_deserialize(&ndjson, &data_type, batch_size)?;
+
+    assert_eq!(result, expected);
+    Ok(())
+}
+
+#[test]
+fn invalid_infer_schema() -> Result<()> {
+    let re = ndjson_read::infer(&mut Cursor::new("city,lat,lng"), None);
+    assert_eq!(
+        re.err().unwrap().to_string(),
+        "External error: expected value at line 1 column 1",
+    );
+    Ok(())
+}
+
+#[test]
+fn infer_schema_mixed_list() -> Result<()> {
+    let ndjson = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1}
+    {"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null}
+    {"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"}
+    {"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]}
+    "#;
+
+    let expected = DataType::Struct(vec![
+        Field::new("a", DataType::Int64, true),
+        Field::new(
+            "b",
+            DataType::List(Box::new(Field::new("item", DataType::Float64, true))),
+            true,
+        ),
+        Field::new(
+            "c",
+            DataType::List(Box::new(Field::new("item", DataType::Boolean, true))),
+            true,
+        ),
+        Field::new("d", DataType::Utf8, true),
+    ]);
+
+    let result = infer(ndjson)?;
+
+    assert_eq!(result, expected);
+    Ok(())
+}
+
+#[test]
+fn basic() -> Result<()> {
+    test_case("basics")
+}
+
+#[test]
+fn projection() -> Result<()> {
+    test_case("projection")
+}
+
+#[test]
+fn dictionary() -> Result<()> {
+    test_case("dict")
+}
+
+#[test]
+fn list() -> Result<()> {
+    test_case("list")
+}
+
+#[test]
+fn nested_struct() -> Result<()> {
+    test_case("struct")
+}
+
+#[test]
+fn nested_list() -> Result<()> {
+    test_case("nested_list")
+}
+
+#[test]
+fn line_break_in_values() -> Result<()> {
+    let ndjson = r#"
+    "aa\n\n"
+    "aa\n"
+    null
+    "#;
+
+    let data_type = DataType::Utf8;
+    let arrays = read_and_deserialize(ndjson, &data_type, 1000)?;
+
+    let expected = Utf8Array::<i32>::from(&[Some("aa\n\n"), Some("aa\n"), None]);
+
+    assert_eq!(expected, arrays[0].as_ref());
+    Ok(())
+}
+
+#[test]
+fn invalid_read_record() -> Result<()> {
+    let fields = vec![Field::new(
+        "a",
+        DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]),
+        true,
+    )];
+    let data_type = DataType::Struct(fields);
+    let arrays = read_and_deserialize("city,lat,lng", &data_type, 1000);
+
+    assert_eq!(
+        arrays.err().unwrap().to_string(),
+        "External error: expected value at line 1 column 1",
+    );
+    Ok(())
+}
+
+#[test]
+fn skip_empty_lines() -> Result<()> {
+    let ndjson = "
+    {\"a\": 1}
+
+    {\"a\": 2}
+
+    {\"a\": 3}";
+
+    let data_type = DataType::Struct(vec![Field::new("a", DataType::Int64, true)]);
+    let arrays = read_and_deserialize(ndjson, &data_type, 1000)?;
+
+    assert_eq!(1, arrays.len());
+    assert_eq!(3, arrays[0].len());
+    Ok(())
+}
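Taken together, the read tests above compose the same three calls everywhere: `infer` to obtain a `DataType`, a `FileReader` driven as a `FallibleStreamingIterator` over a reusable row buffer, and `deserialize` once per filled buffer. A minimal end-to-end sketch at batch size 2, restricted to calls that appear in `read.rs`; the 2-then-1 batch split follows from the buffer length, the same behaviour `read_nested_struct_batched` relies on.

```rust
use std::io::Cursor;

use arrow2::error::Result;
use arrow2::io::ndjson::read as ndjson_read;
use arrow2::io::ndjson::read::FallibleStreamingIterator;

fn main() -> Result<()> {
    let ndjson = "{\"a\": 1}\n{\"a\": 2}\n{\"a\": 3}";

    // Infer the schema from the values themselves.
    let data_type = ndjson_read::infer(&mut Cursor::new(ndjson), None)?;

    // The vec of empty strings is the reusable row buffer; its length is the batch size.
    let mut reader =
        ndjson_read::FileReader::new(Cursor::new(ndjson), vec!["".to_string(); 2], None);

    let mut arrays = vec![];
    while let Some(rows) = reader.next()? {
        arrays.push(ndjson_read::deserialize(rows, data_type.clone())?);
    }

    // 3 rows at batch size 2: one array of 2 rows, then one of 1.
    assert_eq!(arrays.len(), 2);
    Ok(())
}
```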