Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Moved guide examples to examples/
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Sep 7, 2021
1 parent d7bd957 commit 35df6af
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 62 deletions.
37 changes: 37 additions & 0 deletions examples/csv_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::sync::Arc;

use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
io::csv::write,
record_batch::RecordBatch,
};

/// Writes `batches` to `path` as a single CSV file: one header row followed
/// by the rows of every batch, in order.
///
/// The header is derived from the schema of the first batch; all batches are
/// assumed to share that schema. An empty `batches` slice is a no-op (no file
/// content is written beyond creating/truncating the file is avoided entirely).
///
/// # Errors
/// Returns an error if the file cannot be created or any write fails.
fn write_batch(path: &str, batches: &[RecordBatch]) -> Result<()> {
    // Guard: `batches[0]` below would panic on an empty slice.
    if batches.is_empty() {
        return Ok(());
    }

    let writer = &mut write::WriterBuilder::new().from_path(path)?;

    write::write_header(writer, batches[0].schema())?;

    let options = write::SerializeOptions::default();
    batches
        .iter()
        .try_for_each(|batch| write::write_batch(writer, batch, &options))
}

/// Builds a single nullable-Int32 column, wraps it in a one-column
/// `RecordBatch`, and writes it to `example.csv`.
fn main() -> Result<()> {
    // The column's values: 0 through 6, all present (no nulls).
    let values = [Some(0), Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)];
    let array = Int32Array::from(&values);

    // Describe the column ("c1", nullable) and assemble the schema + batch.
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::new(vec![field]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

    write_batch("example.csv", &[batch])
}
69 changes: 69 additions & 0 deletions examples/csv_write_parallel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
use std::thread;

use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
io::csv::write,
record_batch::RecordBatch,
};

/// Writes two `RecordBatch`es to `path` as CSV, serializing each batch on its
/// own thread while all file I/O happens on the calling thread.
///
/// NOTE(review): `std::sync::mpsc` only guarantees FIFO ordering per sender;
/// with one sender per thread, whichever batch finishes serializing first is
/// written first, so the output row order is not deterministic — confirm this
/// is acceptable for the example's intent.
///
/// # Errors
/// Returns an error if the file cannot be created or any write fails.
fn parallel_write(path: &str, batches: [RecordBatch; 2]) -> Result<()> {
    let options = write::SerializeOptions::default();

    // write a header
    let writer = &mut write::WriterBuilder::new().from_path(path)?;
    write::write_header(writer, batches[0].schema())?;

    // prepare a channel to send serialized records from threads
    let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
    let mut children = Vec::new();

    // A plain `for` loop over the batches avoids the side-effectful
    // `for_each` and the manual indexing of the original.
    for batch in batches.iter().cloned() {
        // The sender endpoint can be cloned; each thread owns one.
        let thread_tx = tx.clone();

        let options = options.clone();
        // note: cloning the batch is cheap
        let child = thread::spawn(move || {
            // serialization is the CPU-bound part, so it runs off-thread
            let records = write::serialize(&batch, &options).unwrap();
            thread_tx.send(records).unwrap();
        });

        children.push(child);
    }
    // Drop the original sender so only the spawned threads hold endpoints.
    drop(tx);

    for _ in 0..children.len() {
        // block until some thread finishes serializing its batch
        let records = rx.recv().unwrap();
        records
            .iter()
            .try_for_each(|record| writer.write_byte_record(record))?
    }

    for child in children {
        child.join().expect("child thread panicked");
    }

    Ok(())
}

/// Builds one single-column batch and writes two copies of it to
/// `example.csv`, serializing them on separate threads.
fn main() -> Result<()> {
    // The column's values: 0 through 6, all present (no nulls).
    let values = [Some(0), Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)];
    let array = Int32Array::from(&values);

    // Describe the column ("c1", nullable) and assemble the schema + batch.
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::new(vec![field]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

    parallel_write("example.csv", [batch.clone(), batch])
}
63 changes: 2 additions & 61 deletions guide/src/io/csv_write.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,7 @@ This crate relies on [the crate csv](https://crates.io/crates/csv) to write well
The following example writes a batch as a CSV file with the default configuration:

```rust
use arrow2::io::csv::write;
use arrow2::record_batch::RecordBatch;
use arrow2::error::Result;

fn write_batch(path: &str, batches: &[RecordBatch]) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_path(path)?;

write::write_header(writer, batches[0].schema())?;

let options = write::SerializeOptions::default();
batches.iter().try_for_each(|batch| {
write::write_batch(writer, batch, &options)
})
}
{{#include ../../../examples/csv_write.rs}}
```

## Parallelism
Expand All @@ -33,51 +20,5 @@ However, these typically deal with different bounds: serialization is often CPU
Suppose that we know that we are getting CPU-bounded at serialization, and would like to offload that workload to other threads, at the cost of a higher memory usage. We would achieve this as follows (two batches for simplicity):

```rust
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;

use arrow2::io::csv::write;
use arrow2::record_batch::RecordBatch;
use arrow2::error::Result;

fn parallel_write(path: &str, batches: [RecordBatch; 2]) -> Result<()> {
let options = write::SerializeOptions::default();

// write a header
let writer = &mut write::WriterBuilder::new().from_path(path)?;
write::write_header(writer, batches[0].schema())?;

// prepare a channel to send serialized records from threads
let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
let mut children = Vec::new();

for id in 0..2 {
// The sender endpoint can be copied
let thread_tx = tx.clone();

let options = options.clone();
let batch = batches[id].clone(); // note: this is cheap
let child = thread::spawn(move || {
let records = write::serialize(&batch, &options).unwrap();
thread_tx.send(records).unwrap();
});

children.push(child);
}

for _ in 0..2 {
// block: assumes that the order of batches matter.
let records = rx.recv().unwrap();
records.iter().try_for_each(|record| {
writer.write_byte_record(record)
})?
}

for child in children {
child.join().expect("child thread panicked");
}

Ok(())
}
{{#include ../../../examples/csv_write_parallel.rs}}
```
1 change: 0 additions & 1 deletion src/docs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
doc_comment::doctest!("../guide/src/low_level.md");
doc_comment::doctest!("../guide/src/high_level.md");
doc_comment::doctest!("../guide/src/io/csv_write.md");

0 comments on commit 35df6af

Please sign in to comment.