diff --git a/Cargo.toml b/Cargo.toml index 94ca01441ee..d9c7d9c763c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ hash_hasher = "^2.0.3" csv = { version = "^1.1", optional = true } regex = { version = "^1.3", optional = true } lazy_static = { version = "^1.4", optional = true } +streaming-iterator = { version = "0.1", optional = true } serde = { version = "^1.0", features = ["rc"], optional = true } serde_derive = { version = "^1.0", optional = true } @@ -86,7 +87,7 @@ full = [ "compute", ] merge_sort = ["itertools"] -io_csv = ["csv", "lazy_static", "regex", "lexical-core"] +io_csv = ["csv", "lazy_static", "regex", "lexical-core", "streaming-iterator"] io_json = ["serde", "serde_json", "indexmap"] io_ipc = ["flatbuffers"] io_ipc_compression = ["lz4", "zstd"] @@ -187,3 +188,7 @@ harness = false [[bench]] name = "bitmap_ops" harness = false + +[[bench]] +name = "write_csv" +harness = false diff --git a/benches/comparison_kernels.rs b/benches/comparison_kernels.rs index be76eb05976..843570d3943 100644 --- a/benches/comparison_kernels.rs +++ b/benches/comparison_kernels.rs @@ -43,8 +43,8 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_op_scalar(&arr_a, &BooleanScalar::from(Some(true)), Operator::Eq)) }); - let arr_a = create_string_array::(size, 0.1, 42); - let arr_b = create_string_array::(size, 0.1, 43); + let arr_a = create_string_array::(size, 4, 0.1, 42); + let arr_b = create_string_array::(size, 4, 0.1, 43); c.bench_function(&format!("utf8 2^{}", log2_size), |b| { b.iter(|| bench_op(&arr_a, &arr_b, Operator::Eq)) }); diff --git a/benches/filter_kernels.rs b/benches/filter_kernels.rs index 9d2a59e9b0c..13acb942300 100644 --- a/benches/filter_kernels.rs +++ b/benches/filter_kernels.rs @@ -114,7 +114,7 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_built_filter(&sparse_filter, &data_array)) }); - let data_array = create_string_array::(size, 0.5, 42); + let data_array = create_string_array::(size, 4, 0.5, 42); c.bench_function("filter 
context string", |b| { b.iter(|| bench_built_filter(&filter, &data_array)) }); diff --git a/benches/sort_kernel.rs b/benches/sort_kernel.rs index 780b3619991..7294aa71d63 100644 --- a/benches/sort_kernel.rs +++ b/benches/sort_kernel.rs @@ -80,7 +80,7 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_lexsort(&arr_a, &arr_b)) }); - let arr_a = create_string_array::(size, 0.1, 42); + let arr_a = create_string_array::(size, 4, 0.1, 42); c.bench_function(&format!("sort utf8 null 2^{}", log2_size), |b| { b.iter(|| bench_sort(&arr_a)) }); diff --git a/benches/take_kernels.rs b/benches/take_kernels.rs index 65b148065b9..50de3533dbb 100644 --- a/benches/take_kernels.rs +++ b/benches/take_kernels.rs @@ -58,37 +58,37 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_take(&values, &indices_nulls)) }); - let values = create_string_array::(512, 0.0, 42); + let values = create_string_array::(512, 4, 0.0, 42); c.bench_function(&format!("take str 2^{}", log2_size), |b| { b.iter(|| bench_take(&values, &indices)) }); - let values = create_string_array::(512, 0.0, 42); + let values = create_string_array::(512, 4, 0.0, 42); c.bench_function(&format!("take str nulls 2^{}", log2_size), |b| { b.iter(|| bench_take(&values, &indices_nulls)) }); }); - let values = create_string_array::(512, 0.0, 42); + let values = create_string_array::(512, 4, 0.0, 42); let indices = create_random_index(512, 0.5); c.bench_function("take str null indices 512", |b| { b.iter(|| bench_take(&values, &indices)) }); - let values = create_string_array::(1024, 0.0, 42); + let values = create_string_array::(1024, 4, 0.0, 42); let indices = create_random_index(1024, 0.5); c.bench_function("take str null indices 1024", |b| { b.iter(|| bench_take(&values, &indices)) }); - let values = create_string_array::(1024, 0.5, 42); + let values = create_string_array::(1024, 4, 0.5, 42); let indices = create_random_index(1024, 0.0); c.bench_function("take str null values 1024", |b| { b.iter(|| bench_take(&values, 
&indices)) }); - let values = create_string_array::(1024, 0.5, 42); + let values = create_string_array::(1024, 4, 0.5, 42); let indices = create_random_index(1024, 0.5); c.bench_function("take str null values null indices 1024", |b| { b.iter(|| bench_take(&values, &indices)) diff --git a/benches/write_csv.rs b/benches/write_csv.rs new file mode 100644 index 00000000000..5427acf7c65 --- /dev/null +++ b/benches/write_csv.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use arrow2::util::bench_util::*; +use criterion::{criterion_group, criterion_main, Criterion}; + +use arrow2::array::*; +use arrow2::datatypes::*; +use arrow2::error::Result; +use arrow2::io::csv::write; +use arrow2::record_batch::RecordBatch; + +fn write_batch(batch: &RecordBatch) -> Result<()> { + let writer = &mut write::WriterBuilder::new().from_writer(vec![]); + + write::write_header(writer, batch.schema())?; + + let options = write::SerializeOptions::default(); + write::write_batch(writer, batch, &options) +} + +fn make_batch(array: impl Array + 'static) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + array.data_type().clone(), + true, + )])); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() +} + +fn add_benchmark(c: &mut Criterion) { + (10..=18).step_by(2).for_each(|log2_size| { + let size = 2usize.pow(log2_size); + + let array = create_primitive_array::(size, DataType::Int32, 0.1); + let batch = make_batch(array); + + c.bench_function(&format!("csv write i32 2^{}", log2_size), |b| { + b.iter(|| write_batch(&batch)) + }); + + let array = create_string_array::(size, 100, 0.1, 42); + let batch = make_batch(array); + + c.bench_function(&format!("csv write utf8 2^{}", log2_size), |b| { + b.iter(|| write_batch(&batch)) + }); + + let array = create_primitive_array::(size, DataType::Float64, 0.1); + let batch = make_batch(array); + + c.bench_function(&format!("csv write f64 2^{}", log2_size), |b| { + b.iter(|| write_batch(&batch)) + }); + }); +} + 
+criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/benches/write_ipc.rs b/benches/write_ipc.rs index a91b8e5da67..a0dd9081c4e 100644 --- a/benches/write_ipc.rs +++ b/benches/write_ipc.rs @@ -35,7 +35,7 @@ fn add_benchmark(c: &mut Criterion) { }); (0..=10).step_by(2).for_each(|i| { - let array = &create_string_array::(1024 * 2usize.pow(i), 0.1, 42); + let array = &create_string_array::(1024 * 2usize.pow(i), 4, 0.1, 42); let a = format!("write utf8 2^{}", 10 + i); c.bench_function(&a, |b| b.iter(|| write(array).unwrap())); }); diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index d1a8ff37ae7..5ffa69d762e 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -55,13 +55,13 @@ fn add_benchmark(c: &mut Criterion) { }); (0..=10).step_by(2).for_each(|i| { - let array = &create_string_array::(1024 * 2usize.pow(i), 0.1, 42); + let array = &create_string_array::(1024 * 2usize.pow(i), 4, 0.1, 42); let a = format!("write utf8 2^{}", 10 + i); c.bench_function(&a, |b| b.iter(|| write(array, Encoding::Plain).unwrap())); }); (0..=10).step_by(2).for_each(|i| { - let array = &create_string_array::(1024 * 2usize.pow(i), 0.1, 42); + let array = &create_string_array::(1024 * 2usize.pow(i), 4, 0.1, 42); let a = format!("write utf8 delta 2^{}", 10 + i); c.bench_function(&a, |b| { b.iter(|| write(array, Encoding::DeltaLengthByteArray).unwrap()) diff --git a/src/io/csv/write/iterator.rs b/src/io/csv/write/iterator.rs new file mode 100644 index 00000000000..65e16b4d173 --- /dev/null +++ b/src/io/csv/write/iterator.rs @@ -0,0 +1,65 @@ +pub use streaming_iterator::StreamingIterator; + +/// A [`StreamingIterator`] with an internal buffer of [`Vec`] used to efficiently +/// present items of type `T` as `&[u8]`. +/// It is generic over the type `T` and the transformation `F: T -> &[u8]`. 
+pub struct BufStreamingIterator +where + I: Iterator, + F: Fn(T, &mut Vec), +{ + iterator: I, + f: F, + buffer: Vec, + is_valid: bool, +} + +impl BufStreamingIterator +where + I: Iterator, + F: Fn(T, &mut Vec), +{ + #[inline] + pub fn new(iterator: I, f: F, buffer: Vec) -> Self { + Self { + iterator, + f, + buffer, + is_valid: false, + } + } +} + +impl StreamingIterator for BufStreamingIterator +where + I: Iterator, + F: Fn(T, &mut Vec), +{ + type Item = [u8]; + + #[inline] + fn advance(&mut self) { + let a = self.iterator.next(); + if let Some(a) = a { + self.is_valid = true; + self.buffer.clear(); + (self.f)(a, &mut self.buffer); + } else { + self.is_valid = false; + } + } + + #[inline] + fn get(&self) -> Option<&Self::Item> { + if self.is_valid { + Some(&self.buffer) + } else { + None + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.iterator.size_hint() + } +} diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 61c37424a31..4bde82c24ea 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -1,5 +1,8 @@ +mod iterator; mod serialize; +use iterator::StreamingIterator; + use std::io::Write; // re-export necessary public APIs from csv @@ -15,7 +18,7 @@ use crate::{datatypes::Schema, error::Result}; fn new_serializers<'a>( batch: &'a RecordBatch, options: &'a SerializeOptions, -) -> Result> + 'a>>> { +) -> Result + 'a>>> { batch .columns() .iter() @@ -34,7 +37,7 @@ pub fn serialize(batch: &RecordBatch, options: &SerializeOptions) -> Result( serializers .iter_mut() // `unwrap` is infalible because `array.len()` equals `num_rows` on a `RecordBatch` - .for_each(|iter| record.push_field(&iter.next().unwrap())); + .for_each(|iter| record.push_field(iter.next().unwrap())); writer.write_byte_record(&record)?; record.clear(); Result::Ok(()) diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index 44a184eecb0..60eb7863cf8 100644 --- a/src/io/csv/write/serialize.rs +++ 
b/src/io/csv/write/serialize.rs @@ -1,11 +1,16 @@ +use lexical_core::ToLexical; + use crate::temporal_conversions; -use crate::util::lexical_to_bytes; +use crate::types::NativeType; +use crate::util::lexical_to_bytes_mut; use crate::{ array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array}, datatypes::{DataType, TimeUnit}, error::Result, }; +use super::iterator::{BufStreamingIterator, StreamingIterator}; + #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct SerializeOptions { pub date_format: String, @@ -23,16 +28,24 @@ impl Default for SerializeOptions { } } +fn primitive_write<'a, T: NativeType + ToLexical>( + array: &'a PrimitiveArray, +) -> Box + 'a> { + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + if let Some(x) = x { + lexical_to_bytes_mut(*x, buf) + } + }, + vec![], + )) +} + macro_rules! dyn_primitive { - ($ty:ident, $array:expr) => {{ - let array = $array - .as_any() - .downcast_ref::>() - .unwrap(); - let iter = array - .iter() - .map(move |x| x.map(|x| lexical_to_bytes(*x)).unwrap_or(vec![])); - Box::new(iter) + ($ty:ty, $array:expr) => {{ + let array = $array.as_any().downcast_ref().unwrap(); + primitive_write::<$ty>(array) }}; } @@ -42,11 +55,15 @@ macro_rules! dyn_date { .as_any() .downcast_ref::>() .unwrap(); - let iter = array.iter().map(move |x| { - x.map(|x| ($fn)(*x).format($format).to_string().into_bytes()) - .unwrap_or_default() - }); - Box::new(iter) + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(($fn)(*x).format($format).to_string().as_bytes()) + } + }, + vec![], + )) }}; } @@ -62,15 +79,23 @@ macro_rules! 
dyn_date { pub fn new_serializer<'a>( array: &'a dyn Array, options: &'a SerializeOptions, -) -> Result> + 'a>> { +) -> Result + 'a>> { Ok(match array.data_type() { DataType::Boolean => { let array = array.as_any().downcast_ref::().unwrap(); - Box::new( - array - .iter() - .map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()), - ) + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + if let Some(x) = x { + if x { + buf.extend_from_slice(b"true"); + } else { + buf.extend_from_slice(b"false"); + } + } + }, + vec![], + )) } DataType::UInt8 => { dyn_primitive!(u8, array) @@ -184,35 +209,51 @@ pub fn new_serializer<'a>( } DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); - Box::new( - array - .iter() - .map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()), - ) + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(x.as_bytes()); + } + }, + vec![], + )) } DataType::LargeUtf8 => { let array = array.as_any().downcast_ref::>().unwrap(); - Box::new( - array - .iter() - .map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()), - ) + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(x.as_bytes()); + } + }, + vec![], + )) } DataType::Binary => { let array = array.as_any().downcast_ref::>().unwrap(); - Box::new( - array - .iter() - .map(|x| x.map(|x| x.to_vec()).unwrap_or_default()), - ) + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(x); + } + }, + vec![], + )) } DataType::LargeBinary => { let array = array.as_any().downcast_ref::>().unwrap(); - Box::new( - array - .iter() - .map(|x| x.map(|x| x.to_vec()).unwrap_or_default()), - ) + Box::new(BufStreamingIterator::new( + array.iter(), + |x, buf| { + if let Some(x) = x { + buf.extend_from_slice(x); + } + }, + vec![], + )) } _ => todo!(), }) diff --git 
a/src/util/bench_util.rs b/src/util/bench_util.rs index 0185384af92..a1707bd66a3 100644 --- a/src/util/bench_util.rs +++ b/src/util/bench_util.rs @@ -95,18 +95,23 @@ where .collect() } -/// Creates an random (but fixed-seeded) array of a given size and null density -pub fn create_string_array(size: usize, null_density: f32, seed: u64) -> Utf8Array { +/// Creates a random (but fixed-seeded) [`Utf8Array`] of a given length, number of characters and null density. +pub fn create_string_array( + length: usize, + size: usize, + null_density: f32, + seed: u64, +) -> Utf8Array { let mut rng = StdRng::seed_from_u64(seed); - (0..size) + (0..length) .map(|_| { if rng.gen::() < null_density { None } else { let value = (&mut rng) .sample_iter(&Alphanumeric) - .take(4) + .take(size) .map(char::from) .collect::(); Some(value) diff --git a/src/util/lexical.rs b/src/util/lexical.rs index bbeeed6c15a..970c972e0af 100644 --- a/src/util/lexical.rs +++ b/src/util/lexical.rs @@ -2,6 +2,15 @@ #[inline] pub fn lexical_to_bytes(n: N) -> Vec { let mut buf = Vec::::with_capacity(N::FORMATTED_SIZE_DECIMAL); + lexical_to_bytes_mut(n, &mut buf); + buf +} + +/// Writes the decimal text of a numeric value into `buf`, replacing its previous contents +#[inline] +pub fn lexical_to_bytes_mut(n: N, buf: &mut Vec) { + buf.clear(); + buf.reserve(N::FORMATTED_SIZE_DECIMAL); unsafe { // JUSTIFICATION // Benefit @@ -13,7 +22,6 @@ pub fn lexical_to_bytes(n: N) -> Vec { let len = lexical_core::write(n, slice).len(); buf.set_len(len); } - buf } /// Converts numeric type to a `String`