This repository has been archived by the owner on Feb 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 224
/
Copy path: parquet_read_async.rs
57 lines (48 loc) · 2.29 KB
/
parquet_read_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
use std::sync::Arc;
use std::time::SystemTime;
use futures::future::BoxFuture;
use futures::FutureExt;
use tokio;
use tokio::fs::File;
use tokio::io::BufReader;
use tokio_util::compat::*;
use arrow2::error::Result;
use arrow2::io::parquet::read::{self, RowGroupDeserializer};
#[tokio::main(flavor = "current_thread")]
/// Reads a parquet file asynchronously, row group by row group, printing the
/// number of rows in each deserialized chunk and the total wall-clock time.
///
/// Usage: `parquet_read_async <file.parquet>` — exits with an error message
/// if the path argument is missing.
async fn main() -> Result<()> {
    // `Instant` is the correct clock for measuring elapsed time: it is
    // monotonic, so `elapsed()` cannot fail — unlike `SystemTime`, whose
    // `elapsed()` returns a `Result` because the wall clock can go backwards.
    let start = std::time::Instant::now();

    // Path of the parquet file, from the first CLI argument. `nth(1)` with
    // `expect` gives a usage message instead of the bare index-out-of-bounds
    // panic that `args[1]` would produce when no argument is passed.
    let file_path = Arc::new(
        std::env::args()
            .nth(1)
            .expect("usage: parquet_read_async <file.parquet>"),
    );

    // # Read metadata
    let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();

    // this operation is usually done before reading the data, during planning.
    // This is a mix of IO and CPU-bounded tasks but both of them are O(1)
    let metadata = read::read_metadata_async(&mut reader).await?;
    let schema = read::infer_schema(&metadata)?;

    // This factory yields one file descriptor per column and is used to read columns concurrently.
    // They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
    // Note: the async block borrows `file_path` directly — the original
    // `file_path.clone().as_ref()` cloned the Arc only to borrow the
    // temporary, a redundant refcount bump.
    let factory = || {
        Box::pin(async { Ok(File::open(file_path.as_ref()).await?.compat()) }) as BoxFuture<_>
    };

    // This is the row group loop. Groups can be skipped based on the statistics they carry.
    for row_group in &metadata.row_groups {
        // A row group is consumed in two steps: the first step is to read the (compressed)
        // columns into memory, which is IO-bounded.
        let column_chunks =
            read::read_columns_many_async(factory, row_group, schema.fields.clone(), None).await?;

        // the second step is to iterate over the columns in chunks.
        // this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
        // the runtime.
        // Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator
        // can be advanced in parallel (parallel decompression and deserialization).
        let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
        for maybe_chunk in chunks {
            let chunk = maybe_chunk?;
            println!("{}", chunk.len());
        }
    }

    // `Instant::elapsed` is infallible — no `unwrap()` needed.
    println!("took: {} ms", start.elapsed().as_millis());
    Ok(())
}