diff --git a/Cargo.toml b/Cargo.toml index 78cf2db0e87..7b7f9c48f70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,9 +39,7 @@ regex = { version = "^1.3", optional = true } streaming-iterator = { version = "0.1", optional = true } fallible-streaming-iterator = { version = "0.1", optional = true } -serde = { version = "^1.0", features = ["rc"], optional = true } -serde_derive = { version = "^1.0", optional = true } -serde_json = { version = "^1.0", features = ["preserve_order"], optional = true } +json-deserializer = { version = "0.3", optional = true } indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format @@ -72,6 +70,9 @@ parquet2 = { version = "0.13", optional = true, default_features = false } # avro support avro-schema = { version = "0.2", optional = true } +serde = { version = "^1.0", features = ["rc"], optional = true } +serde_derive = { version = "^1.0", optional = true } +serde_json = { version = "^1.0", features = ["preserve_order"], optional = true } # compression of avro libflate = { version = "1.1.1", optional = true } snap = { version = "1", optional = true } @@ -134,7 +135,7 @@ io_csv_async = ["io_csv_read_async"] io_csv_read = ["csv", "lexical-core"] io_csv_read_async = ["csv-async", "lexical-core", "futures"] io_csv_write = ["csv-core", "streaming-iterator", "lexical-core"] -io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"] +io_json = ["json-deserializer", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"] io_ipc = ["arrow-format"] io_ipc_write_async = ["io_ipc", "futures"] io_ipc_read_async = ["io_ipc", "futures", "async-stream"] @@ -156,9 +157,9 @@ io_avro_compression = [ "crc", ] io_avro_async = ["io_avro", "futures", "async-stream"] -# io_json: its dependencies + error handling +# serde+serde_json: its dependencies + error handling # serde_derive: there is some derive around -io_json_integration = 
["io_json", "serde_derive", "hex"] +io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"] io_print = ["comfy-table"] # the compute kernels. Disabling this significantly reduces compile time. compute_aggregate = ["multiversion"] diff --git a/benches/read_json.rs b/benches/read_json.rs index ae076522d57..5d0400c5921 100644 --- a/benches/read_json.rs +++ b/benches/read_json.rs @@ -14,13 +14,15 @@ fn prep(array: impl Array + 'static) -> (Vec, DataType) { // the operation of writing is IO-bounded. write::write(&mut data, blocks).unwrap(); - let dt = read::infer(&serde_json::from_slice(&data).unwrap()).unwrap(); + let value = read::json_deserializer::parse(&data).unwrap(); + + let dt = read::infer(&value).unwrap(); (data, dt) } fn bench_read(data: &[u8], dt: &DataType) { - let json = serde_json::from_slice(data).unwrap(); - read::deserialize(&json, dt.clone()).unwrap(); + let value = read::json_deserializer::parse(data).unwrap(); + read::deserialize(&value, dt.clone()).unwrap(); } fn add_benchmark(c: &mut Criterion) { diff --git a/examples/json_read.rs b/examples/json_read.rs index 1630aa65047..608980a62de 100644 --- a/examples/json_read.rs +++ b/examples/json_read.rs @@ -1,5 +1,5 @@ -use std::fs::File; -use std::io::BufReader; +/// Example of reading a JSON file. +use std::fs; use std::sync::Arc; use arrow2::array::Array; @@ -7,12 +7,16 @@ use arrow2::error::Result; use arrow2::io::json::read; fn read_path(path: &str) -> Result> { - // Example of reading a JSON file. 
- let reader = BufReader::new(File::open(path)?); - let json = serde_json::from_reader(reader)?; + // read the file into memory (IO-bounded) + let data = fs::read(path)?; + // create a non-owning struct of the data (CPU-bounded) + let json = read::json_deserializer::parse(&data)?; + + // use it to infer an Arrow schema (CPU-bounded) let data_type = read::infer(&json)?; + // and deserialize it (CPU-bounded) read::deserialize(&json, data_type) } diff --git a/src/io/avro/read/decompress.rs b/src/io/avro/read/decompress.rs index 80b81535291..05f5fd43d92 100644 --- a/src/io/avro/read/decompress.rs +++ b/src/io/avro/read/decompress.rs @@ -9,6 +9,7 @@ use super::super::{Block, CompressedBlock}; use super::BlockStreamIterator; use super::Compression; +#[cfg(feature = "io_avro_compression")] const CRC_TABLE: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); /// Decompresses an Avro block. diff --git a/src/io/avro/write/compress.rs b/src/io/avro/write/compress.rs index 19b69460eec..5121003a2b2 100644 --- a/src/io/avro/write/compress.rs +++ b/src/io/avro/write/compress.rs @@ -5,6 +5,7 @@ use crate::error::Result; use super::Compression; use super::{Block, CompressedBlock}; +#[cfg(feature = "io_avro_compression")] const CRC_TABLE: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); /// Compresses a [`Block`] to a [`CompressedBlock`]. 
diff --git a/src/io/json/mod.rs b/src/io/json/mod.rs index ad406f4ffdd..ebbdc92b69f 100644 --- a/src/io/json/mod.rs +++ b/src/io/json/mod.rs @@ -5,8 +5,8 @@ pub mod write; use crate::error::Error; -impl From for Error { - fn from(error: serde_json::error::Error) -> Self { - Error::External("".to_string(), Box::new(error)) +impl From for Error { + fn from(error: json_deserializer::Error) -> Self { + Error::ExternalFormat(error.to_string()) } } diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index fbee18ab705..41d69f00c01 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -4,8 +4,7 @@ use std::{collections::hash_map::DefaultHasher, sync::Arc}; use hash_hasher::HashedMap; use indexmap::map::IndexMap as HashMap; -use num_traits::NumCast; -use serde_json::Value; +use json_deserializer::{Number, Value}; use crate::{ array::*, @@ -17,7 +16,7 @@ use crate::{ /// A function that converts a &Value into an optional tuple of a byte slice and a Value. /// This is used to create a dictionary, where the hashing depends on the DataType of the child object. 
-type Extract = Box Option<(u64, &Value)>>; +type Extract<'a> = Box) -> Option<(u64, &'a Value<'a>)>>; fn build_extract(data_type: &DataType) -> Extract { match data_type { @@ -27,11 +26,10 @@ fn build_extract(data_type: &DataType) -> Extract { hasher.write(v.as_bytes()); Some((hasher.finish(), value)) } - Value::Number(v) => v.as_f64().map(|x| { - let mut hasher = DefaultHasher::new(); - hasher.write(&x.to_le_bytes()); - (hasher.finish(), value) - }), + Value::Number(v) => match v { + Number::Float(a, b) | Number::Integer(a, b) => { + let mut hasher = DefaultHasher::new(); hasher.write(a); hasher.write(b); Some((hasher.finish(), value)) } + }, Value::Bool(v) => { let mut hasher = DefaultHasher::new(); hasher.write(&[*v as u8]); @@ -40,25 +38,24 @@ fn build_extract(data_type: &DataType) -> Extract { _ => None, }), DataType::Int32 | DataType::Int64 | DataType::Int16 | DataType::Int8 => { - Box::new(move |value| match &value { - Value::Number(v) => v.as_f64().map(|x| { + Box::new(move |value| { + let integer = match value { + Value::Number(number) => Some(deserialize_int_single::(*number)), + Value::Bool(number) => Some(if *number { 1i64 } else { 0i64 }), + _ => None, + }; + integer.map(|integer| { let mut hasher = DefaultHasher::new(); - hasher.write(&x.to_le_bytes()); + hasher.write(&integer.to_le_bytes()); (hasher.finish(), value) - }), - Value::Bool(v) => { - let mut hasher = DefaultHasher::new(); - hasher.write(&[*v as u8]); - Some((hasher.finish(), value)) - } - _ => None, + }) }) } _ => Box::new(|_| None), } } -fn deserialize_boolean>(rows: &[A]) -> BooleanArray { +fn deserialize_boolean<'a, A: Borrow>>(rows: &[A]) -> BooleanArray { let iter = rows.iter().map(|row| match row.borrow() { Value::Bool(v) => Some(v), _ => None, @@ -66,31 +63,125 @@ fn deserialize_boolean>(rows: &[A]) -> BooleanArray { BooleanArray::from_trusted_len_iter(iter) } -fn deserialize_int>( +fn deserialize_int_single(number: Number) -> T +where + T: NativeType + lexical_core::FromLexical + Pow10, +{ + match number { + Number::Float(fraction, exponent) => { + let integer = 
fraction.split(|x| *x == b'.').next().unwrap(); + let mut integer: T = lexical_core::parse(integer).unwrap(); + if !exponent.is_empty() { + let exponent: u32 = lexical_core::parse(exponent).unwrap(); + integer = integer.pow10(exponent); + } + integer + } + Number::Integer(integer, exponent) => { + let mut integer: T = lexical_core::parse(integer).unwrap(); + if !exponent.is_empty() { + let exponent: u32 = lexical_core::parse(exponent).unwrap(); + integer = integer.pow10(exponent); + } + integer + } + } +} + +trait Powi10: NativeType + num_traits::One + std::ops::Add { + fn powi10(self, exp: i32) -> Self; +} + +impl Powi10 for f32 { + #[inline] + fn powi10(self, exp: i32) -> Self { + self * 10.0f32.powi(exp) + } +} + +impl Powi10 for f64 { + #[inline] + fn powi10(self, exp: i32) -> Self { + self * 10.0f64.powi(exp) + } +} + +trait Pow10: NativeType + num_traits::One + std::ops::Add { + fn pow10(self, exp: u32) -> Self; +} + +macro_rules! impl_pow10 { + ($ty:ty) => { + impl Pow10 for $ty { + #[inline] + fn pow10(self, exp: u32) -> Self { + self * (10 as $ty).pow(exp) + } + } + }; +} +impl_pow10!(u8); +impl_pow10!(u16); +impl_pow10!(u32); +impl_pow10!(u64); +impl_pow10!(i8); +impl_pow10!(i16); +impl_pow10!(i32); +impl_pow10!(i64); + +fn deserialize_float_single(number: &Number) -> T +where + T: NativeType + lexical_core::FromLexical + Powi10, +{ + match number { + Number::Float(float, exponent) => { + let mut float: T = lexical_core::parse(float).unwrap(); + if !exponent.is_empty() { + let exponent: i32 = lexical_core::parse(exponent).unwrap(); + float = float.powi10(exponent); + } + float + } + Number::Integer(integer, exponent) => { + let mut float: T = lexical_core::parse(integer).unwrap(); + if !exponent.is_empty() { + let exponent: i32 = lexical_core::parse(exponent).unwrap(); + float = float.powi10(exponent); + } + float + } + } +} + +fn deserialize_int<'a, T: NativeType + lexical_core::FromLexical + Pow10, A: Borrow>>( rows: &[A], data_type: DataType, ) -> 
PrimitiveArray { let iter = rows.iter().map(|row| match row.borrow() { - Value::Number(number) => number.as_i64().and_then(num_traits::cast::), - Value::Bool(number) => num_traits::cast::(*number as i32), + Value::Number(number) => Some(deserialize_int_single(*number)), + Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); PrimitiveArray::from_trusted_len_iter(iter).to(data_type) } -fn deserialize_float>( +fn deserialize_float< + 'a, + T: NativeType + lexical_core::FromLexical + Powi10, + A: Borrow>, +>( rows: &[A], data_type: DataType, ) -> PrimitiveArray { let iter = rows.iter().map(|row| match row.borrow() { - Value::Number(number) => number.as_f64().and_then(num_traits::cast::), - Value::Bool(number) => num_traits::cast::(*number as i32), + Value::Number(number) => Some(deserialize_float_single(number)), + Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); PrimitiveArray::from_trusted_len_iter(iter).to(data_type) } -fn deserialize_binary>(rows: &[A]) -> BinaryArray { +fn deserialize_binary<'a, O: Offset, A: Borrow>>(rows: &[A]) -> BinaryArray { let iter = rows.iter().map(|row| match row.borrow() { Value::String(v) => Some(v.as_bytes()), _ => None, @@ -98,17 +189,31 @@ fn deserialize_binary>(rows: &[A]) -> BinaryArray BinaryArray::from_trusted_len_iter(iter) } -fn deserialize_utf8>(rows: &[A]) -> Utf8Array { - let iter = rows.iter().map(|row| match row.borrow() { - Value::String(v) => Some(v.clone()), - Value::Number(v) => Some(v.to_string()), - Value::Bool(v) => Some(v.to_string()), - _ => None, - }); - Utf8Array::::from_trusted_len_iter(iter) +fn deserialize_utf8<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Utf8Array { + let mut array = MutableUtf8Array::::with_capacity(rows.len()); + let mut scratch = vec![]; + for row in rows { + match row.borrow() { + Value::String(v) => array.push(Some(v.as_ref())), + Value::Number(number) => match number { + Number::Integer(number, exponent) | 
Number::Float(number, exponent) => { + scratch.clear(); + scratch.extend_from_slice(*number); + if !exponent.is_empty() { scratch.push(b'e'); scratch.extend_from_slice(*exponent); } + array.push(Some(std::str::from_utf8(&scratch).unwrap())); } + }, + Value::Bool(v) => array.push(Some(if *v { "true" } else { "false" })), + _ => array.push_null(), + } + } + array.into() } -fn deserialize_list>(rows: &[A], data_type: DataType) -> ListArray { +fn deserialize_list<'a, O: Offset, A: Borrow>>( + rows: &[A], + data_type: DataType, +) -> ListArray { let child = ListArray::::get_child_type(&data_type); let mut validity = MutableBitmap::with_capacity(rows.len()); @@ -138,7 +243,7 @@ fn deserialize_list>(rows: &[A], data_type: DataType ListArray::::new(data_type, offsets.into(), values, validity.into()) } -fn deserialize_struct>(rows: &[A], data_type: DataType) -> StructArray { +fn deserialize_struct<'a, A: Borrow>>(rows: &[A], data_type: DataType) -> StructArray { let fields = StructArray::get_fields(&data_type); let mut values = fields @@ -173,7 +278,7 @@ fn deserialize_struct>(rows: &[A], data_type: DataType) -> Stru StructArray::new(data_type, values, validity.into()) } -fn deserialize_dictionary>( +fn deserialize_dictionary<'a, K: DictionaryKey, A: Borrow>>( rows: &[A], data_type: DataType, ) -> DictionaryArray { @@ -206,7 +311,7 @@ fn deserialize_dictionary>( DictionaryArray::::from_data(keys, values) } -pub(crate) fn _deserialize>(rows: &[A], data_type: DataType) -> Arc { +pub(crate) fn _deserialize<'a, A: Borrow>>( + rows: &[A], + data_type: DataType, +) -> Arc { match &data_type { DataType::Null => Arc::new(NullArray::new(data_type, rows.len())), DataType::Boolean => Arc::new(deserialize_boolean(rows)), diff --git a/src/io/json/read/infer_schema.rs b/src/io/json/read/infer_schema.rs index 336264c4313..2ea4f840583 100644 --- a/src/io/json/read/infer_schema.rs +++ b/src/io/json/read/infer_schema.rs @@ -1,8 +1,9 @@ use std::borrow::Borrow; +use std::collections::BTreeMap; use indexmap::map::IndexMap as HashMap; use indexmap::set::IndexSet as HashSet;
-use serde_json::Value; +use json_deserializer::{Number, Value}; use crate::datatypes::*; use crate::error::Result; @@ -29,7 +30,7 @@ fn filter_map_nulls(dt: DataType) -> Option { } } -fn infer_object(inner: &serde_json::Map) -> Result { +fn infer_object(inner: &BTreeMap) -> Result { let fields = inner .iter() .filter_map(|(key, value)| { @@ -69,11 +70,10 @@ fn infer_array(values: &[Value]) -> Result { }) } -fn infer_number(n: &serde_json::Number) -> DataType { - if n.is_f64() { - DataType::Float64 - } else { - DataType::Int64 +fn infer_number(n: &Number) -> DataType { + match n { + Number::Float(..) => DataType::Float64, + Number::Integer(..) => DataType::Int64, } } diff --git a/src/io/json/read/mod.rs b/src/io/json/read/mod.rs index 908da67b51f..ed1ad17f103 100644 --- a/src/io/json/read/mod.rs +++ b/src/io/json/read/mod.rs @@ -6,3 +6,5 @@ pub(crate) use deserialize::_deserialize; pub use deserialize::deserialize; pub(crate) use infer_schema::coerce_data_type; pub use infer_schema::infer; + +pub use json_deserializer; diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs index 8a416819f7c..2278a636f2c 100644 --- a/src/io/json/write/mod.rs +++ b/src/io/json/write/mod.rs @@ -1,5 +1,6 @@ //! 
APIs to write to JSON mod serialize; +mod utf8; pub use fallible_streaming_iterator::*; pub(crate) use serialize::new_serializer; diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index fb90c17ad0f..afaf7dab04e 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -13,6 +13,8 @@ use crate::temporal_conversions::{ use crate::util::lexical_to_bytes_mut; use crate::{array::*, datatypes::DataType, types::NativeType}; +use super::utf8; + fn boolean_serializer<'a>( array: &'a BooleanArray, ) -> Box + 'a + Send + Sync> { @@ -73,7 +75,7 @@ fn utf8_serializer<'a, O: Offset>( array.iter(), |x, buf| { if let Some(x) = x { - serde_json::to_writer(buf, x).unwrap(); + utf8::write_str(buf, x).unwrap(); } else { buf.extend_from_slice(b"null") } @@ -256,7 +258,7 @@ fn serialize_item(buffer: &mut Vec, record: &[(&str, &[u8])], is_first_row: buffer.push(b','); } first_item = false; - serde_json::to_writer(&mut *buffer, key).unwrap(); + utf8::write_str(buffer, key).unwrap(); buffer.push(b':'); buffer.extend(*value); } diff --git a/src/io/json/write/utf8.rs b/src/io/json/write/utf8.rs new file mode 100644 index 00000000000..b8c98522178 --- /dev/null +++ b/src/io/json/write/utf8.rs @@ -0,0 +1,138 @@ +// Adapted from https://github.com/serde-rs/json/blob/f901012df66811354cb1d490ad59480d8fdf77b5/src/ser.rs +use std::io; + +pub fn write_str(writer: &mut W, value: &str) -> io::Result<()> +where + W: io::Write, +{ + writer.write_all(b"\"")?; + let bytes = value.as_bytes(); + + let mut start = 0; + + for (i, &byte) in bytes.iter().enumerate() { + let escape = ESCAPE[byte as usize]; + if escape == 0 { + continue; + } + + if start < i { + writer.write_all(&bytes[start..i])?; + } + + let char_escape = CharEscape::from_escape_table(escape, byte); + write_char_escape(writer, char_escape)?; + + start = i + 1; + } + + if start != bytes.len() { + writer.write_all(&bytes[start..])?; + } + writer.write_all(b"\"") +} + +const BB: u8 = b'b'; // 
\x08 +const TT: u8 = b't'; // \x09 +const NN: u8 = b'n'; // \x0A +const FF: u8 = b'f'; // \x0C +const RR: u8 = b'r'; // \x0D +const QU: u8 = b'"'; // \x22 +const BS: u8 = b'\\'; // \x5C +const UU: u8 = b'u'; // \x00...\x1F except the ones above +const __: u8 = 0; + +// Lookup table of escape sequences. A value of b'x' at index i means that byte +// i is escaped as "\x" in JSON. A value of 0 means that byte i is not escaped. +static ESCAPE: [u8; 256] = [ + // 1 2 3 4 5 6 7 8 9 A B C D E F + UU, UU, UU, UU, UU, UU, UU, UU, BB, TT, NN, UU, FF, RR, UU, UU, // 0 + UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, // 1 + __, __, QU, __, __, __, __, __, __, __, __, __, __, __, __, __, // 2 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 3 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 4 + __, __, __, __, __, __, __, __, __, __, __, __, BS, __, __, __, // 5 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 6 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 7 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 8 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 9 + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // A + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // B + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // C + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // D + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // E + __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // F +]; + +/// Represents a character escape code in a type-safe manner. 
+pub enum CharEscape { + /// An escaped quote `"` + Quote, + /// An escaped reverse solidus `\` + ReverseSolidus, + // An escaped solidus `/` + //Solidus, + /// An escaped backspace character (usually escaped as `\b`) + Backspace, + /// An escaped form feed character (usually escaped as `\f`) + FormFeed, + /// An escaped line feed character (usually escaped as `\n`) + LineFeed, + /// An escaped carriage return character (usually escaped as `\r`) + CarriageReturn, + /// An escaped tab character (usually escaped as `\t`) + Tab, + /// An escaped ASCII plane control character (usually escaped as + /// `\u00XX` where `XX` are two hex characters) + AsciiControl(u8), +} + +impl CharEscape { + #[inline] + fn from_escape_table(escape: u8, byte: u8) -> CharEscape { + match escape { + self::BB => CharEscape::Backspace, + self::TT => CharEscape::Tab, + self::NN => CharEscape::LineFeed, + self::FF => CharEscape::FormFeed, + self::RR => CharEscape::CarriageReturn, + self::QU => CharEscape::Quote, + self::BS => CharEscape::ReverseSolidus, + self::UU => CharEscape::AsciiControl(byte), + _ => unreachable!(), + } + } +} + +#[inline] +fn write_char_escape(writer: &mut W, char_escape: CharEscape) -> io::Result<()> +where + W: io::Write, +{ + use self::CharEscape::*; + + let s = match char_escape { + Quote => b"\\\"", + ReverseSolidus => b"\\\\", + //Solidus => b"\\/", + Backspace => b"\\b", + FormFeed => b"\\f", + LineFeed => b"\\n", + CarriageReturn => b"\\r", + Tab => b"\\t", + AsciiControl(byte) => { + static HEX_DIGITS: [u8; 16] = *b"0123456789abcdef"; + let bytes = &[ + b'\\', + b'u', + b'0', + b'0', + HEX_DIGITS[(byte >> 4) as usize], + HEX_DIGITS[(byte & 0xF) as usize], + ]; + return writer.write_all(bytes); + } + }; + + writer.write_all(s) +} diff --git a/src/io/json_integration/mod.rs b/src/io/json_integration/mod.rs index 279f2ec55f0..dc522c60c88 100644 --- a/src/io/json_integration/mod.rs +++ b/src/io/json_integration/mod.rs @@ -5,6 +5,8 @@ use serde_derive::{Deserialize, 
Serialize}; use serde_json::Value; +use crate::error::Error; + pub mod read; pub mod write; @@ -117,3 +119,9 @@ pub struct ArrowJsonColumn { /// the children pub children: Option>, } + +impl From for Error { + fn from(error: serde_json::Error) -> Self { + Error::ExternalFormat(error.to_string()) + } +} diff --git a/src/io/ndjson/read/deserialize.rs b/src/io/ndjson/read/deserialize.rs index 49e1ef64925..dae87d66d74 100644 --- a/src/io/ndjson/read/deserialize.rs +++ b/src/io/ndjson/read/deserialize.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use serde_json::Value; +use json_deserializer::parse; use crate::array::Array; use crate::datatypes::DataType; @@ -22,7 +22,7 @@ pub fn deserialize(rows: &[String], data_type: DataType) -> Result Result>( - rows: impl Iterator, +pub fn deserialize_iter<'a>( + rows: impl Iterator, data_type: DataType, ) -> Result, Error> { // deserialize strings to `Value`s let rows = rows - .map(|row| serde_json::from_str(row.as_ref()).map_err(Error::from)) - .collect::, Error>>()?; + .map(|row| parse(row.as_bytes()).map_err(Error::from)) + .collect::, Error>>()?; // deserialize &[Value] to Array Ok(_deserialize(&rows, data_type)) diff --git a/src/io/ndjson/read/file.rs b/src/io/ndjson/read/file.rs index 2b0174a02bf..6ed5497489a 100644 --- a/src/io/ndjson/read/file.rs +++ b/src/io/ndjson/read/file.rs @@ -2,8 +2,7 @@ use std::io::BufRead; use fallible_streaming_iterator::FallibleStreamingIterator; use indexmap::set::IndexSet as HashSet; -use serde_json; -use serde_json::Value; +use json_deserializer::parse; use crate::{ datatypes::DataType, @@ -115,7 +114,7 @@ pub fn infer( let mut data_types = HashSet::new(); while let Some(rows) = reader.next()? 
{ - let value: Value = serde_json::from_str(&rows[0])?; // 0 because it is row by row + let value = parse(rows[0].as_bytes())?; // 0 because it is row by row let data_type = infer_json(&value)?; if data_type != DataType::Null { data_types.insert(data_type); @@ -134,7 +133,7 @@ pub fn infer( pub fn infer_iter>(rows: impl Iterator) -> Result { let mut data_types = HashSet::new(); for row in rows { - let v: Value = serde_json::from_str(row.as_ref())?; + let v = parse(row.as_ref().as_bytes())?; let data_type = infer_json(&v)?; if data_type != DataType::Null { data_types.insert(data_type); diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index db1b067bb42..e584324931b 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -7,7 +7,7 @@ use super::*; #[test] fn read_json() -> Result<()> { - let data = r#"[ + let data = br#"[ { "a": 1 }, @@ -19,7 +19,7 @@ fn read_json() -> Result<()> { } ]"#; - let json = serde_json::from_slice(data.as_bytes())?; + let json = json_deserializer::parse(data)?; let data_type = read::infer(&json)?; diff --git a/tests/it/io/ndjson/read.rs b/tests/it/io/ndjson/read.rs index 66ea5e0aba5..ef3e6882e56 100644 --- a/tests/it/io/ndjson/read.rs +++ b/tests/it/io/ndjson/read.rs @@ -156,7 +156,7 @@ fn invalid_infer_schema() -> Result<()> { let re = ndjson_read::infer(&mut Cursor::new("city,lat,lng"), None); assert_eq!( re.err().unwrap().to_string(), - "External error: expected value at line 1 column 1", + "External format error: InvalidToken(99)", ); Ok(()) } @@ -249,7 +249,7 @@ fn invalid_read_record() -> Result<()> { assert_eq!( arrays.err().unwrap().to_string(), - "External error: expected value at line 1 column 1", + "External format error: InvalidToken(99)", ); Ok(()) }