This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved performance and stability of writing to CSV #866

Merged: 5 commits, Feb 26, 2022
Cargo.toml (3 changes: 2 additions & 1 deletion)
@@ -30,6 +30,7 @@ simdutf8 = "0.1.3"

# for csv io
csv = { version = "^1.1", optional = true }
csv-core = { version = "0.1", optional = true }

# for csv async io
csv-async = { version = "^1.1", optional = true }
@@ -129,7 +130,7 @@ io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
benches/write_csv.rs (8 changes: 4 additions & 4 deletions)
@@ -11,13 +11,13 @@ use arrow2::util::bench_util::*;
type ChunkArc = Chunk<Arc<dyn Array>>;

fn write_batch(columns: &ChunkArc) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_writer(vec![]);
let mut writer = vec![];

assert_eq!(columns.arrays().len(), 1);
write::write_header(writer, &["a"])?;

let options = write::SerializeOptions::default();
write::write_chunk(writer, columns, &options)
write::write_header(&mut writer, &["a"], &options)?;

write::write_chunk(&mut writer, columns, &options)
}

fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
examples/csv_write.rs (8 changes: 4 additions & 4 deletions)
@@ -6,14 +6,14 @@ use arrow2::{
};

fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Chunk<A>]) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_path(path)?;

write::write_header(writer, &["c1"])?;
let mut writer = std::fs::File::create(path)?;

let options = write::SerializeOptions::default();
write::write_header(&mut writer, &["c1"], &options)?;

columns
.iter()
.try_for_each(|batch| write::write_chunk(writer, batch, &options))
.try_for_each(|batch| write::write_chunk(&mut writer, batch, &options))
}

fn main() -> Result<()> {
examples/csv_write_parallel.rs (13 changes: 6 additions & 7 deletions)
@@ -1,3 +1,4 @@
use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
@@ -14,8 +15,8 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
let options = write::SerializeOptions::default();

// write a header
let writer = &mut write::WriterBuilder::new().from_path(path)?;
write::write_header(writer, &["c1"])?;
let mut writer = std::fs::File::create(path)?;
write::write_header(&mut writer, &["c1"], &options)?;

// prepare a channel to send serialized records from threads
let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
@@ -28,8 +29,8 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
let options = options.clone();
let batch = batches[id].clone(); // note: this is cheap
let child = thread::spawn(move || {
let records = write::serialize(&batch, &options).unwrap();
thread_tx.send(records).unwrap();
let rows = write::serialize(&batch, &options).unwrap();
thread_tx.send(rows).unwrap();
});

children.push(child);
@@ -38,9 +39,7 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
for _ in 0..2 {
// block: assumes that the order of batches matters.
let records = rx.recv().unwrap();
records
.iter()
.try_for_each(|record| writer.write_byte_record(record))?
records.iter().try_for_each(|row| writer.write_all(&row))?
}

for child in children {
src/io/csv/write/mod.rs (68 changes: 50 additions & 18 deletions)
@@ -6,7 +6,7 @@ use super::super::iterator::StreamingIterator;
use std::io::Write;

// re-export necessary public APIs from csv
pub use csv::{ByteRecord, Writer, WriterBuilder};
pub use csv::{ByteRecord, WriterBuilder};

pub use serialize::*;

@@ -26,55 +26,87 @@ fn new_serializers<'a, A: AsRef<dyn Array>>(
.collect()
}

/// Serializes [`Chunk`] to a vector of `ByteRecord`.
/// Serializes [`Chunk`] to a vector of rows.
/// The vector is guaranteed to have `columns.len()` entries.
/// Each `ByteRecord` is guaranteed to have `columns.arrays().len()` fields.
/// Each `row` is guaranteed to have `columns.arrays().len()` fields.
pub fn serialize<A: AsRef<dyn Array>>(
columns: &Chunk<A>,
options: &SerializeOptions,
) -> Result<Vec<ByteRecord>> {
) -> Result<Vec<Vec<u8>>> {
let mut serializers = new_serializers(columns, options)?;

let rows = columns.len();
let mut records = vec![ByteRecord::with_capacity(0, columns.arrays().len()); rows];
records.iter_mut().for_each(|record| {
let mut rows = Vec::with_capacity(columns.len());
let mut row = vec![];

// this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns
(0..columns.len()).try_for_each(|_| {
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `len` in `Chunk::len`
.for_each(|iter| record.push_field(iter.next().unwrap()));
});
Ok(records)
// `unwrap` is infallible because `array.len()` equals `Chunk::len`
.for_each(|iter| {
let field = iter.next().unwrap();
row.extend_from_slice(field);
row.push(options.delimiter);
});
if !row.is_empty() {
// replace last delimiter with new line
let last_byte = row.len() - 1;
row[last_byte] = b'\n';
rows.push(row.clone());
row.clear();
}
Result::Ok(())
})?;

Ok(rows)
}
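
For illustration, a minimal sketch of calling the new `serialize` directly, as the parallel-write example above does (`rows_for_channel` is a hypothetical helper; the paths are the ones this PR already uses):

use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn rows_for_channel(columns: &Chunk<Arc<dyn Array>>) -> Result<Vec<Vec<u8>>> {
    // each row is a plain, b'\n'-terminated byte buffer, so it is `Send`
    // and can be handed to another thread or written with `write_all`
    write::serialize(columns, &write::SerializeOptions::default())
}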

/// Writes [`Chunk`] to `writer` according to the serialization options `options`.
pub fn write_chunk<W: Write, A: AsRef<dyn Array>>(
writer: &mut Writer<W>,
writer: &mut W,
columns: &Chunk<A>,
options: &SerializeOptions,
) -> Result<()> {
let mut serializers = new_serializers(columns.arrays(), options)?;

let rows = columns.len();
let mut record = ByteRecord::with_capacity(0, columns.arrays().len());
let mut row = Vec::with_capacity(columns.arrays().len() * 10);

// this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns
(0..rows).try_for_each(|_| {
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `Chunk::len`
.for_each(|iter| record.push_field(iter.next().unwrap()));
writer.write_byte_record(&record)?;
record.clear();
.for_each(|iter| {
let field = iter.next().unwrap();
row.extend_from_slice(field);
row.push(options.delimiter);
});
// replace last delimiter with new line
let last_byte = row.len() - 1;
row[last_byte] = b'\n';
writer.write_all(&row)?;
row.clear();
Result::Ok(())
})?;
Ok(())
}

/// Writes a CSV header to `writer`
pub fn write_header<W: Write, T>(writer: &mut Writer<W>, names: &[T]) -> Result<()>
pub fn write_header<W: Write, T>(
writer: &mut W,
names: &[T],
options: &SerializeOptions,
) -> Result<()>
where
T: AsRef<str>,
{
writer.write_record(names.iter().map(|x| x.as_ref().as_bytes()))?;
let names = names.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
writer.write_all(
names
.join(std::str::from_utf8(&[options.delimiter]).unwrap())
.as_bytes(),
)?;
writer.write_all(&[b'\n'])?;
Ok(())
}
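
Taken together, the writer functions now target any `std::io::Write` instead of `csv::Writer`. A minimal end-to-end sketch under the paths used in this PR (the array contents are illustrative):

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn write_csv(path: &str) -> Result<()> {
    let array = Int32Array::from_slice([1, 2, 3]);
    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    // any std::io::Write works; no csv::Writer wrapper is required
    let mut writer = std::fs::File::create(path)?;

    let options = write::SerializeOptions::default();
    write::write_header(&mut writer, &["c1"], &options)?;
    write::write_chunk(&mut writer, &columns, &options)
}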
src/io/csv/write/serialize.rs (75 changes: 56 additions & 19 deletions)
@@ -12,11 +12,12 @@ use crate::{

use super::super::super::iterator::{BufStreamingIterator, StreamingIterator};
use crate::array::{DictionaryArray, DictionaryKey, Offset};
use csv_core::WriteResult;
use std::any::Any;

/// Options to serialize logical types to CSV
/// The default is to format times and dates as `chrono` crate formats them.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct SerializeOptions {
/// used for [`DataType::Date32`]
pub date32_format: Option<String>,
@@ -28,6 +29,24 @@ pub struct SerializeOptions {
pub time64_format: Option<String>,
/// used for [`DataType::Timestamp`]
pub timestamp_format: Option<String>,
/// used as separator/delimiter
pub delimiter: u8,
/// quoting character
pub quote: u8,
}

impl Default for SerializeOptions {
fn default() -> Self {
SerializeOptions {
date32_format: None,
date64_format: None,
time32_format: None,
time64_format: None,
timestamp_format: None,
delimiter: b',',
quote: b'"',
}
}
}
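
Since the new `delimiter` and `quote` fields make `Default` a manual impl, struct-update syntax remains available for customization; a small sketch (`tsv_options` is a hypothetical helper, field names as in this diff):

use arrow2::io::csv::write::SerializeOptions;

fn tsv_options() -> SerializeOptions {
    // tab as separator; every other field keeps its default
    SerializeOptions {
        delimiter: b'\t',
        ..Default::default()
    }
}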

fn primitive_write<'a, T: NativeType + ToLexical>(
Expand Down Expand Up @@ -187,6 +206,40 @@ fn timestamp_with_tz<'a>(
}
}

fn new_utf8_serializer<'a, O: Offset>(
array: &'a Utf8Array<O>,
options: &'a SerializeOptions,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a> {
let mut local_buf = vec![0u8; 64];
let mut ser_writer = csv_core::WriterBuilder::new().quote(options.quote).build();

Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
match x {
// Empty strings are quoted.
// This ensures a CSV parser will not read them as missing
// values in a delimited field.
Some("") => buf.extend_from_slice(b"\"\""),
Some(s) => loop {
match ser_writer.field(s.as_bytes(), &mut local_buf) {
(WriteResult::OutputFull, _, _) => {
let additional = local_buf.len();
local_buf.extend(std::iter::repeat(0u8).take(additional))
}
(WriteResult::InputEmpty, _, n_out) => {
buf.extend_from_slice(&local_buf[..n_out]);
break;
}
}
},
_ => {}
}
},
vec![],
))
}
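
For reference, a self-contained sketch of the csv-core calling convention that `new_utf8_serializer` builds on. Unlike the grow-and-retry loop above, this variant advances the input by `n_in`, keeps partial output, and flushes pending quote state with `finish` (`encode_field` is a hypothetical helper; signatures per the csv-core crate):

use csv_core::{WriteResult, WriterBuilder};

// Quote and escape a single field into `out`. `field` reports how many
// input bytes it consumed (n_in) and how many output bytes it produced
// (n_out); the caller advances the input and drains the scratch buffer
// until the writer reports InputEmpty.
fn encode_field(mut input: &[u8], out: &mut Vec<u8>) {
    let mut writer = WriterBuilder::new().quote(b'"').build();
    let mut scratch = [0u8; 64];
    loop {
        let (result, n_in, n_out) = writer.field(input, &mut scratch);
        out.extend_from_slice(&scratch[..n_out]);
        input = &input[n_in..];
        if let WriteResult::InputEmpty = result {
            break;
        }
    }
    // flush the closing quote, if the field needed quoting
    loop {
        let (result, n_out) = writer.finish(&mut scratch);
        out.extend_from_slice(&scratch[..n_out]);
        if let WriteResult::InputEmpty = result {
            break;
        }
    }
}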

/// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`.
/// For numeric types, this serializes as usual. For dates, times and timestamps, it uses `options` to
/// Supported types:
@@ -337,27 +390,11 @@ pub fn new_serializer<'a>(
}
DataType::Utf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x.as_bytes());
}
},
vec![],
))
new_utf8_serializer(array, options)
}
DataType::LargeUtf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x.as_bytes());
}
},
vec![],
))
new_utf8_serializer(array, options)
}
DataType::Binary => {
let array = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();