Improved examples (#1109)
jorgecarleitao authored Jun 27, 2022
1 parent 09817a4 commit b942a84
Showing 5 changed files with 73 additions and 12 deletions.
4 changes: 2 additions & 2 deletions examples/avro_read_async.rs
@@ -33,8 +33,8 @@ async fn main() -> Result<()> {
             decompress_block(&mut block, &mut decompressed, compression)?;
             deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
         });
-        let batch = handle.await.unwrap()?;
-        assert!(!batch.is_empty());
+        let chunk = handle.await.unwrap()?;
+        assert!(!chunk.is_empty());
     }
 
     Ok(())
2 changes: 1 addition & 1 deletion examples/parquet_read.rs
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
     let file_path = &args[1];
 
     let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, None, None, None)?;
+    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;
 
     println!("{:#?}", reader.schema());
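For context on the changed call: the new `Some(1024 * 8 * 8)` argument is the chunk-size hint, so the reader emits `Chunk`s of at most roughly that many rows instead of materializing a whole row group at once. A minimal sketch of driving the reader to completion, assuming a local file name ("example.parquet" is a placeholder; the real example takes the path from the command line):

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    let reader = File::open("example.parquet")?;

    // `Some(1024 * 8 * 8)` caps the number of rows per emitted chunk;
    // the `None`s keep the reader's defaults (no projection or other filters).
    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;

    println!("{:#?}", reader.schema());

    // `FileReader` is an iterator of `Result<Chunk<Box<dyn Array>>>`
    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    Ok(())
}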
15 changes: 10 additions & 5 deletions examples/parquet_read_parallel/src/main.rs
@@ -15,32 +15,37 @@ use arrow2::{
 
 mod logger;
 
+/// Advances each iterator in parallel
+/// # Panic
+/// If the iterators are empty
-fn deserialize_parallel(columns: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
+fn deserialize_parallel(iters: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
     // CPU-bounded
-    let columns = columns
+    let arrays = iters
         .par_iter_mut()
         .map(|iter| iter.next().transpose())
         .collect::<Result<Vec<_>>>()?;
 
-    Chunk::try_new(columns.into_iter().map(|x| x.unwrap()).collect())
+    Chunk::try_new(arrays.into_iter().map(|x| x.unwrap()).collect())
 }
 
 fn parallel_read(path: &str, row_group: usize) -> Result<()> {
     // open the file
     let mut file = BufReader::new(File::open(path)?);
 
     // read Parquet's metadata and infer Arrow schema
     let metadata = read::read_metadata(&mut file)?;
     let schema = read::infer_schema(&metadata)?;
 
     // select the row group from the metadata
     let row_group = &metadata.row_groups[row_group];
 
-    let chunk_size = 1024 * 8;
+    let chunk_size = 1024 * 8 * 8;
 
     // read (IO-bounded) all columns into memory (use a subset of the fields to project)
     let mut columns =
         read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;
 
-    // deserialize (CPU-bounded) to arrow
+    // deserialize (CPU-bounded) to Arrow in chunks
     let mut num_rows = row_group.num_rows();
     while num_rows > 0 {
         num_rows = num_rows.saturating_sub(chunk_size);
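The hunk is cut off inside the read loop; for context, a minimal sketch of how that loop typically continues, calling `deserialize_parallel` once per chunk until the row group is exhausted (the real example may log timings or row counts instead of asserting):

// sketch of the loop shown above, completed
let mut num_rows = row_group.num_rows();
while num_rows > 0 {
    num_rows = num_rows.saturating_sub(chunk_size);
    // CPU-bounded: advance every column iterator in parallel and assemble one Chunk
    let chunk = deserialize_parallel(&mut columns)?;
    assert!(!chunk.is_empty());
}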
56 changes: 56 additions & 0 deletions examples/parquet_write_async.rs
@@ -0,0 +1,56 @@
+use futures::SinkExt;
+use tokio::fs::File;
+
+use arrow2::{
+    array::{Array, Int32Array},
+    chunk::Chunk,
+    datatypes::{Field, Schema},
+    error::Result,
+    io::parquet::write::{
+        transverse, CompressionOptions, Encoding, FileSink, Version, WriteOptions,
+    },
+};
+use tokio_util::compat::TokioAsyncReadCompatExt;
+
+async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
+    let options = WriteOptions {
+        write_statistics: true,
+        compression: CompressionOptions::Uncompressed,
+        version: Version::V2,
+    };
+
+    let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());
+
+    // Create a new empty file
+    let file = File::create(path).await?.compat();
+
+    let encodings = schema
+        .fields
+        .iter()
+        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
+        .collect();
+
+    let mut writer = FileSink::try_new(file, schema, encodings, options)?;
+
+    writer.send_all(&mut stream).await?;
+    writer.close().await?;
+    Ok(())
+}
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<()> {
+    let array = Int32Array::from(&[
+        Some(0),
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]);
+    let field = Field::new("c1", array.data_type().clone(), true);
+    let schema = Schema::from(vec![field]);
+    let columns = Chunk::new(vec![array.boxed()]);
+
+    write_batch("test.parquet", schema, columns).await
+}
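Because `FileSink` is used here as a futures `Sink` of chunks (that is what `send_all` relies on), a variant of `write_batch` could also push chunks one at a time with `SinkExt::send`. A minimal sketch under that assumption; `write_chunks` is a hypothetical helper reusing the same imports as the example above:

// Hypothetical variant of `write_batch`: push chunks individually instead of
// forwarding a pre-built stream.
async fn write_chunks(
    path: &str,
    schema: Schema,
    chunks: Vec<Chunk<Box<dyn Array>>>,
) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
    };
    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let file = File::create(path).await?.compat();
    let mut writer = FileSink::try_new(file, schema, encodings, options)?;

    for chunk in chunks {
        // each `send` encodes and writes one chunk
        writer.send(chunk).await?;
    }
    writer.close().await?;
    Ok(())
}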
8 changes: 4 additions & 4 deletions examples/parquet_write_parallel/src/main.rs
@@ -41,7 +41,7 @@ impl FallibleStreamingIterator for Bla {
     }
 }
 
-fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
+fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
     // declare the options
     let options = WriteOptions {
         write_statistics: true,
@@ -70,9 +70,9 @@ fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
     // derive the parquet schema (physical types) from arrow's schema.
     let parquet_schema = to_parquet_schema(&schema)?;
 
-    let row_groups = batches.iter().map(|batch| {
+    let row_groups = chunks.iter().map(|chunk| {
         // write batch to pages; parallelized by rayon
-        let columns = batch
+        let columns = chunk
             .columns()
             .par_iter()
             .zip(parquet_schema.fields().to_vec())
@@ -140,7 +140,7 @@ fn main() -> Result<()> {
     let schema = Schema {
         fields: vec![
             Field::new("c1", DataType::Int32, true),
-            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Utf8, true),
         ],
         metadata: Default::default(),
     };
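The last hunk fixes a duplicated field name ("c1" appeared twice). With two distinct fields, a chunk written by this example must carry one Int32 array and one Utf8 array of equal length, in schema order. A small illustration of a matching schema/chunk pair (the helper and its values are made up for this note; the real example generates larger data):

use arrow2::array::{Array, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};

// Hypothetical helper: a schema and a two-column chunk that matches it.
fn example_schema_and_chunk() -> (Schema, Chunk<Box<dyn Array>>) {
    let schema = Schema {
        fields: vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("c2", DataType::Utf8, true),
        ],
        metadata: Default::default(),
    };

    let c1 = Int32Array::from(&[Some(1), None, Some(3)]);
    let c2 = Utf8Array::<i32>::from_slice(["a", "b", "c"]);

    // both columns must have the same number of rows
    let chunk = Chunk::new(vec![c1.boxed(), c2.boxed()]);
    (schema, chunk)
}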
