Add filter pushdown example (#1191)
tustvold committed Jul 27, 2022
1 parent 42135a9 commit 43b5ea3
Showing 3 changed files with 229 additions and 12 deletions.
224 changes: 224 additions & 0 deletions parquet/examples/parquet_filter_pushdown.rs
@@ -0,0 +1,224 @@
//! Generates a large parquet file containing dictionary-encoded data and demonstrates how
//! the page index and the record skipping API can dramatically improve performance
use arrow::array::{
    Array, ArrayRef, Float64Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
};
use arrow::compute::SlicesIterator;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection};
use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader, ProjectionMask};
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::{thread_rng, Rng};
use std::cmp::Ordering;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;

const NUM_ROW_GROUPS: usize = 2;
const ROWS_PER_ROW_GROUP: usize = 100_000;
const ROWS_PER_FILE: usize = ROWS_PER_ROW_GROUP * NUM_ROW_GROUPS;

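/// Builds a [`RecordBatch`] with two nullable dictionary-encoded string columns
/// ("dict1", "dict2") and a column of random `f64` values ("f64_values")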
fn generate_batch() -> RecordBatch {
    let string_dict_t =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let schema = Arc::new(Schema::new(vec![
        Field::new("dict1", string_dict_t.clone(), true),
        Field::new("dict2", string_dict_t, true),
        Field::new("f64_values", DataType::Float64, true),
    ]));

    let mut dict1 = StringDictionaryBuilder::new(
        Int32Builder::new(ROWS_PER_FILE),
        StringBuilder::new(1024),
    );
    let mut dict2 = StringDictionaryBuilder::new(
        Int32Builder::new(ROWS_PER_FILE),
        StringBuilder::new(1024),
    );
    let mut values = Float64Builder::new(ROWS_PER_FILE);

    let dict = &["key0", "key1", "key2", "key3", "key4", "key5"];
    let mut rng = thread_rng();
    for _ in 0..ROWS_PER_FILE {
        match rng.gen_bool(0.5) {
            true => {
                dict1.append(&dict[rng.gen_range(0..dict.len())]).unwrap();
            }
            false => dict1.append_null(),
        }

        match rng.gen_bool(0.75) {
            true => {
                dict2.append(&dict[rng.gen_range(0..dict.len())]).unwrap();
            }
            false => dict2.append_null(),
        }

        values.append_value(rng.gen());
    }

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(dict1.finish()),
            Arc::new(dict2.finish()),
            Arc::new(values.finish()),
        ],
    )
    .unwrap()
}

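/// Writes the generated batch to an in-memory parquet file split into two row groups,
/// checking that a column index was written for the first column chunk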
fn generate_parquet() -> Vec<u8> {
    let mut out = Vec::with_capacity(1024);

    let data = generate_batch();

    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_max_row_group_size(ROWS_PER_ROW_GROUP)
        .build();
    let mut writer = ArrowWriter::try_new(&mut out, data.schema(), Some(props)).unwrap();

    writer.write(&data).unwrap();

    let metadata = writer.close().unwrap();
    assert_eq!(metadata.row_groups.len(), 2);
    assert!(metadata.row_groups[0].columns[0]
        .column_index_length
        .is_some());
    out
}

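/// Baseline strategy: decode every row of all three columns, evaluate both predicates
/// on the decoded batches, and filter the value column afterwards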
fn evaluate_basic(file: Bytes) -> ArrayRef {
    let mut reader = ParquetFileArrowReader::try_new(file).unwrap();

    let batches: Vec<_> = reader
        .get_record_reader(1024)
        .unwrap()
        .map(|result| {
            let batch = result.unwrap();

            let filter_a =
                arrow::compute::eq_dyn_utf8_scalar(&batch.columns()[0], "key0").unwrap();
            let filter_b =
                arrow::compute::eq_dyn_utf8_scalar(&batch.columns()[1], "key1").unwrap();

            let combined = arrow::compute::and(&filter_a, &filter_b).unwrap();
            arrow::compute::filter(&batch.column(2), &combined).unwrap()
        })
        .collect();

    let arrays: Vec<_> = batches.iter().map(|x| x.as_ref()).collect();
    arrow::compute::concat(&arrays).unwrap()
}

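/// Converts a sorted list of selected row ranges into alternating select/skip
/// [`RowSelection`] runs covering `total_rows`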
fn selection_from_ranges(
    ranges: Vec<Range<usize>>,
    total_rows: usize,
) -> Vec<RowSelection> {
    let mut selection: Vec<RowSelection> = Vec::with_capacity(ranges.len() * 2);
    let mut last_end = 0;
    for range in ranges {
        let len = range.end - range.start;

        match range.start.cmp(&last_end) {
            Ordering::Equal => match selection.last_mut() {
                Some(last) => last.row_count += len,
                None => selection.push(RowSelection::select(len)),
            },
            Ordering::Greater => {
                selection.push(RowSelection::skip(range.start - last_end));
                selection.push(RowSelection::select(len))
            }
            Ordering::Less => panic!("out of order"),
        }
        last_end = range.end;
    }

    if last_end != total_rows {
        selection.push(RowSelection::skip(total_rows - last_end))
    }

    selection
}

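/// Reads only `column` from the file and returns a [`RowSelection`] covering the rows
/// where that column equals `key`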
fn evaluate_selection(
    mut reader: ParquetFileArrowReader,
    column: usize,
    key: &str,
) -> Vec<RowSelection> {
    let mask = ProjectionMask::leaves(reader.parquet_schema(), [column]);

    let mut range_offset = 0;
    let mut ranges = vec![];
    for result in reader.get_record_reader_by_columns(mask, 1024).unwrap() {
        let batch = result.unwrap();
        let filter =
            arrow::compute::eq_dyn_utf8_scalar(&batch.columns()[0], key).unwrap();

        let valid = SlicesIterator::new(&filter)
            .map(|(start, end)| start + range_offset..end + range_offset);
        ranges.extend(valid);
        range_offset += batch.num_rows();
    }

    let selection = selection_from_ranges(ranges, range_offset);

    // Sanity check
    let total = selection.iter().map(|x| x.row_count).sum::<usize>();
    assert_eq!(total, range_offset);

    selection
}

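/// Pushdown strategy: evaluate the predicate on "dict1" first, feed the resulting row
/// selection into a second pass over "dict2", and finally decode only the "f64_values"
/// rows that satisfy both predicates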
fn evaluate_pushdown(file: Bytes) -> ArrayRef {
    // TODO: This could also make use of the page index

    let reader = ParquetFileArrowReader::try_new(file.clone()).unwrap();
    let selection = evaluate_selection(reader, 0, "key0");

    // Perhaps we need a way to provide a selection to an existing reader?
    let options = ArrowReaderOptions::default().with_row_selection(selection);
    let reader =
        ParquetFileArrowReader::try_new_with_options(file.clone(), options).unwrap();
    let selection = evaluate_selection(reader, 1, "key1");

    let total_rows = selection
        .iter()
        .filter_map(|x| (!x.skip).then(|| x.row_count))
        .sum::<usize>();

    let options = ArrowReaderOptions::default().with_row_selection(selection);
    let mut reader = ParquetFileArrowReader::try_new_with_options(file, options).unwrap();
    let mask = ProjectionMask::leaves(reader.parquet_schema(), [2]);

    let batch = reader
        .get_record_reader_by_columns(mask, total_rows)
        .unwrap()
        .next()
        .unwrap()
        .unwrap();

    batch.column(0).clone()
}

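/// Compares the runtime of the two strategies and checks they produce identical results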
fn main() {
    let data: Bytes = generate_parquet().into();

    let t0 = Instant::now();
    let basic = evaluate_basic(data.clone());
    let t1 = Instant::now();
    let complex = evaluate_pushdown(data);
    let t2 = Instant::now();

    assert_eq!(basic.data(), complex.data());
    println!(
        "Simple strategy took {}s vs {}s",
        (t1 - t0).as_secs_f64(),
        (t2 - t1).as_secs_f64()
    );
}
13 changes: 3 additions & 10 deletions parquet/src/arrow/arrow_reader.rs
@@ -75,7 +75,7 @@ pub trait ArrowReader {
 /// [`RowSelection`] allows selecting or skipping a provided number of rows
 /// when scanning the parquet file
 #[derive(Debug, Clone, Copy)]
-pub(crate) struct RowSelection {
+pub struct RowSelection {
     /// The number of rows
     pub row_count: usize,

@@ -127,12 +127,7 @@ impl ArrowReaderOptions {
     }

     /// Scan rows from the parquet file according to the provided `selection`
-    ///
-    /// TODO: Make public once row selection fully implemented (#1792)
-    pub(crate) fn with_row_selection(
-        self,
-        selection: impl Into<Vec<RowSelection>>,
-    ) -> Self {
+    pub fn with_row_selection(self, selection: impl Into<Vec<RowSelection>>) -> Self {
         Self {
             selection: Some(selection.into()),
             ..self
@@ -359,9 +354,7 @@ impl ParquetRecordBatchReader {
     /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
     /// all rows will be returned
-    ///
-    /// TODO: Make public once row selection fully implemented (#1792)
-    pub(crate) fn new(
+    pub fn new(
         batch_size: usize,
         array_reader: Box<dyn ArrayReader>,
         selection: Option<VecDeque<RowSelection>>,
4 changes: 2 additions & 2 deletions parquet/src/file/page_index/index_reader.rs
@@ -35,7 +35,7 @@ pub fn read_columns_indexes<R: ChunkReader>(
     let length = lengths.iter().sum::<usize>();

     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, length)?;
     let mut data = vec![0; length];
     reader.read_exact(&mut data)?;
@@ -65,7 +65,7 @@ pub fn read_pages_locations<R: ChunkReader>(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;

     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;