From e171c2c529090acd255204d80b2e5dfc601c9c16 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Mon, 27 Jun 2022 08:36:21 +0000
Subject: [PATCH] Improved examples

---
 examples/avro_read_async.rs                 |  4 +-
 examples/parquet_read.rs                    |  2 +-
 examples/parquet_read_parallel/src/main.rs  | 15 ++++--
 examples/parquet_write_async.rs             | 56 +++++++++++++++++++++
 examples/parquet_write_parallel/src/main.rs |  8 +--
 5 files changed, 73 insertions(+), 12 deletions(-)
 create mode 100644 examples/parquet_write_async.rs

diff --git a/examples/avro_read_async.rs b/examples/avro_read_async.rs
index d3c51720496..9c9c3649da7 100644
--- a/examples/avro_read_async.rs
+++ b/examples/avro_read_async.rs
@@ -33,8 +33,8 @@ async fn main() -> Result<()> {
             decompress_block(&mut block, &mut decompressed, compression)?;
             deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
         });
-        let batch = handle.await.unwrap()?;
-        assert!(!batch.is_empty());
+        let chunk = handle.await.unwrap()?;
+        assert!(!chunk.is_empty());
     }
 
     Ok(())
diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
index a29555e6ec6..92fa83c521e 100644
--- a/examples/parquet_read.rs
+++ b/examples/parquet_read.rs
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
     let file_path = &args[1];
 
     let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, None, None, None)?;
+    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;
 
     println!("{:#?}", reader.schema());
 
diff --git a/examples/parquet_read_parallel/src/main.rs b/examples/parquet_read_parallel/src/main.rs
index 313205bbfd2..6ab26d61f7d 100644
--- a/examples/parquet_read_parallel/src/main.rs
+++ b/examples/parquet_read_parallel/src/main.rs
@@ -15,32 +15,37 @@ use arrow2::{
 
 mod logger;
 
+/// Advances each iterator in parallel
 /// # Panic
 /// If the iterators are empty
-fn deserialize_parallel(columns: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
+fn deserialize_parallel(iters: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
     // CPU-bounded
-    let columns = columns
+    let arrays = iters
         .par_iter_mut()
         .map(|iter| iter.next().transpose())
         .collect::<Result<Vec<_>>>()?;
 
-    Chunk::try_new(columns.into_iter().map(|x| x.unwrap()).collect())
+    Chunk::try_new(arrays.into_iter().map(|x| x.unwrap()).collect())
 }
 
 fn parallel_read(path: &str, row_group: usize) -> Result<()> {
+    // open the file
     let mut file = BufReader::new(File::open(path)?);
+
+    // read Parquet's metadata and infer Arrow schema
     let metadata = read::read_metadata(&mut file)?;
     let schema = read::infer_schema(&metadata)?;
 
+    // select the row group from the metadata
     let row_group = &metadata.row_groups[row_group];
 
-    let chunk_size = 1024 * 8;
+    let chunk_size = 1024 * 8 * 8;
 
     // read (IO-bounded) all columns into memory (use a subset of the fields to project)
     let mut columns =
         read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;
 
-    // deserialize (CPU-bounded) to arrow
+    // deserialize (CPU-bounded) to Arrow in chunks
     let mut num_rows = row_group.num_rows();
     while num_rows > 0 {
         num_rows = num_rows.saturating_sub(chunk_size);
diff --git a/examples/parquet_write_async.rs b/examples/parquet_write_async.rs
new file mode 100644
index 00000000000..b6205c8fb0f
--- /dev/null
+++ b/examples/parquet_write_async.rs
@@ -0,0 +1,56 @@
+use futures::SinkExt;
+use tokio::fs::File;
+
+use arrow2::{
+    array::{Array, Int32Array},
+    chunk::Chunk,
+    datatypes::{Field, Schema},
+    error::Result,
+    io::parquet::write::{
+        transverse, CompressionOptions, Encoding, FileSink, Version, WriteOptions,
+    },
+};
+use tokio_util::compat::TokioAsyncReadCompatExt;
+
+async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
+    let options = WriteOptions {
+        write_statistics: true,
+        compression: CompressionOptions::Uncompressed,
+        version: Version::V2,
+    };
+
+    let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());
+
+    // Create a new empty file
+    let file = File::create(path).await?.compat();
+
+    let encodings = schema
+        .fields
+        .iter()
+        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
+        .collect();
+
+    let mut writer = FileSink::try_new(file, schema, encodings, options)?;
+
+    writer.send_all(&mut stream).await?;
+    writer.close().await?;
+    Ok(())
+}
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<()> {
+    let array = Int32Array::from(&[
+        Some(0),
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]);
+    let field = Field::new("c1", array.data_type().clone(), true);
+    let schema = Schema::from(vec![field]);
+    let columns = Chunk::new(vec![array.boxed()]);
+
+    write_batch("test.parquet", schema, columns).await
+}
diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs
index ac1d216f908..980c817e422 100644
--- a/examples/parquet_write_parallel/src/main.rs
+++ b/examples/parquet_write_parallel/src/main.rs
@@ -41,7 +41,7 @@ impl FallibleStreamingIterator for Bla {
     }
 }
 
-fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
+fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
     // declare the options
     let options = WriteOptions {
         write_statistics: true,
@@ -70,9 +70,9 @@ fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
     // derive the parquet schema (physical types) from arrow's schema.
     let parquet_schema = to_parquet_schema(&schema)?;
 
-    let row_groups = batches.iter().map(|batch| {
+    let row_groups = chunks.iter().map(|chunk| {
         // write batch to pages; parallelized by rayon
-        let columns = batch
+        let columns = chunk
             .columns()
             .par_iter()
             .zip(parquet_schema.fields().to_vec())
@@ -140,7 +140,7 @@ fn main() -> Result<()> {
     let schema = Schema {
         fields: vec![
             Field::new("c1", DataType::Int32, true),
-            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Utf8, true),
         ],
         metadata: Default::default(),
     };