This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved examples #1109

Merged · 1 commit · Jun 27, 2022
4 changes: 2 additions & 2 deletions examples/avro_read_async.rs
@@ -33,8 +33,8 @@ async fn main() -> Result<()> {
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
});
- let batch = handle.await.unwrap()?;
- assert!(!batch.is_empty());
+ let chunk = handle.await.unwrap()?;
+ assert!(!chunk.is_empty());
}

Ok(())
2 changes: 1 addition & 1 deletion examples/parquet_read.rs
@@ -11,7 +11,7 @@ fn main() -> Result<()> {
let file_path = &args[1];

let reader = File::open(file_path)?;
- let reader = read::FileReader::try_new(reader, None, None, None, None)?;
+ let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;

println!("{:#?}", reader.schema());

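Note on the parquet_read.rs change: the new third argument to `FileReader::try_new` is the chunk size, i.e. the maximum number of rows per `Chunk` the reader yields. A minimal sketch of how that plays out when iterating the reader (assuming the five-argument `try_new` shown in this diff; `test.parquet` is a placeholder path):

```rust
use std::fs::File;

use arrow2::{error::Result, io::parquet::read};

fn main() -> Result<()> {
    let reader = File::open("test.parquet")?;
    // third argument: each yielded Chunk holds at most 1024 * 8 * 8 = 65536 rows
    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;

    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        assert!(chunk.len() <= 1024 * 8 * 8);
    }
    Ok(())
}
```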
15 changes: 10 additions & 5 deletions examples/parquet_read_parallel/src/main.rs
@@ -15,32 +15,37 @@ use arrow2::{

mod logger;

/// Advances each iterator in parallel
/// # Panic
/// If the iterators are empty
- fn deserialize_parallel(columns: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
+ fn deserialize_parallel(iters: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
// CPU-bounded
- let columns = columns
+ let arrays = iters
.par_iter_mut()
.map(|iter| iter.next().transpose())
.collect::<Result<Vec<_>>>()?;

- Chunk::try_new(columns.into_iter().map(|x| x.unwrap()).collect())
+ Chunk::try_new(arrays.into_iter().map(|x| x.unwrap()).collect())
}

fn parallel_read(path: &str, row_group: usize) -> Result<()> {
// open the file
let mut file = BufReader::new(File::open(path)?);

// read Parquet's metadata and infer Arrow schema
let metadata = read::read_metadata(&mut file)?;
let schema = read::infer_schema(&metadata)?;

// select the row group from the metadata
let row_group = &metadata.row_groups[row_group];

- let chunk_size = 1024 * 8;
+ let chunk_size = 1024 * 8 * 8;

// read (IO-bounded) all columns into memory (use a subset of the fields to project)
let mut columns =
read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;

- // deserialize (CPU-bounded) to arrow
+ // deserialize (CPU-bounded) to Arrow in chunks
let mut num_rows = row_group.num_rows();
while num_rows > 0 {
num_rows = num_rows.saturating_sub(chunk_size);
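For context, a sketch of how the loop this hunk truncates drives `deserialize_parallel`; the variables (`columns`, `chunk_size`, `row_group`) come from the diff above, and the assertion is illustrative rather than part of the example:

```rust
// sketch only: continuation of the while-loop shown above
let mut num_rows = row_group.num_rows();
while num_rows > 0 {
    num_rows = num_rows.saturating_sub(chunk_size);
    // CPU-bounded: advance every column iterator once, in parallel, to build one Chunk
    let chunk = deserialize_parallel(&mut columns)?;
    assert!(!chunk.is_empty());
}
```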
56 changes: 56 additions & 0 deletions examples/parquet_write_async.rs
@@ -0,0 +1,56 @@
use futures::SinkExt;
use tokio::fs::File;

use arrow2::{
array::{Array, Int32Array},
chunk::Chunk,
datatypes::{Field, Schema},
error::Result,
io::parquet::write::{
transverse, CompressionOptions, Encoding, FileSink, Version, WriteOptions,
},
};
use tokio_util::compat::TokioAsyncReadCompatExt;

async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
};

let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());

// Create a new empty file
let file = File::create(path).await?.compat();

let encodings = schema
.fields
.iter()
.map(|f| transverse(&f.data_type, |_| Encoding::Plain))
.collect();

let mut writer = FileSink::try_new(file, schema, encodings, options)?;

writer.send_all(&mut stream).await?;
writer.close().await?;
Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let array = Int32Array::from(&[
Some(0),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
Some(6),
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::from(vec![field]);
let columns = Chunk::new(vec![array.boxed()]);

write_batch("test.parquet", schema, columns).await
}
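The new `parquet_write_async.rs` example writes `test.parquet`. Not part of this PR, but a quick way to sanity-check the output is to read it back with the synchronous `FileReader` used in `examples/parquet_read.rs` above (a minimal sketch):

```rust
use std::fs::File;

use arrow2::{error::Result, io::parquet::read};

fn main() -> Result<()> {
    let file = File::open("test.parquet")?;
    let reader = read::FileReader::try_new(file, None, None, None, None)?;
    for chunk in reader {
        // each item is one Chunk of columns; print its row count
        println!("read {} rows", chunk?.len());
    }
    Ok(())
}
```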
8 changes: 4 additions & 4 deletions examples/parquet_write_parallel/src/main.rs
@@ -41,7 +41,7 @@ impl FallibleStreamingIterator for Bla {
}
}

- fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
+ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
// declare the options
let options = WriteOptions {
write_statistics: true,
@@ -70,9 +70,9 @@ fn parallel_write(path: &str, schema: Schema, batches: &[Chunk]) -> Result<()> {
// derive the parquet schema (physical types) from arrow's schema.
let parquet_schema = to_parquet_schema(&schema)?;

- let row_groups = batches.iter().map(|batch| {
+ let row_groups = chunks.iter().map(|chunk| {
// write batch to pages; parallelized by rayon
- let columns = batch
+ let columns = chunk
.columns()
.par_iter()
.zip(parquet_schema.fields().to_vec())
@@ -140,7 +140,7 @@ fn main() -> Result<()> {
let schema = Schema {
fields: vec![
Field::new("c1", DataType::Int32, true),
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Utf8, true),
],
metadata: Default::default(),
};
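The last hunk fixes a duplicated field name: each `Field` in the schema describes exactly one column of the `Chunk`, and with two fields both named "c1" the written file would carry duplicate column names. A minimal sketch of the corrected pairing (placeholder values, one Int32 and one Utf8 column):

```rust
use arrow2::{
    array::{Array, Int32Array, Utf8Array},
    chunk::Chunk,
    datatypes::{DataType, Field, Schema},
};

fn main() {
    // one field per column, with distinct names
    let schema = Schema::from(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Utf8, true),
    ]);
    let chunk = Chunk::new(vec![
        Int32Array::from_slice([1, 2, 3]).boxed(),
        Utf8Array::<i32>::from_slice(["a", "b", "c"]).boxed(),
    ]);
    assert_eq!(schema.fields.len(), chunk.columns().len());
}
```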