Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved performance of writing to CSV (20-25%) #382

Merged
merged 2 commits into from
Sep 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hash_hasher = "^2.0.3"
csv = { version = "^1.1", optional = true }
regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
Expand Down Expand Up @@ -86,7 +87,7 @@ full = [
"compute",
]
merge_sort = ["itertools"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core", "streaming-iterator"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["flatbuffers"]
io_ipc_compression = ["lz4", "zstd"]
Expand Down Expand Up @@ -187,3 +188,7 @@ harness = false
[[bench]]
name = "bitmap_ops"
harness = false

[[bench]]
name = "write_csv"
harness = false
4 changes: 2 additions & 2 deletions benches/comparison_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_op_scalar(&arr_a, &BooleanScalar::from(Some(true)), Operator::Eq))
});

let arr_a = create_string_array::<i32>(size, 0.1, 42);
let arr_b = create_string_array::<i32>(size, 0.1, 43);
let arr_a = create_string_array::<i32>(size, 4, 0.1, 42);
let arr_b = create_string_array::<i32>(size, 4, 0.1, 43);
c.bench_function(&format!("utf8 2^{}", log2_size), |b| {
b.iter(|| bench_op(&arr_a, &arr_b, Operator::Eq))
});
Expand Down
2 changes: 1 addition & 1 deletion benches/filter_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});

let data_array = create_string_array::<i32>(size, 0.5, 42);
let data_array = create_string_array::<i32>(size, 4, 0.5, 42);
c.bench_function("filter context string", |b| {
b.iter(|| bench_built_filter(&filter, &data_array))
});
Expand Down
2 changes: 1 addition & 1 deletion benches/sort_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_lexsort(&arr_a, &arr_b))
});

let arr_a = create_string_array::<i32>(size, 0.1, 42);
let arr_a = create_string_array::<i32>(size, 4, 0.1, 42);
c.bench_function(&format!("sort utf8 null 2^{}", log2_size), |b| {
b.iter(|| bench_sort(&arr_a))
});
Expand Down
12 changes: 6 additions & 6 deletions benches/take_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,37 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_take(&values, &indices_nulls))
});

let values = create_string_array::<i32>(512, 0.0, 42);
let values = create_string_array::<i32>(512, 4, 0.0, 42);
c.bench_function(&format!("take str 2^{}", log2_size), |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(512, 0.0, 42);
let values = create_string_array::<i32>(512, 4, 0.0, 42);
c.bench_function(&format!("take str nulls 2^{}", log2_size), |b| {
b.iter(|| bench_take(&values, &indices_nulls))
});
});

