Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved perf of json read
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 4, 2022
1 parent 696f915 commit 4307087
Show file tree
Hide file tree
Showing 17 changed files with 345 additions and 78 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ regex = { version = "^1.3", optional = true }
streaming-iterator = { version = "0.1", optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
json-deserializer = { version = "0.3", optional = true }
indexmap = { version = "^1.6", optional = true }

# used to print columns in a nice columnar format
Expand Down Expand Up @@ -72,6 +70,9 @@ parquet2 = { version = "0.13", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
Expand Down Expand Up @@ -134,7 +135,7 @@ io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv-core", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_json = ["json-deserializer", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
Expand All @@ -156,9 +157,9 @@ io_avro_compression = [
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
# io_json: its dependencies + error handling
# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
io_print = ["comfy-table"]
# the compute kernels. Disabling this significantly reduces compile time.
compute_aggregate = ["multiversion"]
Expand Down
8 changes: 5 additions & 3 deletions benches/read_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ fn prep(array: impl Array + 'static) -> (Vec<u8>, DataType) {
// the operation of writing is IO-bounded.
write::write(&mut data, blocks).unwrap();

let dt = read::infer(&serde_json::from_slice(&data).unwrap()).unwrap();
let value = read::json_deserializer::parse(&data).unwrap();

let dt = read::infer(&value).unwrap();
(data, dt)
}

fn bench_read(data: &[u8], dt: &DataType) {
let json = serde_json::from_slice(data).unwrap();
read::deserialize(&json, dt.clone()).unwrap();
let value = read::json_deserializer::parse(data).unwrap();
read::deserialize(&value, dt.clone()).unwrap();
}

fn add_benchmark(c: &mut Criterion) {
Expand Down
14 changes: 9 additions & 5 deletions examples/json_read.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use std::fs::File;
use std::io::BufReader;
/// Example of reading a JSON file.
use std::fs;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;
use arrow2::io::json::read;

fn read_path(path: &str) -> Result<Arc<dyn Array>> {
// Example of reading a JSON file.
let reader = BufReader::new(File::open(path)?);
let json = serde_json::from_reader(reader)?;
// read the file into memory (IO-bounded)
let data = fs::read(path)?;

// create a non-owning struct of the data (CPU-bounded)
let json = read::json_deserializer::parse(&data)?;

// use it to infer an Arrow schema (CPU-bounded)
let data_type = read::infer(&json)?;

// and deserialize it (CPU-bounded)
read::deserialize(&json, data_type)
}

Expand Down
1 change: 1 addition & 0 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::super::{Block, CompressedBlock};
use super::BlockStreamIterator;
use super::Compression;

#[cfg(feature = "io_avro_compression")]
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);

/// Decompresses an Avro block.
Expand Down
1 change: 1 addition & 0 deletions src/io/avro/write/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::error::Result;
use super::Compression;
use super::{Block, CompressedBlock};

#[cfg(feature = "io_avro_compression")]
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);

/// Compresses a [`Block`] to a [`CompressedBlock`].
Expand Down
6 changes: 3 additions & 3 deletions src/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ pub mod write;

use crate::error::Error;

impl From<serde_json::error::Error> for Error {
fn from(error: serde_json::error::Error) -> Self {
Error::External("".to_string(), Box::new(error))
/// Bridges parser failures from the `json_deserializer` crate into this crate's
/// [`Error`], so `?` can be used on `json_deserializer::parse` results.
impl From<json_deserializer::Error> for Error {
    fn from(error: json_deserializer::Error) -> Self {
        // The parser error carries its own description; keep it as the format message.
        Error::ExternalFormat(error.to_string())
    }
}
184 changes: 146 additions & 38 deletions src/io/json/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::{collections::hash_map::DefaultHasher, sync::Arc};

use hash_hasher::HashedMap;
use indexmap::map::IndexMap as HashMap;
use num_traits::NumCast;
use serde_json::Value;
use json_deserializer::{Number, Value};

use crate::{
array::*,
Expand All @@ -17,7 +16,7 @@ use crate::{

/// A function that converts a &Value into an optional tuple of a byte slice and a Value.
/// This is used to create a dictionary, where the hashing depends on the DataType of the child object.
type Extract = Box<dyn Fn(&Value) -> Option<(u64, &Value)>>;
type Extract<'a> = Box<dyn Fn(&'a Value<'a>) -> Option<(u64, &'a Value<'a>)>>;

fn build_extract(data_type: &DataType) -> Extract {
match data_type {
Expand All @@ -27,11 +26,10 @@ fn build_extract(data_type: &DataType) -> Extract {
hasher.write(v.as_bytes());
Some((hasher.finish(), value))
}
Value::Number(v) => v.as_f64().map(|x| {
let mut hasher = DefaultHasher::new();
hasher.write(&x.to_le_bytes());
(hasher.finish(), value)
}),
Value::Number(v) => match v {
Number::Float(_, _) => todo!(),
Number::Integer(_, _) => todo!(),
},
Value::Bool(v) => {
let mut hasher = DefaultHasher::new();
hasher.write(&[*v as u8]);
Expand All @@ -40,75 +38,182 @@ fn build_extract(data_type: &DataType) -> Extract {
_ => None,
}),
DataType::Int32 | DataType::Int64 | DataType::Int16 | DataType::Int8 => {
Box::new(move |value| match &value {
Value::Number(v) => v.as_f64().map(|x| {
Box::new(move |value| {
let integer = match value {
Value::Number(number) => Some(deserialize_int_single::<i64>(*number)),
Value::Bool(number) => Some(if *number { 1i64 } else { 0i64 }),
_ => None,
};
integer.map(|integer| {
let mut hasher = DefaultHasher::new();
hasher.write(&x.to_le_bytes());
hasher.write(&integer.to_le_bytes());
(hasher.finish(), value)
}),
Value::Bool(v) => {
let mut hasher = DefaultHasher::new();
hasher.write(&[*v as u8]);
Some((hasher.finish(), value))
}
_ => None,
})
})
}
_ => Box::new(|_| None),
}
}

fn deserialize_boolean<A: Borrow<Value>>(rows: &[A]) -> BooleanArray {
/// Deserializes JSON `rows` into a [`BooleanArray`].
/// Non-boolean values (strings, numbers, null, arrays, objects) become nulls.
fn deserialize_boolean<'a, A: Borrow<Value<'a>>>(rows: &[A]) -> BooleanArray {
    BooleanArray::from_trusted_len_iter(rows.iter().map(|row| {
        if let Value::Bool(value) = row.borrow() {
            Some(value)
        } else {
            None
        }
    }))
}

fn deserialize_int<T: NativeType + NumCast, A: Borrow<Value>>(
/// Parses a JSON [`Number`] into the integer type `T`.
///
/// Floats are truncated: only the digits before the `.` are kept.
/// A non-empty exponent is applied via [`Pow10`].
///
/// NOTE(review): the exponent is parsed as `u32`, so a negative exponent
/// (e.g. `1e-2`) makes `lexical_core::parse` fail and this unwrap panics.
fn deserialize_int_single<T>(number: Number) -> T
where
    T: NativeType + lexical_core::FromLexical + Pow10,
{
    // Reduce both variants to (integral digits, exponent digits).
    let (digits, exponent) = match number {
        Number::Float(fraction, exponent) => {
            // keep only the integral part before the decimal point
            (fraction.split(|x| *x == b'.').next().unwrap(), exponent)
        }
        Number::Integer(integer, exponent) => (integer, exponent),
    };
    let mut value: T = lexical_core::parse(digits).unwrap();
    if !exponent.is_empty() {
        let exponent: u32 = lexical_core::parse(exponent).unwrap();
        value = value.pow10(exponent);
    }
    value
}

/// Multiplication by a signed power of ten, used to apply JSON float exponents.
/// The `One` supertrait is relied upon by callers (e.g. `T::one()` for booleans).
trait Powi10: NativeType + num_traits::One + std::ops::Add {
    fn powi10(self, exp: i32) -> Self;
}

// Implemented via a macro for consistency with `impl_pow10!` below.
macro_rules! impl_powi10 {
    ($ty:ty) => {
        impl Powi10 for $ty {
            #[inline]
            fn powi10(self, exp: i32) -> Self {
                // `10 as $ty` is exactly 10.0 for both float types
                self * (10 as $ty).powi(exp)
            }
        }
    };
}
impl_powi10!(f32);
impl_powi10!(f64);

/// Multiplication by an unsigned power of ten, used to apply JSON integer exponents.
/// The `One` supertrait is relied upon by callers (e.g. `T::one()` for booleans).
trait Pow10: NativeType + num_traits::One + std::ops::Add {
    fn pow10(self, exp: u32) -> Self;
}

// One impl per primitive integer type; a macro keeps them in lockstep.
macro_rules! impl_pow10 {
    ($ty:ty) => {
        impl Pow10 for $ty {
            #[inline]
            fn pow10(self, exp: u32) -> Self {
                // NOTE(review): `pow` panics on overflow in debug builds and wraps
                // in release; large exponents silently corrupt values — confirm
                // inputs are bounded upstream.
                self * (10 as $ty).pow(exp)
            }
        }
    };
}
impl_pow10!(u8);
impl_pow10!(u16);
impl_pow10!(u32);
impl_pow10!(u64);
impl_pow10!(i8);
impl_pow10!(i16);
impl_pow10!(i32);
impl_pow10!(i64);

/// Parses a JSON [`Number`] into the float type `T`.
///
/// Both variants are handled identically: parse the mantissa bytes, then apply
/// the (possibly negative) exponent via [`Powi10`] when one is present.
fn deserialize_float_single<T>(number: &Number) -> T
where
    T: NativeType + lexical_core::FromLexical + Powi10,
{
    // Float and Integer both carry (mantissa bytes, exponent bytes).
    let (mantissa, exponent) = match number {
        Number::Float(mantissa, exponent) | Number::Integer(mantissa, exponent) => {
            (mantissa, exponent)
        }
    };
    let mut value: T = lexical_core::parse(mantissa).unwrap();
    if !exponent.is_empty() {
        let exponent: i32 = lexical_core::parse(exponent).unwrap();
        value = value.powi10(exponent);
    }
    value
}

fn deserialize_int<'a, T: NativeType + lexical_core::FromLexical + Pow10, A: Borrow<Value<'a>>>(
rows: &[A],
data_type: DataType,
) -> PrimitiveArray<T> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::Number(number) => number.as_i64().and_then(num_traits::cast::<i64, T>),
Value::Bool(number) => num_traits::cast::<i32, T>(*number as i32),
Value::Number(number) => Some(deserialize_int_single(*number)),
Value::Bool(number) => Some(if *number { T::one() } else { T::default() }),
_ => None,
});
PrimitiveArray::from_trusted_len_iter(iter).to(data_type)
}

fn deserialize_float<T: NativeType + NumCast, A: Borrow<Value>>(
fn deserialize_float<
'a,
T: NativeType + lexical_core::FromLexical + Powi10,
A: Borrow<Value<'a>>,
>(
rows: &[A],
data_type: DataType,
) -> PrimitiveArray<T> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::Number(number) => number.as_f64().and_then(num_traits::cast::<f64, T>),
Value::Bool(number) => num_traits::cast::<i32, T>(*number as i32),
Value::Number(number) => Some(deserialize_float_single(number)),
Value::Bool(number) => Some(if *number { T::one() } else { T::default() }),
_ => None,
});
PrimitiveArray::from_trusted_len_iter(iter).to(data_type)
}

fn deserialize_binary<O: Offset, A: Borrow<Value>>(rows: &[A]) -> BinaryArray<O> {
fn deserialize_binary<'a, O: Offset, A: Borrow<Value<'a>>>(rows: &[A]) -> BinaryArray<O> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::String(v) => Some(v.as_bytes()),
_ => None,
});
BinaryArray::from_trusted_len_iter(iter)
}

fn deserialize_utf8<O: Offset, A: Borrow<Value>>(rows: &[A]) -> Utf8Array<O> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::String(v) => Some(v.clone()),
Value::Number(v) => Some(v.to_string()),
Value::Bool(v) => Some(v.to_string()),
_ => None,
});
Utf8Array::<O>::from_trusted_len_iter(iter)
/// Deserializes JSON `rows` into a [`Utf8Array`].
///
/// Strings are copied as-is; numbers and booleans are stringified; other values
/// (null, arrays, objects) become validity nulls. A single scratch buffer is
/// reused across rows to avoid a per-number allocation.
fn deserialize_utf8<'a, O: Offset, A: Borrow<Value<'a>>>(rows: &[A]) -> Utf8Array<O> {
    let mut array = MutableUtf8Array::<O>::with_capacity(rows.len());
    let mut scratch = vec![];
    for row in rows {
        match row.borrow() {
            Value::String(v) => array.push(Some(v.as_ref())),
            Value::Number(number) => match number {
                Number::Integer(number, exponent) | Number::Float(number, exponent) => {
                    scratch.clear();
                    scratch.extend_from_slice(*number);
                    // only append the exponent when present, so e.g. `12`
                    // does not become the invalid literal `12e`
                    if !exponent.is_empty() {
                        scratch.push(b'e');
                        scratch.extend_from_slice(*exponent);
                    }
                    // BUG FIX: the assembled number was previously never pushed,
                    // leaving `array` shorter than `rows` (misaligned offsets).
                    array.push(Some(
                        std::str::from_utf8(&scratch).expect("JSON number bytes are ASCII"),
                    ));
                }
            },
            Value::Bool(v) => array.push(Some(if *v { "true" } else { "false" })),
            _ => array.push_null(),
        }
    }
    array.into()
}

fn deserialize_list<O: Offset, A: Borrow<Value>>(rows: &[A], data_type: DataType) -> ListArray<O> {
fn deserialize_list<'a, O: Offset, A: Borrow<Value<'a>>>(
rows: &[A],
data_type: DataType,
) -> ListArray<O> {
let child = ListArray::<O>::get_child_type(&data_type);

let mut validity = MutableBitmap::with_capacity(rows.len());
Expand Down Expand Up @@ -138,7 +243,7 @@ fn deserialize_list<O: Offset, A: Borrow<Value>>(rows: &[A], data_type: DataType
ListArray::<O>::new(data_type, offsets.into(), values, validity.into())
}

fn deserialize_struct<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> StructArray {
fn deserialize_struct<'a, A: Borrow<Value<'a>>>(rows: &[A], data_type: DataType) -> StructArray {
let fields = StructArray::get_fields(&data_type);

let mut values = fields
Expand Down Expand Up @@ -173,7 +278,7 @@ fn deserialize_struct<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Stru
StructArray::new(data_type, values, validity.into())
}

fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
fn deserialize_dictionary<'a, K: DictionaryKey, A: Borrow<Value<'a>>>(
rows: &[A],
data_type: DataType,
) -> DictionaryArray<K> {
Expand Down Expand Up @@ -206,7 +311,10 @@ fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
DictionaryArray::<K>::from_data(keys, values)
}

pub(crate) fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
pub(crate) fn _deserialize<'a, A: Borrow<Value<'a>>>(
rows: &[A],
data_type: DataType,
) -> Arc<dyn Array> {
match &data_type {
DataType::Null => Arc::new(NullArray::new(data_type, rows.len())),
DataType::Boolean => Arc::new(deserialize_boolean(rows)),
Expand Down
Loading

0 comments on commit 4307087

Please sign in to comment.