From 2b0fe1472a3e1653db59b68fb475675b00ebedcd Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 24 Dec 2021 07:37:43 +0000 Subject: [PATCH] split IO-bounded from CPU-bounded tasks --- examples/json_read.rs | 46 ++++ guide/src/SUMMARY.md | 1 + guide/src/io/json_read.md | 10 + src/io/json/mod.rs | 3 +- src/io/json/read/deserialize.rs | 154 ++++++------ src/io/json/read/infer_schema.rs | 397 ++++++++++++------------------- src/io/json/read/iterator.rs | 56 +++++ src/io/json/read/mod.rs | 50 ++-- src/io/json/read/reader.rs | 288 ---------------------- src/io/json/read/util.rs | 77 ------ tests/it/io/json/mod.rs | 35 ++- tests/it/io/json/read.rs | 132 ++++------ 12 files changed, 453 insertions(+), 796 deletions(-) create mode 100644 examples/json_read.rs create mode 100644 guide/src/io/json_read.md create mode 100644 src/io/json/read/iterator.rs delete mode 100644 src/io/json/read/reader.rs delete mode 100644 src/io/json/read/util.rs diff --git a/examples/json_read.rs b/examples/json_read.rs new file mode 100644 index 00000000000..ddc4ba5f9be --- /dev/null +++ b/examples/json_read.rs @@ -0,0 +1,46 @@ +use std::fs::File; +use std::io::BufReader; + +use arrow2::error::Result; +use arrow2::io::json::read; +use arrow2::record_batch::RecordBatch; + +fn read_path(path: &str, projection: Option>) -> Result { + // Example of reading a JSON file. + let mut reader = BufReader::new(File::open(path)?); + + let fields = read::infer_and_reset(&mut reader, None)?; + + let fields = if let Some(projection) = projection { + fields + .into_iter() + .filter(|field| projection.contains(&field.name().as_ref())) + .collect() + } else { + fields + }; + + // at most 1024 rows. This container can be re-used across batches. + let mut rows = vec![String::default(); 1024]; + + // Reads up to 1024 rows. + // this is IO-intensive and performs minimal CPU work. In particular, + // no deserialization is performed. + let read = read::read_rows(&mut reader, &mut rows)?; + let rows = &rows[..read]; + + // deserialize `rows` into a `RecordBatch`. This is CPU-intensive, has no IO, + // and can be performed on a different thread pool via a channel. + read::deserialize(&rows, fields) +} + +fn main() -> Result<()> { + use std::env; + let args: Vec = env::args().collect(); + + let file_path = &args[1]; + + let batch = read_path(file_path, None)?; + println!("{:#?}", batch); + Ok(()) +} diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md index 9719504ee56..d444beac250 100644 --- a/guide/src/SUMMARY.md +++ b/guide/src/SUMMARY.md @@ -18,3 +18,4 @@ - [Write Arrow](./io/ipc_write.md) - [Read Avro](./io/avro_read.md) - [Write Avro](./io/avro_write.md) + - [Read JSON](./io/json_read.md) diff --git a/guide/src/io/json_read.md b/guide/src/io/json_read.md new file mode 100644 index 00000000000..66b70952707 --- /dev/null +++ b/guide/src/io/json_read.md @@ -0,0 +1,10 @@ +# JSON read + +When compiled with feature `io_json`, you can use this crate to read JSON files. + +```rust +{{#include ../../../examples/json_read.rs}} +``` + +Note how deserialization can be performed on a separate thread pool to avoid +blocking the runtime (see also [here](https://ryhl.io/blog/async-what-is-blocking/)). diff --git a/src/io/json/mod.rs b/src/io/json/mod.rs index e5acb380a90..7411038a7d5 100644 --- a/src/io/json/mod.rs +++ b/src/io/json/mod.rs @@ -2,10 +2,9 @@ #![forbid(unsafe_code)] //! Convert data between the Arrow memory format and JSON line-delimited records. -mod read; +pub mod read; mod write; -pub use read::*; pub use write::*; use crate::error::ArrowError; diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index d9b099e78a0..a5b56c74c5c 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -1,20 +1,4 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - +use std::borrow::Borrow; use std::hash::Hasher; use std::{collections::hash_map::DefaultHasher, sync::Arc}; @@ -23,6 +7,9 @@ use indexmap::map::IndexMap as HashMap; use num_traits::NumCast; use serde_json::Value; +use crate::datatypes::{Field, Schema}; +use crate::error::ArrowError; +use crate::record_batch::RecordBatch; use crate::{ array::*, bitmap::MutableBitmap, @@ -73,8 +60,19 @@ fn build_extract(data_type: &DataType) -> Extract { } } -fn read_int(rows: &[&Value], data_type: DataType) -> PrimitiveArray { - let iter = rows.iter().map(|row| match row { +fn deserialize_boolean>(rows: &[A]) -> BooleanArray { + let iter = rows.iter().map(|row| match row.borrow() { + Value::Bool(v) => Some(v), + _ => None, + }); + BooleanArray::from_trusted_len_iter(iter) +} + +fn deserialize_int>( + rows: &[A], + data_type: DataType, +) -> PrimitiveArray { + let iter = rows.iter().map(|row| match row.borrow() { Value::Number(number) => number.as_i64().and_then(num_traits::cast::), Value::Bool(number) => num_traits::cast::(*number as i32), _ => None, @@ -82,8 +80,11 @@ fn read_int(rows: &[&Value], data_type: DataType) -> Pr PrimitiveArray::from_trusted_len_iter(iter).to(data_type) } -fn read_float(rows: &[&Value], data_type: DataType) -> PrimitiveArray { - let iter = rows.iter().map(|row| match row { +fn deserialize_float>( + rows: &[A], + data_type: DataType, +) -> PrimitiveArray { + let iter = rows.iter().map(|row| match row.borrow() { Value::Number(number) => number.as_f64().and_then(num_traits::cast::), Value::Bool(number) => num_traits::cast::(*number as i32), _ => None, @@ -91,24 +92,16 @@ fn read_float(rows: &[&Value], data_type: DataType) -> PrimitiveArray::from_trusted_len_iter(iter).to(data_type) } -fn read_binary(rows: &[&Value]) -> BinaryArray { - let iter = rows.iter().map(|row| match row { +fn deserialize_binary>(rows: &[A]) -> BinaryArray { + let iter = rows.iter().map(|row| match row.borrow() { Value::String(v) => Some(v.as_bytes()), _ => None, }); BinaryArray::from_trusted_len_iter(iter) } -fn read_boolean(rows: &[&Value]) -> BooleanArray { - let iter = rows.iter().map(|row| match row { - Value::Bool(v) => Some(v), - _ => None, - }); - BooleanArray::from_trusted_len_iter(iter) -} - -fn read_utf8(rows: &[&Value]) -> Utf8Array { - let iter = rows.iter().map(|row| match row { +fn deserialize_utf8>(rows: &[A]) -> Utf8Array { + let iter = rows.iter().map(|row| match row.borrow() { Value::String(v) => Some(v.clone()), Value::Number(v) => Some(v.to_string()), Value::Bool(v) => Some(v.to_string()), @@ -117,15 +110,15 @@ fn read_utf8(rows: &[&Value]) -> Utf8Array { Utf8Array::::from_trusted_len_iter(iter) } -fn read_list(rows: &[&Value], data_type: DataType) -> ListArray { +fn deserialize_list>(rows: &[A], data_type: DataType) -> ListArray { let child = ListArray::::get_child_type(&data_type); let mut validity = MutableBitmap::with_capacity(rows.len()); - let mut inner = Vec::<&Value>::with_capacity(rows.len()); let mut offsets = Vec::::with_capacity(rows.len() + 1); + let mut inner = vec![]; offsets.push(O::zero()); rows.iter().fold(O::zero(), |mut length, row| { - match row { + match row.borrow() { Value::Array(value) => { inner.extend(value.iter()); validity.push(true); @@ -142,26 +135,21 @@ fn read_list(rows: &[&Value], data_type: DataType) -> ListArray { } }); - let values = read(&inner, child.clone()); + let values = _deserialize(&inner, child.clone()); ListArray::::from_data(data_type, offsets.into(), values, validity.into()) } -fn read_struct(rows: &[&Value], data_type: DataType) -> StructArray { +fn deserialize_struct>(rows: &[A], data_type: DataType) -> StructArray { let fields = StructArray::get_fields(&data_type); let mut values = fields .iter() - .map(|f| { - ( - f.name(), - (f.data_type(), Vec::<&Value>::with_capacity(rows.len())), - ) - }) + .map(|f| (f.name(), (f.data_type(), vec![]))) .collect::>(); rows.iter().for_each(|row| { - match row { + match row.borrow() { Value::Object(value) => { values .iter_mut() @@ -177,23 +165,26 @@ fn read_struct(rows: &[&Value], data_type: DataType) -> StructArray { let values = values .into_iter() - .map(|(_, (data_type, values))| read(&values, data_type.clone())) + .map(|(_, (data_type, values))| _deserialize(&values, data_type.clone())) .collect::>(); StructArray::from_data(data_type, values, None) } -fn read_dictionary(rows: &[&Value], data_type: DataType) -> DictionaryArray { +fn deserialize_dictionary>( + rows: &[A], + data_type: DataType, +) -> DictionaryArray { let child = DictionaryArray::::get_child(&data_type); let mut map = HashedMap::::default(); let extractor = build_extract(child); - let mut inner = Vec::<&Value>::with_capacity(rows.len()); + let mut inner = vec![]; let keys = rows .iter() - .map(|x| extractor(x)) + .map(|x| extractor(x.borrow())) .map(|item| match item { Some((hash, v)) => match map.get(&hash) { Some(key) => Some(*key), @@ -209,20 +200,22 @@ fn read_dictionary(rows: &[&Value], data_type: DataType) -> Di }) .collect::>(); - let values = read(&inner, child.clone()); + let values = _deserialize(&inner, child.clone()); DictionaryArray::::from_data(keys, values) } -pub fn read(rows: &[&Value], data_type: DataType) -> Arc { +fn _deserialize>(rows: &[A], data_type: DataType) -> Arc { match &data_type { DataType::Null => Arc::new(NullArray::from_data(data_type, rows.len())), - DataType::Boolean => Arc::new(read_boolean(rows)), - DataType::Int8 => Arc::new(read_int::(rows, data_type)), - DataType::Int16 => Arc::new(read_int::(rows, data_type)), + DataType::Boolean => Arc::new(deserialize_boolean(rows)), + DataType::Int8 => Arc::new(deserialize_int::(rows, data_type)), + DataType::Int16 => Arc::new(deserialize_int::(rows, data_type)), DataType::Int32 | DataType::Date32 | DataType::Time32(_) - | DataType::Interval(IntervalUnit::YearMonth) => Arc::new(read_int::(rows, data_type)), + | DataType::Interval(IntervalUnit::YearMonth) => { + Arc::new(deserialize_int::(rows, data_type)) + } DataType::Interval(IntervalUnit::DayTime) => { unimplemented!("There is no natural representation of DayTime in JSON.") } @@ -230,24 +223,24 @@ pub fn read(rows: &[&Value], data_type: DataType) -> Arc { | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => Arc::new(read_int::(rows, data_type)), - DataType::UInt8 => Arc::new(read_int::(rows, data_type)), - DataType::UInt16 => Arc::new(read_int::(rows, data_type)), - DataType::UInt32 => Arc::new(read_int::(rows, data_type)), - DataType::UInt64 => Arc::new(read_int::(rows, data_type)), + | DataType::Duration(_) => Arc::new(deserialize_int::(rows, data_type)), + DataType::UInt8 => Arc::new(deserialize_int::(rows, data_type)), + DataType::UInt16 => Arc::new(deserialize_int::(rows, data_type)), + DataType::UInt32 => Arc::new(deserialize_int::(rows, data_type)), + DataType::UInt64 => Arc::new(deserialize_int::(rows, data_type)), DataType::Float16 => unreachable!(), - DataType::Float32 => Arc::new(read_float::(rows, data_type)), - DataType::Float64 => Arc::new(read_float::(rows, data_type)), - DataType::Utf8 => Arc::new(read_utf8::(rows)), - DataType::LargeUtf8 => Arc::new(read_utf8::(rows)), - DataType::List(_) => Arc::new(read_list::(rows, data_type)), - DataType::LargeList(_) => Arc::new(read_list::(rows, data_type)), - DataType::Binary => Arc::new(read_binary::(rows)), - DataType::LargeBinary => Arc::new(read_binary::(rows)), - DataType::Struct(_) => Arc::new(read_struct(rows, data_type)), + DataType::Float32 => Arc::new(deserialize_float::(rows, data_type)), + DataType::Float64 => Arc::new(deserialize_float::(rows, data_type)), + DataType::Utf8 => Arc::new(deserialize_utf8::(rows)), + DataType::LargeUtf8 => Arc::new(deserialize_utf8::(rows)), + DataType::List(_) => Arc::new(deserialize_list::(rows, data_type)), + DataType::LargeList(_) => Arc::new(deserialize_list::(rows, data_type)), + DataType::Binary => Arc::new(deserialize_binary::(rows)), + DataType::LargeBinary => Arc::new(deserialize_binary::(rows)), + DataType::Struct(_) => Arc::new(deserialize_struct(rows, data_type)), DataType::Dictionary(key_type, _) => { match_integer_type!(key_type, |$T| { - Arc::new(read_dictionary::<$T>(rows, data_type)) + Arc::new(deserialize_dictionary::<$T, _>(rows, data_type)) }) } _ => todo!(), @@ -258,3 +251,24 @@ pub fn read(rows: &[&Value], data_type: DataType) -> Arc { */ } } + +/// Deserializes `rows` into a [`RecordBatch`] according to `fields`. +/// This is CPU-bounded. +pub fn deserialize>( + rows: &[A], + fields: Vec, +) -> Result { + let data_type = DataType::Struct(fields); + + // convert rows to `Value` + let rows = rows + .iter() + .map(|row| { + let row: Value = serde_json::from_str(row.as_ref()).map_err(ArrowError::from)?; + Ok(row) + }) + .collect::, ArrowError>>()?; + + let (fields, columns, _) = deserialize_struct(&rows, data_type).into_data(); + RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) +} diff --git a/src/io/json/read/infer_schema.rs b/src/io/json/read/infer_schema.rs index 56a851bad18..cef77dab210 100644 --- a/src/io/json/read/infer_schema.rs +++ b/src/io/json/read/infer_schema.rs @@ -1,92 +1,22 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::borrow::Borrow; +use std::io::{BufRead, Seek, SeekFrom}; use indexmap::map::IndexMap as HashMap; use indexmap::set::IndexSet as HashSet; use serde_json::Value; -use super::util::ValueIter; - use crate::datatypes::*; use crate::error::{ArrowError, Result}; -/// Coerce data type during inference -/// -/// * `Int64` and `Float64` should be `Float64` -/// * Lists and scalars are coerced to a list of a compatible scalar -/// * All other types are coerced to `Utf8` -fn coerce_data_type(dt: &[&DataType]) -> DataType { - use DataType::*; - if dt.len() == 1 { - return dt[0].clone(); - } else if dt.len() > 2 { - return List(Box::new(Field::new("item", Utf8, true))); - } - let (lhs, rhs) = (dt[0], dt[1]); +use super::iterator::ValueIter; - return match (lhs, rhs) { - (lhs, rhs) if lhs == rhs => lhs.clone(), - (List(lhs), List(rhs)) => { - let inner = coerce_data_type(&[lhs.data_type(), rhs.data_type()]); - List(Box::new(Field::new("item", inner, true))) - } - (scalar, List(list)) => { - let inner = coerce_data_type(&[scalar, list.data_type()]); - List(Box::new(Field::new("item", inner, true))) - } - (List(list), scalar) => { - let inner = coerce_data_type(&[scalar, list.data_type()]); - List(Box::new(Field::new("item", inner, true))) - } - (Float64, Int64) => Float64, - (Int64, Float64) => Float64, - (Int64, Boolean) => Int64, - (Boolean, Int64) => Int64, - (_, _) => Utf8, - }; -} +type Tracker = HashMap>; -/// Generate schema from JSON field names and inferred data types -fn generate_schema(spec: HashMap>) -> Schema { - let fields: Vec = spec - .iter() - .map(|(k, hs)| { - let v: Vec<&DataType> = hs.iter().collect(); - Field::new(k, coerce_data_type(&v), true) - }) - .collect(); - Schema::new(fields) -} - -/// Infer the fields of a JSON file by reading the first n records of the buffer, with -/// `max_read_records` controlling the maximum number of records to read. -/// -/// If `max_read_records` is not set, the whole file is read to infer its field types. -/// -/// This function will not seek back to the start of the `reader`. The user has to manage the -/// original file's cursor. This function is useful when the `reader`'s cursor is not available -/// (does not implement [`Seek`]), such is the case for compressed streams decoders. -/// +/// Infers the fields of a JSON file by reading the first `number_of_rows` rows. /// # Examples /// ``` -/// use std::io::{BufReader, Cursor, SeekFrom, Seek}; -/// use arrow2::io::json::infer_json_schema; +/// use std::io::Cursor; +/// use arrow2::io::json::read::infer; /// /// let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} /// {"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null} @@ -95,190 +25,179 @@ fn generate_schema(spec: HashMap>) -> Schema { /// "#; /// /// // file's cursor's offset at 0 -/// let mut reader = BufReader::new(Cursor::new(data)); -/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap(); +/// let mut reader = Cursor::new(data); +/// let fields = infer(&mut reader, None).unwrap(); /// ``` -pub fn infer_json_schema( - reader: &mut BufReader, - max_read_records: Option, -) -> Result { - infer_json_schema_from_iterator(ValueIter::new(reader, max_read_records)) +pub fn infer(reader: &mut R, number_of_rows: Option) -> Result> { + infer_iterator(ValueIter::new(reader, number_of_rows)) } -/// Infer the fields of a JSON file by reading all items from the JSON Value Iterator. -pub fn infer_json_schema_from_iterator(value_iter: I) -> Result +/// Infer [`Field`]s from an iterator of [`Value`]. +pub fn infer_iterator(value_iter: I) -> Result> where - I: Iterator>, + I: Iterator>, + A: Borrow, { - let mut values: HashMap> = HashMap::new(); + let mut values: Tracker = Tracker::new(); for record in value_iter { - match record? { - Value::Object(map) => { - let res = map.iter().try_for_each(|(k, v)| { - match v { - Value::Array(a) => { - // collect the data types in array - let types: Result>> = a - .iter() - .map(|a| match a { - Value::Null => Ok(None), - Value::Number(n) => { - if n.is_i64() { - Ok(Some(&DataType::Int64)) - } else { - Ok(Some(&DataType::Float64)) - } - } - Value::Bool(_) => Ok(Some(&DataType::Boolean)), - Value::String(_) => Ok(Some(&DataType::Utf8)), - Value::Array(_) | Value::Object(_) => { - Err(ArrowError::NotYetImplemented( - "Nested lists and structs not supported".to_string(), - )) - } - }) - .collect(); - match types { - Ok(types) => { - // unwrap the Option and discard None values (from - // JSON nulls) - let mut types: Vec<&DataType> = - types.into_iter().flatten().collect(); - types.dedup(); - // if a record contains only nulls, it is not - // added to values - if !types.is_empty() { - let dt = coerce_data_type(&types); - - if values.contains_key(k) { - let x = values.get_mut(k).unwrap(); - x.insert(DataType::List(Box::new(Field::new( - "item", dt, true, - )))); - } else { - // create hashset and add value type - let mut hs = HashSet::new(); - hs.insert(DataType::List(Box::new(Field::new( - "item", dt, true, - )))); - values.insert(k.to_string(), hs); - } - } - Ok(()) - } - Err(e) => Err(e), - } - } - Value::Bool(_) => { - if values.contains_key(k) { - let x = values.get_mut(k).unwrap(); - x.insert(DataType::Boolean); - } else { - // create hashset and add value type - let mut hs = HashSet::new(); - hs.insert(DataType::Boolean); - values.insert(k.to_string(), hs); - } - Ok(()) - } - Value::Null => { - // do nothing, we treat json as nullable by default when - // inferring - Ok(()) - } - Value::Number(n) => { - if n.is_f64() { - if values.contains_key(k) { - let x = values.get_mut(k).unwrap(); - x.insert(DataType::Float64); - } else { - // create hashset and add value type - let mut hs = HashSet::new(); - hs.insert(DataType::Float64); - values.insert(k.to_string(), hs); - } - } else { - // default to i64 - if values.contains_key(k) { - let x = values.get_mut(k).unwrap(); - x.insert(DataType::Int64); - } else { - // create hashset and add value type - let mut hs = HashSet::new(); - hs.insert(DataType::Int64); - values.insert(k.to_string(), hs); - } - } - Ok(()) - } - Value::String(_) => { - if values.contains_key(k) { - let x = values.get_mut(k).unwrap(); - x.insert(DataType::Utf8); - } else { - // create hashset and add value type - let mut hs = HashSet::new(); - hs.insert(DataType::Utf8); - values.insert(k.to_string(), hs); - } - Ok(()) - } - Value::Object(_) => Err(ArrowError::NotYetImplemented( - "Inferring schema from nested JSON structs currently not supported" - .to_string(), - )), - } - }); - match res { - Ok(()) => {} - Err(e) => return Err(e), - } - } - value => { - return Err(ArrowError::ExternalFormat(format!( - "Expected JSON record to be an object, found {:?}", - value - ))); - } - }; + match record?.borrow() { + Value::Object(map) => map.iter().try_for_each(|(k, v)| { + let data_type = infer_value(v)?; + add_or_insert(&mut values, k, data_type); + Result::Ok(()) + }), + value => Err(ArrowError::ExternalFormat(format!( + "Expected JSON record to be an object, found {:?}", + value + ))), + }?; } - Ok(generate_schema(values)) + Ok(resolve_fields(values)) } -/// Infer the fields of a JSON file by reading the first n records of the file, with -/// `max_read_records` controlling the maximum number of records to read. -/// -/// If `max_read_records` is not set, the whole file is read to infer its field types. +/// Infer the fields of a JSON file from `number_of_rows` in `reader`. /// -/// Contrary to [`infer_json_schema`], this function will seek back to the start of the `reader`. -/// That way, the `reader` can be used immediately afterwards. +/// This function seeks back to the start of the `reader`. /// /// # Examples /// ``` /// use std::fs::File; -/// use std::io::{BufReader, Cursor}; -/// use arrow2::io::json::infer_json_schema_from_seekable; +/// use std::io::Cursor; +/// use arrow2::io::json::read::infer_and_reset; /// /// let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} /// {"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null} /// {"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"} /// {"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]} /// "#; -/// let mut reader = BufReader::new(Cursor::new(data)); -/// let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap(); +/// let mut reader = Cursor::new(data); +/// let fields = infer_and_reset(&mut reader, None).unwrap(); /// // cursor's position automatically set at 0 /// ``` -pub fn infer_json_schema_from_seekable( - reader: &mut BufReader, - max_read_records: Option, -) -> Result { - let schema = infer_json_schema(reader, max_read_records); - // return the reader seek back to the start +pub fn infer_and_reset( + reader: &mut R, + number_of_rows: Option, +) -> Result> { + let fields = infer(reader, number_of_rows); reader.seek(SeekFrom::Start(0))?; + fields +} - schema +fn infer_value(value: &Value) -> Result { + Ok(match value { + Value::Bool(_) => DataType::Boolean, + Value::Array(array) => infer_array(array)?, + Value::Null => DataType::Null, + Value::Number(number) => infer_number(number), + Value::String(_) => DataType::Utf8, + Value::Object(_) => { + return Err(ArrowError::NotYetImplemented( + "Inferring schema from nested JSON structs currently not supported".to_string(), + )) + } + }) +} + +fn infer_array(array: &[Value]) -> Result { + let types = array.iter().map(|a| { + Ok(match a { + Value::Null => None, + Value::Number(n) => Some(infer_number(n)), + Value::Bool(_) => Some(DataType::Boolean), + Value::String(_) => Some(DataType::Utf8), + Value::Array(array) => Some(infer_array(array)?), + Value::Object(_) => { + return Err(ArrowError::NotYetImplemented( + "Nested structs not yet supported".to_string(), + )) + } + }) + }); + // discard None values and deduplicate entries + let types = types + .into_iter() + .map(|x| x.transpose()) + .flatten() + .collect::>>()?; + + // if a record contains only nulls, it is not + // added to values + Ok(if !types.is_empty() { + let types = types.into_iter().collect::>(); + let dt = coerce_data_type(&types); + DataType::List(Box::new(Field::new("item", dt, true))) + } else { + DataType::Null + }) +} + +fn infer_number(n: &serde_json::Number) -> DataType { + if n.is_f64() { + DataType::Float64 + } else { + DataType::Int64 + } +} + +fn add_or_insert(values: &mut Tracker, key: &str, data_type: DataType) { + if data_type == DataType::Null { + return; + } + if values.contains_key(key) { + let x = values.get_mut(key).unwrap(); + x.insert(data_type); + } else { + // create hashset and add value type + let mut hs = HashSet::new(); + hs.insert(data_type); + values.insert(key.to_string(), hs); + } +} + +fn resolve_fields(spec: HashMap>) -> Vec { + spec.iter() + .map(|(k, hs)| { + let v: Vec<&DataType> = hs.iter().collect(); + Field::new(k, coerce_data_type(&v), true) + }) + .collect() +} + +/// Coerce an heterogeneous set of [`DataType`] into a single one. Rules: +/// * `Int64` and `Float64` are `Float64` +/// * Lists and scalars are coerced to a list of a compatible scalar +/// * All other types are coerced to `Utf8` +fn coerce_data_type>(dt: &[A]) -> DataType { + use DataType::*; + if dt.len() == 1 { + return dt[0].borrow().clone(); + } else if dt.len() > 2 { + return List(Box::new(Field::new("item", Utf8, true))); + } + let (lhs, rhs) = (dt[0].borrow(), dt[1].borrow()); + + return match (lhs, rhs) { + (lhs, rhs) if lhs == rhs => lhs.clone(), + (List(lhs), List(rhs)) => { + let inner = coerce_data_type(&[lhs.data_type(), rhs.data_type()]); + List(Box::new(Field::new("item", inner, true))) + } + (scalar, List(list)) => { + let inner = coerce_data_type(&[scalar, list.data_type()]); + List(Box::new(Field::new("item", inner, true))) + } + (List(list), scalar) => { + let inner = coerce_data_type(&[scalar, list.data_type()]); + List(Box::new(Field::new("item", inner, true))) + } + (Float64, Int64) => Float64, + (Int64, Float64) => Float64, + (Int64, Boolean) => Int64, + (Boolean, Int64) => Int64, + (_, _) => Utf8, + }; } #[cfg(test)] @@ -290,21 +209,21 @@ mod test { use crate::datatypes::DataType::*; assert_eq!( + coerce_data_type(&[Float64, List(Box::new(Field::new("item", Float64, true)))]), List(Box::new(Field::new("item", Float64, true))), - coerce_data_type(&[&Float64, &List(Box::new(Field::new("item", Float64, true)))]) ); assert_eq!( + coerce_data_type(&[Float64, List(Box::new(Field::new("item", Int64, true)))]), List(Box::new(Field::new("item", Float64, true))), - coerce_data_type(&[&Float64, &List(Box::new(Field::new("item", Int64, true)))]) ); assert_eq!( + coerce_data_type(&[Int64, List(Box::new(Field::new("item", Int64, true)))]), List(Box::new(Field::new("item", Int64, true))), - coerce_data_type(&[&Int64, &List(Box::new(Field::new("item", Int64, true)))]) ); // boolean and number are incompatible, return utf8 assert_eq!( + coerce_data_type(&[Boolean, List(Box::new(Field::new("item", Float64, true)))]), List(Box::new(Field::new("item", Utf8, true))), - coerce_data_type(&[&Boolean, &List(Box::new(Field::new("item", Float64, true)))]) ); } } diff --git a/src/io/json/read/iterator.rs b/src/io/json/read/iterator.rs new file mode 100644 index 00000000000..7670fa32e8f --- /dev/null +++ b/src/io/json/read/iterator.rs @@ -0,0 +1,56 @@ +use std::io::BufRead; + +use serde_json::Value; + +use crate::error::{ArrowError, Result}; + +#[derive(Debug)] +pub struct ValueIter<'a, R: BufRead> { + reader: &'a mut R, + remaining: usize, + // reuse line buffer to avoid allocation on each record + line_buf: String, +} + +impl<'a, R: BufRead> ValueIter<'a, R> { + pub fn new(reader: &'a mut R, number_of_rows: Option) -> Self { + Self { + reader, + remaining: number_of_rows.unwrap_or(usize::MAX), + line_buf: String::new(), + } + } +} + +impl<'a, R: BufRead> Iterator for ValueIter<'a, R> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.remaining == 0 { + return None; + } + + loop { + self.line_buf.truncate(0); + match self.reader.read_line(&mut self.line_buf) { + Ok(0) => { + // read_line returns 0 when stream reached EOF + return None; + } + Err(e) => { + return Some(Err(ArrowError::from(e))); + } + _ => { + let trimmed_s = self.line_buf.trim(); + if trimmed_s.is_empty() { + // ignore empty lines + continue; + } + + self.remaining -= 1; + return Some(serde_json::from_str(trimmed_s).map_err(ArrowError::from)); + } + } + } + } +} diff --git a/src/io/json/read/mod.rs b/src/io/json/read/mod.rs index e34de3a0fab..0e534a78358 100644 --- a/src/io/json/read/mod.rs +++ b/src/io/json/read/mod.rs @@ -1,24 +1,34 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - +//! APIs to read and deserialize from JSON mod deserialize; mod infer_schema; -mod reader; -mod util; +mod iterator; + +use crate::error::{ArrowError, Result}; +pub use deserialize::deserialize; pub use infer_schema::*; -pub use reader::*; + +/// Reads rows from `reader` into `rows`. Returns the number of read items. +/// IO-bounded. +pub fn read_rows(reader: &mut R, rows: &mut [String]) -> Result { + let mut row_number = 0; + for row in rows.iter_mut() { + loop { + row.truncate(0); + let _ = reader.read_line(row).map_err(|e| { + ArrowError::External(format!(" at line {}", row_number), Box::new(e)) + })?; + if row.is_empty() { + break; + } + if !row.trim().is_empty() { + break; + } + } + if row.is_empty() { + break; + } + row_number += 1; + } + Ok(row_number) +} diff --git a/src/io/json/read/reader.rs b/src/io/json/read/reader.rs deleted file mode 100644 index fe5fb252a4f..00000000000 --- a/src/io/json/read/reader.rs +++ /dev/null @@ -1,288 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::io::{BufReader, Read, Seek}; -use std::sync::Arc; - -use serde_json::Value; - -use crate::array::*; -use crate::datatypes::*; -use crate::error::{ArrowError, Result}; -use crate::record_batch::RecordBatch; - -use super::{deserialize::read, infer_json_schema_from_seekable, util::ValueIter}; - -#[derive(Debug)] -struct Decoder { - /// Explicit schema for the JSON file - schema: Arc, - /// Optional projection for which columns to load (case-sensitive names) - projection: Option>, - /// Batch size (number of records to load each time) - batch_size: usize, -} - -impl Decoder { - /// Create a new JSON decoder from any value that implements the `Iterator>` - /// trait. - pub fn new(schema: Arc, batch_size: usize, projection: Option>) -> Self { - let schema = match &projection { - Some(projection) => { - let fields = schema.fields(); - let projected_fields: Vec = fields - .iter() - .filter_map(|field| { - if projection.contains(field.name()) { - Some(field.clone()) - } else { - None - } - }) - .collect(); - - Arc::new(Schema::new(projected_fields)) - } - None => schema, - }; - - Self { - schema, - projection, - batch_size, - } - } - - /// Returns the schema of the reader, useful for getting the schema without reading - /// record batches - pub fn schema(&self) -> &Arc { - &self.schema - } - - /// Read the next batch of records - pub fn next_batch(&self, value_iter: &mut I) -> Result> - where - I: Iterator>, - { - let rows = value_iter - .take(self.batch_size) - .map(|value| { - let v = value?; - match v { - Value::Object(_) => Ok(v), - _ => Err(ArrowError::ExternalFormat(format!( - "Row needs to be of type object, got: {:?}", - v - ))), - } - }) - .collect::>>()?; - let rows = rows.iter().collect::>(); - - if rows.is_empty() { - // reached end of file - return Ok(None); - } - - let projection = self.projection.clone().unwrap_or_else(Vec::new); - - let projected_fields = if projection.is_empty() { - self.schema.fields().to_vec() - } else { - projection - .iter() - .map(|name| self.schema.column_with_name(name).map(|x| x.1.clone())) - .flatten() - .collect() - }; - - let data_type = DataType::Struct(projected_fields.clone()); - let array = read(&rows, data_type); - let array = array.as_any().downcast_ref::().unwrap(); - let arrays = array.values().to_vec(); - - let projected_schema = Arc::new(Schema::new(projected_fields)); - - RecordBatch::try_new(projected_schema, arrays).map(Some) - } -} - -/// JSON Reader -/// -/// This JSON reader allows JSON line-delimited files to be read into the Arrow memory -/// model. Records are loaded in batches and are then converted from row-based data to -/// columnar data. -/// -/// Example: -/// -/// ``` -/// use std::sync::Arc; -/// use arrow2::datatypes::{DataType, Field, Schema}; -/// use arrow2::io::json; -/// use std::io::{Cursor, BufReader}; -/// -/// let schema = Arc::new(Schema::new(vec![ -/// Field::new("a", DataType::Int64, true), -/// Field::new("b", DataType::Float32, true), -/// Field::new("c", DataType::Boolean, true), -/// Field::new("d", DataType::Utf8, true), -/// ])); -/// -/// let data = r#"{"a":1, "b":2.0, "c":false, "d":"4"} -/// {"a":-10, "b":-3.5, "c":true, "d":null} -/// {"a":100000000, "b":0.6, "d":"text"}"#; -/// let mut reader = BufReader::new(Cursor::new(data)); -/// let mut reader = json::Reader::new(&mut reader, schema, 1024, None); -/// let batch = reader.next().unwrap().unwrap(); -/// ``` -#[derive(Debug)] -pub struct Reader { - reader: BufReader, - /// JSON value decoder - decoder: Decoder, -} - -impl Reader { - /// Create a new JSON Reader from any value that implements the `Read` trait. - /// - /// If reading a `File`, you can customise the Reader, such as to enable schema - /// inference, use `ReaderBuilder`. - pub fn new( - reader: R, - schema: Arc, - batch_size: usize, - projection: Option>, - ) -> Self { - Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection) - } - - /// Create a new JSON Reader from a `BufReader` - /// - /// To customize the schema, such as to enable schema inference, use `ReaderBuilder` - pub fn from_buf_reader( - reader: BufReader, - schema: Arc, - batch_size: usize, - projection: Option>, - ) -> Self { - Self { - reader, - decoder: Decoder::new(schema, batch_size, projection), - } - } - - /// Returns the schema of the reader, useful for getting the schema without reading - /// record batches - pub fn schema(&self) -> &Arc { - self.decoder.schema() - } - - /// Read the next batch of records - #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> Result> { - self.decoder - .next_batch(&mut ValueIter::new(&mut self.reader, None)) - } -} - -/// JSON file reader builder -#[derive(Debug)] -pub struct ReaderBuilder { - /// Optional schema for the JSON file - /// - /// If the schema is not supplied, the reader will try to infer the schema - /// based on the JSON structure. - schema: Option>, - /// Optional maximum number of records to read during schema inference - /// - /// If a number is not provided, all the records are read. - max_records: Option, - /// Batch size (number of records to load each time) - /// - /// The default batch size when using the `ReaderBuilder` is 1024 records - batch_size: usize, - /// Optional projection for which columns to load (zero-based column indices) - projection: Option>, -} - -impl Default for ReaderBuilder { - fn default() -> Self { - Self { - schema: None, - max_records: None, - batch_size: 1024, - projection: None, - } - } -} - -impl ReaderBuilder { - ///! Returns a new [`ReaderBuilder`]. - pub fn new() -> Self { - Self::default() - } - - /// Set the JSON file's schema - pub fn with_schema(mut self, schema: Arc) -> Self { - self.schema = Some(schema); - self - } - - /// Set the JSON reader to infer the schema of the file - pub fn infer_schema(mut self, max_records: Option) -> Self { - // remove any schema that is set - self.schema = None; - self.max_records = max_records; - self - } - - /// Set the batch size (number of records to load at one time) - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; - self - } - - /// Set the reader's column projection - pub fn with_projection(mut self, projection: Vec) -> Self { - self.projection = Some(projection); - self - } - - /// Create a new `Reader` from the `ReaderBuilder` - pub fn build(self, source: R) -> Result> - where - R: Read + Seek, - { - let mut buf_reader = BufReader::new(source); - - // check if schema should be inferred - let schema = match self.schema { - Some(schema) => schema, - None => Arc::new(infer_json_schema_from_seekable( - &mut buf_reader, - self.max_records, - )?), - }; - - Ok(Reader::from_buf_reader( - buf_reader, - schema, - self.batch_size, - self.projection, - )) - } -} diff --git a/src/io/json/read/util.rs b/src/io/json/read/util.rs deleted file mode 100644 index c06efa236a3..00000000000 --- a/src/io/json/read/util.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::io::{BufRead, BufReader, Read}; - -use serde_json::Value; - -use crate::error::{ArrowError, Result}; - -#[derive(Debug)] -pub(super) struct ValueIter<'a, R: Read> { - reader: &'a mut BufReader, - max_read_records: Option, - record_count: usize, - // reuse line buffer to avoid allocation on each record - line_buf: String, -} - -impl<'a, R: Read> ValueIter<'a, R> { - pub fn new(reader: &'a mut BufReader, max_read_records: Option) -> Self { - Self { - reader, - max_read_records, - record_count: 0, - line_buf: String::new(), - } - } -} - -impl<'a, R: Read> Iterator for ValueIter<'a, R> { - type Item = Result; - - fn next(&mut self) -> Option { - if let Some(max) = self.max_read_records { - if self.record_count >= max { - return None; - } - } - - loop { - self.line_buf.truncate(0); - match self.reader.read_line(&mut self.line_buf) { - Ok(0) => { - // read_line returns 0 when stream reached EOF - return None; - } - Err(e) => { - return Some(Err(ArrowError::from(e))); - } - _ => { - let trimmed_s = self.line_buf.trim(); - if trimmed_s.is_empty() { - // ignore empty lines - continue; - } - - self.record_count += 1; - return Some(serde_json::from_str(trimmed_s).map_err(ArrowError::from)); - } - } - } - } -} diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs index 1b70e79d1c5..a4988bdb6f3 100644 --- a/tests/it/io/json/mod.rs +++ b/tests/it/io/json/mod.rs @@ -8,14 +8,26 @@ use serde_json::Value; use arrow2::array::*; use arrow2::datatypes::*; -use arrow2::io::json::{LineDelimitedWriter, ReaderBuilder}; +use arrow2::error::Result; +use arrow2::io::json::read as json_read; +use arrow2::io::json::LineDelimitedWriter; +use arrow2::record_batch::RecordBatch; + +fn read_batch(data: String, fields: Vec) -> Result { + let mut reader = Cursor::new(data); + + let mut rows = vec![String::default(); 1024]; + let read = json_read::read_rows(&mut reader, &mut rows)?; + let rows = &rows[..read]; + json_read::deserialize(rows, fields) +} + +fn round_trip(data: String) -> Result<()> { + let mut reader = Cursor::new(data); + let fields = json_read::infer(&mut reader, None)?; + let data = reader.into_inner(); -fn round_trip(data: String) { - let builder = ReaderBuilder::new() - .infer_schema(None) - .with_batch_size(1024); - let mut reader = builder.build(Cursor::new(data.clone())).unwrap(); - let batch = reader.next().unwrap().unwrap(); + let batch = read_batch(data.clone(), fields)?; let mut buf = Vec::new(); { @@ -39,18 +51,19 @@ fn round_trip(data: String) { assert_eq!(result_json, expected_json); } } + Ok(()) } #[test] -fn round_trip_basics() { +fn round_trip_basics() -> Result<()> { let (data, _, _) = case_basics(); - round_trip(data); + round_trip(data) } #[test] -fn round_trip_list() { +fn round_trip_list() -> Result<()> { let (data, _, _) = case_list(); - round_trip(data); + round_trip(data) } fn case_list() -> (String, Schema, Vec>) { diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index 824ae1806d6..c0aa763032b 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -1,18 +1,17 @@ -use std::io::BufReader; use std::{io::Cursor, sync::Arc}; use arrow2::array::*; use arrow2::datatypes::*; -use arrow2::{bitmap::Bitmap, buffer::Buffer, error::Result, io::json::*}; +use arrow2::io::json::read; +use arrow2::{bitmap::Bitmap, buffer::Buffer, error::Result}; -use crate::io::json::*; +use super::*; #[test] fn basic() -> Result<()> { let (data, schema, columns) = case_basics(); - let mut reader = ReaderBuilder::new().build(Cursor::new(data))?; - let batch = reader.next()?.unwrap(); + let batch = read_batch(data, schema.fields.clone())?; assert_eq!(&schema, batch.schema().as_ref()); @@ -24,13 +23,10 @@ fn basic() -> Result<()> { } #[test] -fn basics_with_schema_projection() -> Result<()> { +fn basic_projection() -> Result<()> { let (data, schema, columns) = case_basics_schema(); - let mut reader = ReaderBuilder::new() - .with_schema(Arc::new(schema.clone())) - .build(Cursor::new(data))?; - let batch = reader.next()?.unwrap(); + let batch = read_batch(data, schema.fields.clone())?; assert_eq!(&schema, batch.schema().as_ref()); @@ -45,9 +41,7 @@ fn basics_with_schema_projection() -> Result<()> { fn lists() -> Result<()> { let (data, schema, columns) = case_list(); - let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64); - let mut reader = builder.build(Cursor::new(data))?; - let batch = reader.next()?.unwrap(); + let batch = read_batch(data, schema.fields.clone())?; assert_eq!(&schema, batch.schema().as_ref()); @@ -66,9 +60,10 @@ fn line_break_in_values() -> Result<()> { {"a":null} "#; - let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64); - let mut reader = builder.build(Cursor::new(data))?; - let batch = reader.next()?.unwrap(); + let batch = read_batch( + data.to_string(), + vec![Field::new("a", DataType::Utf8, true)], + )?; let expected = Utf8Array::::from(&[Some("aa\n\n"), Some("aa\n"), None]); @@ -78,8 +73,7 @@ fn line_break_in_values() -> Result<()> { #[test] fn invalid_infer_schema() -> Result<()> { - let re = - infer_json_schema_from_seekable(&mut BufReader::new(Cursor::new("city,lat,lng")), None); + let re = read::infer(&mut Cursor::new("city,lat,lng"), None); assert_eq!( re.err().unwrap().to_string(), "External error: expected value at line 1 column 1", @@ -89,16 +83,15 @@ fn invalid_infer_schema() -> Result<()> { #[test] fn invalid_read_record() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new( + let fields = vec![Field::new( "a", DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]), true, - )])); - let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64); - let mut data = Cursor::new("city,lat,lng"); - let mut reader = builder.build(&mut data)?; + )]; + let batch = read_batch("city,lat,lng".to_string(), fields); + assert_eq!( - reader.next().err().unwrap().to_string(), + batch.err().unwrap().to_string(), "External error: expected value at line 1 column 1", ); Ok(()) @@ -108,9 +101,7 @@ fn invalid_read_record() -> Result<()> { fn nested_struct_arrays() -> Result<()> { let (data, schema, columns) = case_struct(); - let builder = ReaderBuilder::new().with_schema(Arc::new(schema.clone())); - let mut reader = builder.build(Cursor::new(data))?; - let batch = reader.next()?.unwrap(); + let batch = read_batch(data, schema.fields.clone())?; assert_eq!(&schema, batch.schema().as_ref()); @@ -122,7 +113,7 @@ fn nested_struct_arrays() -> Result<()> { } #[test] -fn nested_list_arrays() { +fn nested_list_arrays() -> Result<()> { let d_field = Field::new("d", DataType::Utf8, true); let c_field = Field::new("c", DataType::Struct(vec![d_field.clone()]), true); let b_field = Field::new("b", DataType::Boolean, true); @@ -133,16 +124,17 @@ fn nested_list_arrays() { ); let a_list_data_type = DataType::List(Box::new(a_struct_field)); let a_field = Field::new("a", a_list_data_type.clone(), true); - let schema = Arc::new(Schema::new(vec![a_field])); - let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64); - let content = r#" + let fields = vec![a_field]; + + let data = r#" {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]} {"a": [{"b": false, "c": null}]} {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]} {"a": null} {"a": []} "#; - let mut reader = builder.build(Cursor::new(content)).unwrap(); + + let batch = read_batch(data.to_string(), fields)?; // build expected output let d = Utf8Array::::from(&vec![ @@ -176,41 +168,38 @@ fn nested_list_arrays() { Some(Bitmap::from_u8_slice([0b00010111], 5)), ); - // compare `a` with result from json reader - let batch = reader.next().unwrap().unwrap(); - let read = batch.column(0); - assert_eq!(expected, read.as_ref()); + assert_eq!(expected, batch.column(0).as_ref()); + Ok(()) } #[test] fn skip_empty_lines() { - let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64); - let content = " + let data = " {\"a\": 1} {\"a\": 2} {\"a\": 3}"; - let mut reader = builder.build(Cursor::new(content)).unwrap(); - let batch = reader.next().unwrap().unwrap(); + + let batch = read_batch( + data.to_string(), + vec![Field::new("a", DataType::Int64, true)], + ) + .unwrap(); assert_eq!(1, batch.num_columns()); assert_eq!(3, batch.num_rows()); - - let schema = reader.schema(); - let c = schema.column_with_name("a").unwrap(); - assert_eq!(&DataType::Int64, c.1.data_type()); } #[test] fn row_type_validation() { - let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64); - let content = " + let data = " [1, \"hello\"] \"world\""; - let re = builder.build(Cursor::new(content)); + + let batch = read::infer(&mut Cursor::new(data.to_string()), None); assert_eq!( - re.err().unwrap().to_string(), + batch.err().unwrap().to_string(), r#"External format error: Expected JSON record to be an object, found Array([Number(1), String("hello")])"#, ); } @@ -219,49 +208,14 @@ fn row_type_validation() { fn list_of_string_dictionary_from_with_nulls() -> Result<()> { let (data, schema, columns) = case_dict(); - let builder = ReaderBuilder::new() - .with_schema(Arc::new(schema)) - .with_batch_size(64); - let mut reader = builder.build(Cursor::new(data))?; - let batch = reader.next()?.unwrap(); + let batch = read_batch(data, schema.fields.clone())?; - assert_eq!(reader.schema(), batch.schema()); + assert_eq!(&schema, batch.schema().as_ref()); assert_eq!(columns[0].as_ref(), batch.columns()[0].as_ref()); Ok(()) } -#[test] -fn with_multiple_batches() -> Result<()> { - let data = r#" - {"a":1} - {"a":null} - {} - {"a":1} - {"a":7} - {"a":1} - {"a":1} - {"a":5} - {"a":1} - {"a":1} - {"a":1} - {} - "#; - - let builder = ReaderBuilder::new() - .infer_schema(Some(4)) - .with_batch_size(5); - let mut reader = builder.build(Cursor::new(data))?; - - let mut num_records = Vec::new(); - while let Some(rb) = reader.next()? { - num_records.push(rb.num_rows()); - } - - assert_eq!(vec![5, 5, 2], num_records); - Ok(()) -} - #[test] fn infer_schema_mixed_list() -> Result<()> { let data = r#"{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} @@ -270,7 +224,7 @@ fn infer_schema_mixed_list() -> Result<()> { {"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]} "#; - let schema = Schema::new(vec![ + let fields = vec![ Field::new("a", DataType::Int64, true), Field::new( "b", @@ -287,10 +241,10 @@ fn infer_schema_mixed_list() -> Result<()> { DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), true, ), - ]); + ]; - let inferred_schema = infer_json_schema(&mut BufReader::new(Cursor::new(data)), None)?; + let result = read::infer(&mut Cursor::new(data), None)?; - assert_eq!(inferred_schema, schema); + assert_eq!(result, fields); Ok(()) }