From 35df6af05985c49b2bcfda7466b04f19f8d77265 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 7 Sep 2021 17:36:00 +0000 Subject: [PATCH] Moved guide examples to examples/ --- examples/csv_write.rs | 37 ++++++++++++++++++ examples/csv_write_parallel.rs | 69 ++++++++++++++++++++++++++++++++++ guide/src/io/csv_write.md | 63 +------------------------------ src/docs.rs | 1 - 4 files changed, 108 insertions(+), 62 deletions(-) create mode 100644 examples/csv_write.rs create mode 100644 examples/csv_write_parallel.rs diff --git a/examples/csv_write.rs b/examples/csv_write.rs new file mode 100644 index 00000000000..805aa0a0949 --- /dev/null +++ b/examples/csv_write.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use arrow2::{ + array::{Array, Int32Array}, + datatypes::{Field, Schema}, + error::Result, + io::csv::write, + record_batch::RecordBatch, +}; + +fn write_batch(path: &str, batches: &[RecordBatch]) -> Result<()> { + let writer = &mut write::WriterBuilder::new().from_path(path)?; + + write::write_header(writer, batches[0].schema())?; + + let options = write::SerializeOptions::default(); + batches + .iter() + .try_for_each(|batch| write::write_batch(writer, batch, &options)) +} + +fn main() -> Result<()> { + let array = Int32Array::from(&[ + Some(0), + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + ]); + let field = Field::new("c1", array.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; + + write_batch("example.csv", &[batch]) +} diff --git a/examples/csv_write_parallel.rs b/examples/csv_write_parallel.rs new file mode 100644 index 00000000000..5bdcb0f5dc5 --- /dev/null +++ b/examples/csv_write_parallel.rs @@ -0,0 +1,69 @@ +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::Arc; +use std::thread; + +use arrow2::{ + array::{Array, Int32Array}, + datatypes::{Field, Schema}, + error::Result, + io::csv::write, + record_batch::RecordBatch, +}; + +fn parallel_write(path: &str, batches: [RecordBatch; 2]) -> Result<()> { + let options = write::SerializeOptions::default(); + + // write a header + let writer = &mut write::WriterBuilder::new().from_path(path)?; + write::write_header(writer, batches[0].schema())?; + + // prepare a channel to send serialized records from threads + let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel(); + let mut children = Vec::new(); + + (0..2).for_each(|id| { + // The sender endpoint can be cloned + let thread_tx = tx.clone(); + + let options = options.clone(); + let batch = batches[id].clone(); // note: this is cheap + let child = thread::spawn(move || { + let records = write::serialize(&batch, &options).unwrap(); + thread_tx.send(records).unwrap(); + }); + + children.push(child); + }); + + for _ in 0..2 { + // block: assumes that the order of batches matter. + let records = rx.recv().unwrap(); + records + .iter() + .try_for_each(|record| writer.write_byte_record(record))? + } + + for child in children { + child.join().expect("child thread panicked"); + } + + Ok(()) +} + +fn main() -> Result<()> { + let array = Int32Array::from(&[ + Some(0), + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + ]); + let field = Field::new("c1", array.data_type().clone(), true); + let schema = Schema::new(vec![field]); + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; + + parallel_write("example.csv", [batch.clone(), batch]) +} diff --git a/guide/src/io/csv_write.md b/guide/src/io/csv_write.md index 45bc6280b47..859a806b8ea 100644 --- a/guide/src/io/csv_write.md +++ b/guide/src/io/csv_write.md @@ -7,20 +7,7 @@ This crate relies on [the crate csv](https://crates.io/crates/csv) to write well The following example writes a batch as a CSV file with the default configuration: ```rust -use arrow2::io::csv::write; -use arrow2::record_batch::RecordBatch; -use arrow2::error::Result; - -fn write_batch(path: &str, batches: &[RecordBatch]) -> Result<()> { - let writer = &mut write::WriterBuilder::new().from_path(path)?; - - write::write_header(writer, batches[0].schema())?; - - let options = write::SerializeOptions::default(); - batches.iter().try_for_each(|batch| { - write::write_batch(writer, batch, &options) - }) -} +{{#include ../../../examples/csv_write.rs}} ``` ## Parallelism @@ -33,51 +20,5 @@ However, these typically deal with different bounds: serialization is often CPU Suppose that we know that we are getting CPU-bounded at serialization, and would like to offload that workload to other threads, at the cost of a higher memory usage. We would achieve this as follows (two batches for simplicity): ```rust -use std::sync::mpsc::{Sender, Receiver}; -use std::sync::mpsc; -use std::thread; - -use arrow2::io::csv::write; -use arrow2::record_batch::RecordBatch; -use arrow2::error::Result; - -fn parallel_write(path: &str, batches: [RecordBatch; 2]) -> Result<()> { - let options = write::SerializeOptions::default(); - - // write a header - let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, batches[0].schema())?; - - // prepare a channel to send serialized records from threads - let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel(); - let mut children = Vec::new(); - - for id in 0..2 { - // The sender endpoint can be copied - let thread_tx = tx.clone(); - - let options = options.clone(); - let batch = batches[id].clone(); // note: this is cheap - let child = thread::spawn(move || { - let records = write::serialize(&batch, &options).unwrap(); - thread_tx.send(records).unwrap(); - }); - - children.push(child); - } - - for _ in 0..2 { - // block: assumes that the order of batches matter. - let records = rx.recv().unwrap(); - records.iter().try_for_each(|record| { - writer.write_byte_record(record) - })? - } - - for child in children { - child.join().expect("child thread panicked"); - } - - Ok(()) -} +{{#include ../../../examples/csv_write_parallel.rs}} ``` diff --git a/src/docs.rs b/src/docs.rs index bb36eff183d..a85ff128437 100644 --- a/src/docs.rs +++ b/src/docs.rs @@ -1,3 +1,2 @@ doc_comment::doctest!("../guide/src/low_level.md"); doc_comment::doctest!("../guide/src/high_level.md"); -doc_comment::doctest!("../guide/src/io/csv_write.md");