Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x perf improvement (#171) #1180

Merged
merged 10 commits into from
Jan 24, 2022
135 changes: 128 additions & 7 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::Array;
use arrow::datatypes::DataType;
use criterion::{criterion_group, criterion_main, Criterion};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
Expand All @@ -24,6 +26,7 @@ use parquet::{
data_type::{ByteArrayType, Int32Type},
schema::types::{ColumnDescPtr, SchemaDescPtr},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{collections::VecDeque, sync::Arc};

fn build_test_schema() -> SchemaDescPtr {
Expand All @@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;

use arrow::array::Array;
use rand::{rngs::StdRng, Rng, SeedableRng};

pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}
Expand Down Expand Up @@ -333,6 +333,46 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap()
}

fn create_string_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
let arrow_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

make_byte_array_dictionary_reader(
Box::new(page_iterator),
column_desc,
Some(arrow_type),
true,
)
.unwrap()
}

fn create_complex_object_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::{
make_byte_array_dictionary_reader, ComplexObjectArrayReader,
};
use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
let arrow_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

let converter = Utf8Converter::new(Utf8ArrayConverter {});
Box::new(
ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
Box::new(page_iterator),
column_desc,
converter,
Some(arrow_type),
)
.unwrap(),
)
}

fn add_benches(c: &mut Criterion) {
const EXPECTED_VALUE_COUNT: usize =
NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
Expand All @@ -344,10 +384,7 @@ fn add_benches(c: &mut Criterion) {
let mandatory_int32_column_desc = schema.column(0);
let optional_int32_column_desc = schema.column(1);
let mandatory_string_column_desc = schema.column(2);
// println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
let optional_string_column_desc = schema.column(3);
// println!("optional_string_column_desc: {:?}", optional_string_column_desc);

// primitive / int32 benchmarks
// =============================

Expand Down Expand Up @@ -726,7 +763,7 @@ fn add_benches(c: &mut Criterion) {

// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
schema,
schema.clone(),
optional_string_column_desc.clone(),
0.5,
);
Expand Down Expand Up @@ -758,6 +795,90 @@ fn add_benches(c: &mut Criterion) {
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, no NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, half NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.finish();
}

Expand Down
4 changes: 3 additions & 1 deletion parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ where
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
decoder.get_batch(keys.spare_capacity_mut(len))
let keys_slice = keys.spare_capacity_mut(range.start + len);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to figure out why this didn't get caught by the fuzz tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix in bf28e16

let len = decoder.get_batch(&mut keys_slice[range.start..])?;
Ok(len)
}
None => {
// Sad path - need to recompute dictionary
Expand Down