
Commit ad96f97: Simplified API

jorgecarleitao committed Feb 3, 2022
1 parent 5a07aa6 · commit ad96f97
Showing 16 changed files with 282 additions and 391 deletions.
45 changes: 11 additions & 34 deletions examples/parquet_read.rs
@@ -1,46 +1,23 @@
 use std::fs::File;
-use std::io::BufReader;
+use std::time::SystemTime;
 
+use arrow2::error::Result;
 use arrow2::io::parquet::read;
-use arrow2::{array::Array, error::Result};
 
-fn read_field(path: &str, row_group: usize, field: usize) -> Result<Box<dyn Array>> {
-    // Open a file, a common operation in Rust
-    let mut file = BufReader::new(File::open(path)?);
-
-    // Read the files' metadata. This has a small IO cost because it requires seeking to the end
-    // of the file to read its footer.
-    let metadata = read::read_metadata(&mut file)?;
-
-    // Convert the files' metadata into an arrow schema. This is CPU-only and amounts to
-    // parse thrift if the arrow format is available on a key, or infering the arrow schema from
-    // the parquet's physical, converted and logical types.
-    let arrow_schema = read::get_schema(&metadata)?;
-
-    // Created an iterator of column chunks. Each iteration
-    // yields an iterator of compressed pages. There is almost no CPU work in iterating.
-    let columns = read::get_column_iterator(&mut file, &metadata, row_group, field, None, vec![]);
-
-    // get the columns' field
-    let field = &arrow_schema.fields[field];
-
-    // This is the actual work. In this case, pages are read and
-    // decompressed, decoded and deserialized to arrow.
-    // Because `columns` is an iterator, it uses a combination of IO and CPU.
-    let (array, _, _) = read::column_iter_to_array(columns, field, vec![])?;
-
-    Ok(array)
-}
-
 fn main() -> Result<()> {
     use std::env;
     let args: Vec<String> = env::args().collect();
 
     let file_path = &args[1];
-    let field = args[2].parse::<usize>().unwrap();
-    let row_group = args[3].parse::<usize>().unwrap();
 
-    let array = read_field(file_path, row_group, field)?;
-    println!("{:?}", array);
+    let reader = File::open(file_path)?;
+    let reader = read::FileReader::try_new(reader, None, None, None, None)?;
+
+    let start = SystemTime::now();
+    for maybe_chunk in reader {
+        let columns = maybe_chunk?;
+        assert!(!columns.is_empty());
+    }
+    println!("took: {} ms", start.elapsed().unwrap().as_millis());
     Ok(())
 }
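For reference, assembling the added (+) lines above gives roughly the following post-commit example. This is a reconstruction, not a verbatim file from the repository: the hard-coded path and the per-chunk printout are illustrative additions, and the five None arguments are simply passed through as in the diff, without spelling out their individual meanings.

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // hypothetical path, used here instead of a CLI argument
    let file_path = "example.parquet";

    let reader = File::open(file_path)?;
    // the five optional arguments are left as `None`, exactly as in the example above
    let reader = read::FileReader::try_new(reader, None, None, None, None)?;

    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        // each iteration yields one deserialized chunk of columns
        println!("chunk with {} rows", chunk.len());
    }
    Ok(())
}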
115 changes: 0 additions & 115 deletions examples/parquet_read_parallel.rs

This file was deleted.

61 changes: 14 additions & 47 deletions examples/parquet_read_parallel/src/main.rs
@@ -6,61 +6,28 @@ use std::time::SystemTime;
 
 use rayon::prelude::*;
 
-use arrow2::{
-    array::Array, chunk::Chunk, error::Result, io::parquet::read,
-    io::parquet::read::MutStreamingIterator,
-};
+use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};
 
 fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
     let mut file = BufReader::new(File::open(path)?);
-    let file_metadata = read::read_metadata(&mut file)?;
-    let schema = read::get_schema(&file_metadata)?;
+    let metadata = read::read_metadata(&mut file)?;
+    let schema = read::get_schema(&metadata)?;
 
-    // IO-bounded
-    let columns = file_metadata
-        .schema()
-        .fields()
-        .iter()
-        .enumerate()
-        .map(|(field_i, field)| {
-            let start = SystemTime::now();
-            println!("read start - field: {}", field_i);
-            let mut columns = read::get_column_iterator(
-                &mut file,
-                &file_metadata,
-                row_group,
-                field_i,
-                None,
-                vec![],
-            );
-
-            let mut column_chunks = vec![];
-            while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
-                if let Some((pages, metadata)) = new_iter.get() {
-                    let pages = pages.collect::<Vec<_>>();
-
-                    column_chunks.push((pages, metadata.clone()));
-                }
-                columns = new_iter;
-            }
-            println!(
-                "read end - {:?}: {} {}",
-                start.elapsed().unwrap(),
-                field_i,
-                row_group
-            );
-            (field_i, field.clone(), column_chunks)
-        })
-        .collect::<Vec<_>>();
+    // read (IO-bounded) all columns into memory (use a subset of the fields to project)
+    let columns = read::read_columns(
+        &mut file,
+        &metadata.row_groups[row_group],
+        schema.fields,
+        None,
+    )?;
 
     // CPU-bounded
     let columns = columns
         .into_par_iter()
-        .map(|(field_i, parquet_field, column_chunks)| {
-            let columns = read::ReadColumnIterator::new(parquet_field, column_chunks);
-            let field = &schema.fields()[field_i];
-
-            read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into())
+        .map(|mut iter| {
+            // when chunk_size != None, `iter` must be iterated multiple times to get all the chunks,
+            // and some synchronization is required to output a single `Chunk` per iterator
+            iter.next().unwrap()
         })
         .collect::<Result<Vec<_>>>()?;
 
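The hunk above is truncated, so the tail of parallel_read is not shown. Presumably the per-column arrays are assembled into a single Chunk; the following is a hedged sketch of that final step, not taken from the diff, assuming the columns variable is the Vec of arrays collected above.

use std::sync::Arc;

use arrow2::{array::Array, chunk::Chunk, error::Result};

// Hedged sketch: turn the per-column arrays produced by the CPU-bounded step into a
// single `Chunk`. `try_new` validates that all columns have the same number of rows.
fn to_chunk(columns: Vec<Arc<dyn Array>>) -> Result<Chunk<Arc<dyn Array>>> {
    Chunk::try_new(columns)
}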
23 changes: 0 additions & 23 deletions examples/parquet_read_record.rs

This file was deleted.

7 changes: 2 additions & 5 deletions src/io/parquet/read/binary/dictionary.rs
@@ -14,6 +14,7 @@ use crate::{
 use super::super::dictionary::*;
 use super::super::utils;
 use super::super::utils::Decoder;
+use super::super::ArrayIter;
 use super::super::DataPages;
 
 /// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation
@@ -163,11 +164,7 @@
 }
 
 /// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
-pub fn iter_to_arrays<'a, K, O, I>(
-    iter: I,
-    data_type: DataType,
-    chunk_size: usize,
-) -> Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>
+pub fn iter_to_arrays<'a, K, O, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
 where
     I: 'a + DataPages,
     O: Offset,
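The two hunks above replace the explicit boxed-iterator return type with an ArrayIter<'a> alias imported from the parent module. The alias definition itself is not part of the loaded diff; a presumed definition, inferred from the return type it replaces, is:

use std::sync::Arc;

use arrow2::{array::Array, error::Result};

// Presumed alias (an assumption inferred from the `Box<dyn Iterator<...>>` return type
// that `ArrayIter<'a>` replaces throughout this commit).
pub type ArrayIter<'a> = Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>;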
27 changes: 11 additions & 16 deletions src/io/parquet/read/binary/mod.rs
@@ -1,30 +1,25 @@
-use std::sync::Arc;
-
-use crate::{
-    array::{Array, Offset},
-    datatypes::{DataType, Field},
-    error::Result,
-};
-
 mod basic;
 mod dictionary;
 mod nested;
 mod utils;
 
 pub use dictionary::iter_to_arrays as iter_to_dict_arrays;
 
-use self::basic::TraitBinaryArray;
+use std::sync::Arc;
+
+use crate::{
+    array::{Array, Offset},
+    datatypes::{DataType, Field},
+};
 
+use self::basic::TraitBinaryArray;
 use self::nested::ArrayIterator;
-use super::{nested_utils::NestedState, DataPages};
+use super::ArrayIter;
+use super::{nested_utils::NestedArrayIter, DataPages};
 use basic::BinaryArrayIterator;
 
 /// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
-pub fn iter_to_arrays<'a, O, A, I>(
-    iter: I,
-    data_type: DataType,
-    chunk_size: usize,
-) -> Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>
+pub fn iter_to_arrays<'a, O, A, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
 where
     I: 'a + DataPages,
     A: TraitBinaryArray<O>,
@@ -42,7 +37,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>(
     field: Field,
     data_type: DataType,
     chunk_size: usize,
-) -> Box<dyn Iterator<Item = Result<(NestedState, Arc<dyn Array>)>> + 'a>
+) -> NestedArrayIter<'a>
 where
     I: 'a + DataPages,
     A: TraitBinaryArray<O>,
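The nested variants now return NestedArrayIter<'a> instead of the boxed (NestedState, Array) iterator. Again the alias itself is not in the loaded diff; a presumed, crate-internal sketch of how it might read in the parent read module (NestedState comes from the nested_utils module referenced in the imports above):

use std::sync::Arc;

use crate::{array::Array, error::Result};

use self::nested_utils::NestedState;

// Presumed alias for the nested readers, inferred from the replaced return type.
pub type NestedArrayIter<'a> =
    Box<dyn Iterator<Item = Result<(NestedState, Arc<dyn Array>)>> + 'a>;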
12 changes: 4 additions & 8 deletions src/io/parquet/read/boolean/mod.rs
@@ -6,19 +6,15 @@ use std::sync::Arc;
 use crate::{
     array::Array,
     datatypes::{DataType, Field},
-    error::Result,
 };
 
 use self::basic::BooleanArrayIterator;
 use self::nested::ArrayIterator;
-use super::{nested_utils::NestedState, DataPages};
+use super::ArrayIter;
+use super::{nested_utils::NestedArrayIter, DataPages};
 
 /// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
-pub fn iter_to_arrays<'a, I: 'a>(
-    iter: I,
-    data_type: DataType,
-    chunk_size: usize,
-) -> Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>
+pub fn iter_to_arrays<'a, I: 'a>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
 where
     I: DataPages,
 {
@@ -33,7 +29,7 @@ pub fn iter_to_arrays_nested<'a, I: 'a>(
     iter: I,
     field: Field,
     chunk_size: usize,
-) -> Box<dyn Iterator<Item = Result<(NestedState, Arc<dyn Array>)>> + 'a>
+) -> NestedArrayIter<'a>
 where
     I: DataPages,
 {
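Because every iter_to_arrays* variant in this commit now returns the same ArrayIter<'a> (and the nested ones NestedArrayIter<'a>), downstream code can treat all column readers uniformly. A hedged consumer-side sketch: collect_arrays is a hypothetical helper, not part of this commit, and the alias below is the presumed definition given earlier.

use std::sync::Arc;

use arrow2::{array::Array, error::Result};

// Local stand-in for the presumed `ArrayIter` alias introduced by this commit.
type ArrayIter<'a> = Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>;

// Hypothetical helper: drain any reader's iterator into a vector of arrays,
// stopping at and returning the first error encountered.
fn collect_arrays(iter: ArrayIter<'_>) -> Result<Vec<Arc<dyn Array>>> {
    iter.collect()
}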