Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Allow a different arrow schema into FileReader
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 8, 2022
1 parent e8686b7 commit 728c955
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 115 deletions.
34 changes: 20 additions & 14 deletions benches/read_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ fn to_buffer(
buffer
}

fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
let file = Cursor::new(buffer);
fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {
let mut reader = Cursor::new(buffer);

let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?;
let metadata = read::read_metadata(&mut reader)?;

let schema = read::infer_schema(&metadata)?;

let schema = schema.filter(|index, _| index == column);

let reader = read::FileReader::new(reader, metadata, schema, None, None, None);

for maybe_chunk in reader {
let columns = maybe_chunk?;
Expand All @@ -49,43 +55,43 @@ fn add_benchmark(c: &mut Criterion) {
let size = 2usize.pow(i);
let buffer = to_buffer(size, true, false, false, false);
let a = format!("read i64 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));

let a = format!("read utf8 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));

let a = format!("read utf8 large 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 6).unwrap()));

let a = format!("read utf8 emoji 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 12).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 12).unwrap()));

let a = format!("read bool 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 3).unwrap()));

let buffer = to_buffer(size, true, true, false, false);
let a = format!("read utf8 dict 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));

let buffer = to_buffer(size, true, false, false, true);
let a = format!("read i64 snappy 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));

let buffer = to_buffer(size, true, false, true, false);
let a = format!("read utf8 multi 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));

let buffer = to_buffer(size, true, false, true, true);
let a = format!("read utf8 multi snappy 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));

let buffer = to_buffer(size, true, false, true, true);
let a = format!("read i64 multi snappy 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));

let buffer = to_buffer(size, false, false, false, false);
let a = format!("read required utf8 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
});
}

Expand Down
23 changes: 13 additions & 10 deletions examples/parquet_read.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Result;
use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
fn main() -> Result<(), Error> {
// say we have a file
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];

let mut reader = File::open(file_path)?;

// we can read its metadata:
let metadata = read::read_metadata(&mut reader)?;
let reader = read::FileReader::try_new(reader, metadata, None, Some(1024 * 8 * 8), None, None)?;

println!("{:#?}", reader.schema());
// and infer a [`Schema`] from the `metadata`.
let schema = read::infer_schema(&metadata)?;

// say we want to evaluate if we can skip some row groups based on a field's value
let field = &reader.schema().fields[0];
// we can filter the columns we need (here we select all)
let schema = schema.filter(|_index, _field| true);

// we can deserialize the parquet statistics from this field
let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;
// we can read the statistics of all parquet's row groups (here for the first field)
let statistics = read::statistics::deserialize(&schema.fields[0], &metadata.row_groups)?;

println!("{:#?}", statistics);

// and create an iterator of
let reader = read::FileReader::new(reader, metadata, schema, Some(1024 * 8 * 8), None, None);

let start = SystemTime::now();
for maybe_chunk in reader {
let chunk = maybe_chunk?;
Expand Down
22 changes: 22 additions & 0 deletions src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ impl Schema {
metadata,
}
}

/// Returns a new [`Schema`] containing only the fields for which `predicate`
/// returns `true`. Field order and the schema's metadata are preserved; the
/// predicate receives each field's original index and a reference to the field.
pub fn filter<F: Fn(usize, &Field) -> bool>(self, predicate: F) -> Self {
    let Schema { fields, metadata } = self;

    // Keep every (index, field) pair accepted by the predicate, then drop the index.
    let fields = fields
        .into_iter()
        .enumerate()
        .filter(|(index, field)| predicate(*index, field))
        .map(|(_, field)| field)
        .collect();

    Schema { fields, metadata }
}
}

impl From<Vec<Field>> for Schema {
Expand Down
78 changes: 11 additions & 67 deletions src/io/parquet/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ use std::sync::Arc;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
use crate::error::Result;
use crate::io::parquet::read::read_columns_many;
use crate::{
datatypes::Field,
error::{Error, Result},
};

use super::{infer_schema, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
use super::{FileMetaData, RowGroupDeserializer, RowGroupMetaData};

type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;

Expand All @@ -20,88 +17,37 @@ type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
/// mapped to an [`Iterator<Item=Chunk>`] and each iterator is iterated upon until either the limit
/// or the last iterator ends.
/// # Implementation
/// This iterator mixes IO-bounded and CPU-bounded operations.
/// This iterator is single threaded on both IO-bounded and CPU-bounded tasks, and mixes them.
pub struct FileReader<R: Read + Seek> {
row_groups: RowGroupReader<R>,
metadata: FileMetaData,
remaining_rows: usize,
current_row_group: Option<RowGroupDeserializer>,
}

impl<R: Read + Seek> FileReader<R> {
/// Returns a new [`FileReader`].
///
/// # Error
/// This function errors iff:
/// * it is not possible to derive an arrow schema from the parquet file
/// * the projection contains columns that do not exist
pub fn try_new(
pub fn new(
reader: R,
metadata: FileMetaData,
projection: Option<&[usize]>,
schema: Schema,
chunk_size: Option<usize>,
limit: Option<usize>,
groups_filter: Option<GroupFilter>,
) -> Result<Self> {
let schema = infer_schema(&metadata)?;

let schema_metadata = schema.metadata;
let fields: Vec<Field> = if let Some(projection) = &projection {
schema
.fields
.into_iter()
.enumerate()
.filter_map(|(index, f)| {
if projection.iter().any(|&i| i == index) {
Some(f)
} else {
None
}
})
.collect()
} else {
schema.fields.into_iter().collect()
};

if let Some(projection) = &projection {
if fields.len() != projection.len() {
return Err(Error::InvalidArgumentError(
"While reading parquet, some columns in the projection do not exist in the file"
.to_string(),
));
}
}

let schema = Schema {
fields,
metadata: schema_metadata,
};

) -> Self {
let row_groups = RowGroupReader::new(
reader,
schema,
groups_filter,
metadata.row_groups.clone(),
metadata.row_groups,
chunk_size,
limit,
);

Ok(Self {
Self {
row_groups,
metadata,
remaining_rows: limit.unwrap_or(usize::MAX),
current_row_group: None,
})
}

/// Returns the derived arrow [`Schema`] of the file
pub fn schema(&self) -> &Schema {
&self.row_groups.schema
}

/// Returns parquet's [`FileMetaData`].
pub fn metadata(&self) -> &FileMetaData {
&self.metadata
}
}

/// Sets the groups filter
Expand Down Expand Up @@ -239,12 +185,10 @@ impl<R: Read + Seek> RowGroupReader<R> {

let result = RowGroupDeserializer::new(
column_chunks,
row_group.num_rows() as usize,
row_group.num_rows(),
Some(self.remaining_rows),
);
self.remaining_rows = self
.remaining_rows
.saturating_sub(row_group.num_rows() as usize);
self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows());
Ok(Some(result))
}
}
Expand Down
43 changes: 22 additions & 21 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,15 @@ pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> Result<ArrayS
&schema.fields,
)?;

let column = schema
.fields
.iter()
.enumerate()
.find_map(|(i, f)| if f.name == column { Some(i) } else { None })
.unwrap();
let schema = schema.filter(|_, f| f.name == column);

let field = &schema.fields[column];
let field = &schema.fields[0];

let statistics = deserialize(field, &metadata.row_groups)?;

let mut reader =
p_read::FileReader::try_new(reader, metadata, Some(&[column]), None, None, None)?;
let metadata = p_read::read_metadata(&mut reader)?;
let schema = p_read::infer_schema(&metadata)?;
let mut reader = p_read::FileReader::new(reader, metadata, schema, None, None, None);

Ok((
reader.next().unwrap()?.into_arrays().pop().unwrap(),
Expand Down Expand Up @@ -1153,13 +1149,21 @@ type IntegrationRead = (Schema, Vec<Chunk<Box<dyn Array>>>);
fn integration_read(data: &[u8], limit: Option<usize>) -> Result<IntegrationRead> {
let mut reader = Cursor::new(data);
let metadata = p_read::read_metadata(&mut reader)?;
let reader = p_read::FileReader::try_new(Cursor::new(data), metadata, None, None, limit, None)?;
let schema = reader.schema().clone();
let schema = p_read::infer_schema(&metadata)?;

for field in &schema.fields {
let mut _statistics = deserialize(field, &reader.metadata().row_groups)?;
let mut _statistics = deserialize(field, &metadata.row_groups)?;
}

let reader = p_read::FileReader::new(
Cursor::new(data),
metadata,
schema.clone(),
None,
limit,
None,
);

let batches = reader.collect::<Result<Vec<_>>>()?;

Ok((schema, batches))
Expand Down Expand Up @@ -1534,24 +1538,21 @@ fn filter_chunk() -> Result<()> {

let metadata = p_read::read_metadata(&mut reader)?;

let reader = p_read::FileReader::try_new(
let new_schema = p_read::infer_schema(&metadata)?;
assert_eq!(new_schema, schema);

let reader = p_read::FileReader::new(
reader,
metadata,
None,
schema,
None,
None,
// select chunk 1
Some(std::sync::Arc::new(|i, _| i == 0)),
)?;
let new_schema = reader.schema().clone();

for field in &schema.fields {
let mut _statistics = deserialize(field, &reader.metadata().row_groups)?;
}
);

let new_chunks = reader.collect::<Result<Vec<_>>>()?;

assert_eq!(new_schema, schema);
assert_eq!(new_chunks, vec![chunk1]);
Ok(())
}
9 changes: 6 additions & 3 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ fn all_types() -> Result<()> {
let mut reader = std::fs::File::open(path)?;

let metadata = read_metadata(&mut reader)?;
let reader = FileReader::try_new(reader, metadata, None, None, None, None)?;
let schema = infer_schema(&metadata)?;
let reader = FileReader::new(reader, metadata, schema, None, None, None);

let batches = reader.collect::<Result<Vec<_>>>()?;
assert_eq!(batches.len(), 1);
Expand Down Expand Up @@ -539,8 +540,9 @@ fn all_types_chunked() -> Result<()> {
let mut reader = std::fs::File::open(path)?;

let metadata = read_metadata(&mut reader)?;
let schema = infer_schema(&metadata)?;
// chunk it in 5 (so, (5,3))
let reader = FileReader::try_new(reader, metadata, None, Some(5), None, None)?;
let reader = FileReader::new(reader, metadata, schema, Some(5), None, None);

let batches = reader.collect::<Result<Vec<_>>>()?;
assert_eq!(batches.len(), 2);
Expand Down Expand Up @@ -602,7 +604,8 @@ fn invalid_utf8() -> Result<()> {
let mut reader = Cursor::new(invalid_data);

let metadata = read_metadata(&mut reader)?;
let reader = FileReader::try_new(reader, metadata, None, Some(5), None, None)?;
let schema = infer_schema(&metadata)?;
let reader = FileReader::new(reader, metadata, schema, Some(5), None, None);

let error = reader.collect::<Result<Vec<_>>>().unwrap_err();
assert!(
Expand Down

0 comments on commit 728c955

Please sign in to comment.