From 4cf86d6f1a888192ce5ec9520aa62196ef4639d9 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 29 Oct 2021 16:27:13 +0000
Subject: [PATCH] Fixed example.

---
 examples/parquet_read_parallel.rs | 28 +++++++++++++----------
 src/io/parquet/read/README.md     | 37 ++++++++++++++++---------------
 src/io/parquet/read/mod.rs        |  2 +-
 tests/it/io/parquet/mod.rs        |  2 +-
 4 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs
index 532a735894e..2cd95205046 100644
--- a/examples/parquet_read_parallel.rs
+++ b/examples/parquet_read_parallel.rs
@@ -1,4 +1,5 @@
 use crossbeam_channel::unbounded;
+use parquet2::metadata::ColumnChunkMetaData;
 
 use std::fs::File;
 use std::sync::Arc;
@@ -21,32 +22,35 @@ fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
     let start = SystemTime::now();
     // spawn a thread to produce `Vec<CompressedPage>` (IO bounded)
     let producer = thread::spawn(move || {
-        for field in 0..file_metadata.schema().fields().len() {
+        for (field_i, field) in file_metadata.schema().fields().iter().enumerate() {
             let start = SystemTime::now();
 
             let mut columns = read::get_column_iterator(
                 &mut file,
                 &file_metadata,
                 row_group,
-                field,
+                field_i,
                 None,
                 vec![],
             );
 
-            println!("produce start - field: {}", field);
+            println!("produce start - field: {}", field_i);
 
+            let mut column_chunks = vec![];
             while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
                 if let Some((pages, metadata)) = new_iter.get() {
                     let pages = pages.collect::<Vec<_>>();
 
-                    tx.send((field, metadata.clone(), pages)).unwrap();
+                    column_chunks.push((pages, metadata.clone()));
                 }
                 columns = new_iter;
             }
+            // todo: create API to allow sending each column (and not column chunks) to be processed in parallel
+            tx.send((field_i, field.clone(), column_chunks)).unwrap();
             println!(
                 "produce end - {:?}: {} {}",
                 start.elapsed().unwrap(),
-                field,
+                field_i,
                 row_group
             );
         }
@@ -60,22 +64,22 @@ fn parallel_read(path: &str, row_group: usize) -> Result<RecordBatch> {
 
         let arrow_schema_consumer = arrow_schema.clone();
         thread::spawn(move || {
             let mut arrays = vec![];
-            while let Ok((field, metadata, pages)) = rx_consumer.recv() {
+            while let Ok((field_i, field, column_chunks)) = rx_consumer.recv() {
                 let start = SystemTime::now();
-                let data_type = arrow_schema_consumer.fields()[field].data_type().clone();
-                println!("consumer {} start - {}", i, field);
+                let data_type = arrow_schema_consumer.fields()[field_i].data_type().clone();
+                println!("consumer {} start - {}", i, field_i);
 
-                let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
+                let columns = read::ReadColumnIterator::new(field, column_chunks);
 
-                let array = read::page_iter_to_array(&mut pages, &metadata, data_type);
+                let array = read::column_iter_to_array(columns, data_type, vec![]).map(|x| x.0);
                 println!(
                     "consumer {} end - {:?}: {}",
                     i,
                     start.elapsed().unwrap(),
-                    field
+                    field_i
                 );
-                arrays.push((field, array))
+                arrays.push((field_i, array))
             }
             arrays
         })
diff --git a/src/io/parquet/read/README.md b/src/io/parquet/read/README.md
index 30f8e5e194d..37bf71d5fd6 100644
--- a/src/io/parquet/read/README.md
+++ b/src/io/parquet/read/README.md
@@ -2,32 +2,33 @@
 
 ### LSB equivalence between definition levels and bitmaps
 
-* When the maximum repetition level is 0 and the maximum definition level is 1,
-  the RLE-encoded definition levels correspond exactly to Arrow's bitmap and can be
-  memcopied without further transformations.
-
-* Reading a parquet nested field can be done by reading each individual primitive
-  column and "building" the nested struct in arrow.
+When the maximum repetition level is 0 and the maximum definition level is 1,
+the RLE-encoded definition levels correspond exactly to Arrow's bitmap and can be
+memcopied without further transformations.
 
 ## Nested parquet groups are deserialized recursively
 
+Reading a parquet nested field is done by reading each primitive
+column sequentially and building the nested struct recursively.
+
 Rows of nested parquet groups are encoded in the repetition and definition levels.
 In arrow, they correspond to:
 * list's offsets and validity
 * struct's validity
 
-An implementation that leverages this observation:
-
-When nested types are observed in a parquet column, we recurse over the struct to gather
-whether the type is a Struct or List and whether it is required or optional, which we store
-in a `Vec<Nested>`. `Nested` is an enum that can process definition and repetition
-levels depending on the type and nullability.
+The implementation in this module leverages this observation:
 
-When processing pages, we process the definition and repetition levels into `Vec<Nested>`.
+Nested parquet fields are initially recursed over to gather
+whether the type is a Struct or List, and whether it is required or optional, which we store
+in `nested_info: Vec<Box<dyn Nested>>`. `Nested` is a trait object that receives definition
+and repetition levels depending on the type and nullability of the nested item.
+We process the definition and repetition levels into `nested_info`.
 
-When we finish a column chunk, we recursively pop `Vec<Nested>` as we are building the `StructArray`
-or `ListArray`.
+When we finish a field, we recursively pop from `nested_info` as we build
+the `StructArray` or `ListArray`.
 
-With this approach, the only difference vs flat is that we cannot leverage the bitmap
-optimization, and instead need to deserialize the repetition and definition
-levels to `i32`.
+With this approach, the only differences vs flat are:
+1. we do not leverage the bitmap optimization, and instead need to deserialize the repetition
+   and definition levels to `i32`.
+2. we deserialize definition levels twice, once to extend the values/nullability and
+   once to extend `nested_info`.
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 565ad37f00f..6bab223028f 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -18,7 +18,7 @@ pub use parquet2::{
         decompress, get_column_iterator, get_page_iterator as _get_page_iterator,
         get_page_stream as _get_page_stream, read_metadata as _read_metadata,
         read_metadata_async as _read_metadata_async, BasicDecompressor, ColumnChunkIter,
-        Decompressor, MutStreamingIterator, PageFilter, PageIterator, State,
+        Decompressor, MutStreamingIterator, PageFilter, PageIterator, ReadColumnIterator, State,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 23d021b4ded..4d563ba4792 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -546,7 +546,7 @@ pub fn pyarrow_struct(column: usize) -> Box<dyn Array> {
         Some(true),
         Some(true),
     ];
-    let boolean = Arc::new(BooleanArray::from(boolean.clone())) as Arc<dyn Array>;
+    let boolean = Arc::new(BooleanArray::from(boolean)) as Arc<dyn Array>;
     let fields = vec![
         Field::new("f1", DataType::Utf8, true),
         Field::new("f2", DataType::Boolean, true),
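
For context, the restructured example has the producer send one message per field — (field index, parquet field, all of that field's column chunks) — and the consumer rebuild each array via `ReadColumnIterator` and `column_iter_to_array`. Below is a minimal single-threaded sketch of that per-field flow, assembled only from the calls visible in the diff above plus `read::read_metadata`/`read::get_schema`; the helper name `read_field` and its error handling are illustrative assumptions, not arrow2 API:

    use std::fs::File;

    use arrow2::error::Result;
    use arrow2::io::parquet::read;
    use arrow2::io::parquet::read::MutStreamingIterator;

    // Hypothetical helper: reads one field of one row group, the way the
    // patched example does, but on a single thread.
    fn read_field(path: &str, row_group: usize, field_i: usize) -> Result<()> {
        let mut file = File::open(path)?;
        let file_metadata = read::read_metadata(&mut file)?;
        let field = file_metadata.schema().fields()[field_i].clone();
        let data_type = read::get_schema(&file_metadata)?.fields()[field_i]
            .data_type()
            .clone();

        // IO-bounded step: gather the field's column chunks (compressed pages + metadata).
        let mut columns =
            read::get_column_iterator(&mut file, &file_metadata, row_group, field_i, None, vec![]);
        let mut column_chunks = vec![];
        while let read::State::Some(mut new_iter) = columns.advance()? {
            if let Some((pages, metadata)) = new_iter.get() {
                column_chunks.push((pages.collect::<Vec<_>>(), metadata.clone()));
            }
            columns = new_iter;
        }

        // CPU-bounded step: decompress and deserialize the chunks into an Arrow array.
        let columns = read::ReadColumnIterator::new(field, column_chunks);
        let array = read::column_iter_to_array(columns, data_type, vec![]).map(|x| x.0)?;
        println!("field {}: {} rows", field_i, array.len());
        Ok(())
    }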
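
On the README's LSB-equivalence note: with maximum repetition level 0 and maximum definition level 1, every definition level is either 0 or 1, so the bit-packed levels are already an Arrow validity bitmap, LSB-first. A self-contained sketch with hypothetical values (not arrow2 code):

    // Definition levels of a nullable flat column (max definition level = 1):
    // slot i is valid iff def_levels[i] == 1.
    fn main() {
        let def_levels: [u8; 8] = [1, 0, 1, 1, 0, 0, 1, 1];

        // Bit-pack LSB-first — exactly Arrow's bitmap layout.
        let mut byte = 0u8;
        for (i, &level) in def_levels.iter().enumerate() {
            byte |= level << i;
        }

        // Bit i set <=> slot i valid: the packed levels could be memcopied
        // into the validity bitmap without transformation.
        assert_eq!(byte, 0b1100_1101);
    }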
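
And on the nested section: a toy decoder (not this module's `Nested` machinery) showing how the repetition/definition levels of a nullable `List<Int32>` with required items (max repetition level 1, max definition level 2) yield exactly a list's offsets and validity:

    // Rows: [[1, 2], [], None, [3]] — one (repetition, definition) pair per slot.
    // def == 2: an item is present; def == 1: empty list; def == 0: null list.
    // rep == 0: the slot starts a new row; rep == 1: it continues the current list.
    fn main() {
        let levels = [(0u8, 2u8), (1, 2), (0, 1), (0, 0), (0, 2)];
        let values = [1i32, 2, 3]; // only def == 2 slots carry a value

        let mut offsets = vec![0i32];
        let mut validity = vec![];
        for &(rep, def) in &levels {
            if rep == 0 {
                // new row: open a new list slot
                offsets.push(*offsets.last().unwrap());
                validity.push(def >= 1); // def == 0 => null list
            }
            if def == 2 {
                *offsets.last_mut().unwrap() += 1; // one more item in this list
            }
        }

        assert_eq!(offsets, [0, 2, 2, 2, 3]);
        assert_eq!(validity, [true, true, false, true]);
        assert_eq!(levels.iter().filter(|(_, def)| *def == 2).count(), values.len());
    }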