let values = create_string_array::<i32>(512, 0.0, 42);
let values = create_string_array::<i32>(512, 4, 0.0, 42);
let indices = create_random_index(512, 0.5);
c.bench_function("take str null indices 512", |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(1024, 0.0, 42);
let values = create_string_array::<i32>(1024, 4, 0.0, 42);
let indices = create_random_index(1024, 0.5);
c.bench_function("take str null indices 1024", |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(1024, 0.5, 42);
let values = create_string_array::<i32>(1024, 4, 0.5, 42);

let indices = create_random_index(1024, 0.0);
c.bench_function("take str null values 1024", |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(1024, 0.5, 42);
let values = create_string_array::<i32>(1024, 4, 0.5, 42);
let indices = create_random_index(1024, 0.5);
c.bench_function("take str null values null indices 1024", |b| {
b.iter(|| bench_take(&values, &indices))
Expand Down
58 changes: 58 additions & 0 deletions benches/write_csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use arrow2::util::bench_util::*;
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::csv::write;
use arrow2::record_batch::RecordBatch;

/// Writes `batch` as CSV into a throw-away in-memory buffer.
///
/// The header derived from the batch's schema is written first, followed by the
/// batch itself using the default [`write::SerializeOptions`]. Any error from
/// either step is returned to the caller.
fn write_batch(batch: &RecordBatch) -> Result<()> {
    // The sink is a plain `Vec`, so only serialization is exercised, not real I/O.
    let mut csv_writer = write::WriterBuilder::new().from_writer(Vec::new());
    let options = write::SerializeOptions::default();

    write::write_header(&mut csv_writer, batch.schema())?;
    write::write_batch(&mut csv_writer, batch, &options)
}

/// Wraps a single array into a one-column [`RecordBatch`].
///
/// The column is a field named `"a"` that takes the array's own data type and is
/// constructed with `true` as its last argument (presumably "nullable").
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the schema/column combination.
fn make_batch(array: impl Array + 'static) -> RecordBatch {
    let field = Field::new("a", array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
}

/// Registers the CSV-writing benchmarks with criterion.
///
/// For each size `2^10, 2^12, ..., 2^18`, three single-column batches are built and
/// benchmarked in this order: `i32`, utf8 and `f64`.
fn add_benchmark(c: &mut Criterion) {
    for log2_size in (10..=18).step_by(2) {
        let size = 2usize.pow(log2_size);

        // NOTE(review): `0.1` is presumably the null density of the generated array.
        let int_batch = make_batch(create_primitive_array::<i32>(size, DataType::Int32, 0.1));
        c.bench_function(&format!("csv write i32 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&int_batch))
        });

        // NOTE(review): `100` looks like the per-string size and `42` the seed; confirm
        // against `bench_util::create_string_array`.
        let utf8_batch = make_batch(create_string_array::<i32>(size, 100, 0.1, 42));
        c.bench_function(&format!("csv write utf8 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&utf8_batch))
        });

        let float_batch = make_batch(create_primitive_array::<f64>(size, DataType::Float64, 0.1));
        c.bench_function(&format!("csv write f64 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&float_batch))
        });
    }
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion benches/write_ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn add_benchmark(c: &mut Criterion) {
});

(0..=10).step_by(2).for_each(|i| {
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 0.1, 42);
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 4, 0.1, 42);
let a = format!("write utf8 2^{}", 10 + i);
c.bench_function(&a, |b| b.iter(|| write(array).unwrap()));
});
Expand Down
4 changes: 2 additions & 2 deletions benches/write_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ fn add_benchmark(c: &mut Criterion) {
});

(0..=10).step_by(2).for_each(|i| {
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 0.1, 42);
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 4, 0.1, 42);
let a = format!("write utf8 2^{}", 10 + i);
c.bench_function(&a, |b| b.iter(|| write(array, Encoding::Plain).unwrap()));
});

(0..=10).step_by(2).for_each(|i| {
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 0.1, 42);
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 4, 0.1, 42);
let a = format!("write utf8 delta 2^{}", 10 + i);
c.bench_function(&a, |b| {
b.iter(|| write(array, Encoding::DeltaLengthByteArray).unwrap())
Expand Down
65 changes: 65 additions & 0 deletions src/io/csv/write/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
pub use streaming_iterator::StreamingIterator;

/// A [`StreamingIterator`] that owns a reusable [`Vec<u8>`] buffer and presents every
/// item of type `T` produced by the wrapped iterator as `&[u8]`.
///
/// It is generic over the item type `T` and over the function `F`, which receives an
/// item together with the buffer (`Fn(T, &mut Vec<u8>)`) and is expected to write the
/// item's bytes into that buffer.
pub struct BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: Fn(T, &mut Vec<u8>),
{
    /// Source of the items to present.
    iterator: I,
    /// Called with each item and the (cleared) buffer.
    f: F,
    /// Storage for the bytes of the current item.
    buffer: Vec<u8>,
    /// Whether `buffer` currently holds an item; `false` before the first advance
    /// and once `iterator` is exhausted.
    is_valid: bool,
}

impl<I: Iterator<Item = T>, F: Fn(T, &mut Vec<u8>), T> BufStreamingIterator<I, F, T> {
    /// Creates a new [`BufStreamingIterator`] from an iterator, a function `f` and an
    /// initial `buffer`.
    ///
    /// The buffer is stored as given and no item is considered available until the
    /// iterator is advanced.
    #[inline]
    pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
        Self {
            is_valid: false,
            buffer,
            f,
            iterator,
        }
    }
}

impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: Fn(T, &mut Vec<u8>),
{
    type Item = [u8];

    /// Pulls the next item from the wrapped iterator and, if there is one, rewrites
    /// the internal buffer with it; otherwise marks the iterator as exhausted.
    #[inline]
    fn advance(&mut self) {
        let next = self.iterator.next();
        self.is_valid = next.is_some();
        if let Some(item) = next {
            // Reuse the same allocation for every item instead of creating a new `Vec`.
            self.buffer.clear();
            (self.f)(item, &mut self.buffer);
        }
    }

    /// Returns the bytes of the current item, or `None` before the first call to
    /// `advance` and after the wrapped iterator is exhausted.
    #[inline]
    fn get(&self) -> Option<&Self::Item> {
        if !self.is_valid {
            return None;
        }
        Some(self.buffer.as_slice())
    }

    /// Forwards the size hint of the wrapped iterator.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iterator.size_hint()
    }
}
9 changes: 6 additions & 3 deletions src/io/csv/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
mod iterator;
mod serialize;

use iterator::StreamingIterator;

use std::io::Write;

// re-export necessary public APIs from csv
Expand All @@ -15,7 +18,7 @@ use crate::{datatypes::Schema, error::Result};
fn new_serializers<'a>(
batch: &'a RecordBatch,
options: &'a SerializeOptions,
) -> Result<Vec<Box<dyn Iterator<Item = Vec<u8>> + 'a>>> {
) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
batch
.columns()
.iter()
Expand All @@ -34,7 +37,7 @@ pub fn serialize(batch: &RecordBatch, options: &SerializeOptions) -> Result<Vec<
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
.for_each(|iter| record.push_field(&iter.next().unwrap()));
.for_each(|iter| record.push_field(iter.next().unwrap()));
});
Ok(records)
}
Expand All @@ -54,7 +57,7 @@ pub fn write_batch<W: Write>(
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
.for_each(|iter| record.push_field(&iter.next().unwrap()));
.for_each(|iter| record.push_field(iter.next().unwrap()));
writer.write_byte_record(&record)?;
record.clear();
Result::Ok(())
Expand Down
Loading