Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved performance of deserializing JSON (2x) (#1024)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 5, 2022
1 parent 459de25 commit b2b0bb6
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 75 deletions.
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ regex = { version = "^1.3", optional = true }
streaming-iterator = { version = "0.1", optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }

json-deserializer = { version = "0.3", optional = true }
indexmap = { version = "^1.6", optional = true }

# used to print columns in a nice columnar format
Expand Down Expand Up @@ -72,6 +70,9 @@ parquet2 = { version = "0.13", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
Expand Down Expand Up @@ -134,7 +135,7 @@ io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv-core", "streaming-iterator", "lexical-core"]
io_json = ["json-deserializer", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
Expand All @@ -156,9 +157,9 @@ io_avro_compression = [
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
io_print = ["comfy-table"]
# the compute kernels. Disabling this significantly reduces compile time.
compute_aggregate = ["multiversion"]
Expand Down Expand Up @@ -313,6 +314,10 @@ harness = false
name = "write_json"
harness = false

[[bench]]
name = "read_json"
harness = false

[[bench]]
name = "slices_iterator"
harness = false
67 changes: 67 additions & 0 deletions benches/read_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use arrow2::array::Array;
use arrow2::datatypes::DataType;
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::io::json::{read, write};
use arrow2::util::bench_util::*;

/// Serializes `array` to JSON bytes once, then infers the Arrow [`DataType`]
/// back from that payload. Returns the bytes together with the inferred type
/// so the read benchmarks can reuse both on every iteration.
fn prep(array: impl Array + 'static) -> (Vec<u8>, DataType) {
    let boxed: Box<dyn Array> = Box::new(array);
    let serializer = write::Serializer::new(vec![Ok(boxed)].into_iter(), vec![]);

    // the operation of writing is IO-bounded.
    let mut buffer = vec![];
    write::write(&mut buffer, serializer).unwrap();

    // Parse once here only to infer the schema; the benchmark re-parses itself.
    let parsed = read::json_deserializer::parse(&buffer).unwrap();
    let inferred = read::infer(&parsed).unwrap();

    (buffer, inferred)
}

/// The benched operation: parse the JSON bytes and deserialize them into an
/// Arrow array of type `dt`. Results are discarded; only the work is measured.
fn bench_read(data: &[u8], dt: &DataType) {
    let parsed = read::json_deserializer::parse(data).unwrap();
    read::deserialize(&parsed, dt.clone()).unwrap();
}

fn add_benchmark(c: &mut Criterion) {
(10..=20).step_by(2).for_each(|log2_size| {
let size = 2usize.pow(log2_size);

let array = create_primitive_array::<i32>(size, 0.1);

let (data, dt) = prep(array);

c.bench_function(&format!("read i32 2^{}", log2_size), |b| {
b.iter(|| bench_read(&data, &dt))
});

let array = create_primitive_array::<f64>(size, 0.1);

let (data, dt) = prep(array);

c.bench_function(&format!("read f64 2^{}", log2_size), |b| {
b.iter(|| bench_read(&data, &dt))
});

let array = create_string_array::<i32>(size, 10, 0.1, 42);

let (data, dt) = prep(array);

c.bench_function(&format!("read utf8 2^{}", log2_size), |b| {
b.iter(|| bench_read(&data, &dt))
});

let array = create_boolean_array(size, 0.1, 0.1);

let (data, dt) = prep(array);

c.bench_function(&format!("read bool 2^{}", log2_size), |b| {
b.iter(|| bench_read(&data, &dt))
});
})
}

// Criterion entry points: collect the benchmark functions and generate `main`.
criterion_group!(benches, add_benchmark);
criterion_main!(benches);
14 changes: 9 additions & 5 deletions examples/json_read.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use std::fs::File;
use std::io::BufReader;
/// Example of reading a JSON file.
use std::fs;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;
use arrow2::io::json::read;

/// Reads the JSON file at `path` and deserializes it into an Arrow array.
///
/// # Errors
/// Returns an error if the file cannot be read, is not valid JSON, or its
/// contents cannot be mapped to an Arrow array.
fn read_path(path: &str) -> Result<Arc<dyn Array>> {
    // read the file into memory (IO-bounded)
    let data = fs::read(path)?;

    // create a non-owning struct of the data (CPU-bounded)
    let json = read::json_deserializer::parse(&data)?;

    // use it to infer an Arrow schema (CPU-bounded)
    let data_type = read::infer(&json)?;

    // and deserialize it (CPU-bounded)
    read::deserialize(&json, data_type)
}

Expand Down
1 change: 1 addition & 0 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::super::{Block, CompressedBlock};
use super::BlockStreamIterator;
use super::Compression;

// CRC-32 (ISO-HDLC polynomial) lookup table, presumably used to verify
// compressed Avro block checksums — confirm at the use site. Gated on the
// compression feature so it is not compiled (and flagged unused) without it.
#[cfg(feature = "io_avro_compression")]
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);

/// Decompresses an Avro block.
Expand Down
1 change: 1 addition & 0 deletions src/io/avro/write/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::error::Result;
use super::Compression;
use super::{Block, CompressedBlock};

// CRC-32 (ISO-HDLC polynomial) lookup table, presumably used to attach
// checksums to compressed Avro blocks — confirm at the use site. Gated on the
// compression feature so it is not compiled (and flagged unused) without it.
#[cfg(feature = "io_avro_compression")]
const CRC_TABLE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);

/// Compresses a [`Block`] to a [`CompressedBlock`].
Expand Down
6 changes: 3 additions & 3 deletions src/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ pub mod write;

use crate::error::Error;

impl From<serde_json::error::Error> for Error {
fn from(error: serde_json::error::Error) -> Self {
Error::External("".to_string(), Box::new(error))
impl From<json_deserializer::Error> for Error {
fn from(error: json_deserializer::Error) -> Self {
Error::ExternalFormat(error.to_string())
}
}
Loading

0 comments on commit b2b0bb6

Please sign in to comment.