Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x perf improvement (#171) #1180

Merged
merged 10 commits into from
Jan 24, 2022
68 changes: 26 additions & 42 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
Expand All @@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
Expand All @@ -81,9 +80,15 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;

mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod offset_buffer;

#[cfg(test)]
tustvold marked this conversation as resolved.
Show resolved Hide resolved
mod test_util;

pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
Expand Down Expand Up @@ -271,7 +276,8 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -829,17 +835,18 @@ fn remove_indices(
size
),
ArrowType::Struct(fields) => {
let struct_array = arr.as_any()
let struct_array = arr
.as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");

// Recursively call remove indices on each of the structs fields
let new_columns = fields.into_iter()
let new_columns = fields
.into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
Ok((field,
remove_indices(column.clone(), dt, indices.clone())?))
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;

Expand Down Expand Up @@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
Expand Down Expand Up @@ -2025,7 +2009,7 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
Expand Down
71 changes: 7 additions & 64 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,69 +579,18 @@ impl ByteArrayDecoderDictionary {
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array_reader::test_util::{
byte_array_all_encodings, utf8_column,
};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::basic::Type as PhysicalType;
use crate::data_type::{ByteArray, ByteArrayType};
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
use crate::util::memory::MemTracker;
use arrow::array::{Array, StringArray};
use std::sync::Arc;

fn column() -> ColumnDescPtr {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has been extracted into the shared `test_util` module (see `utf8_column`)

let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
.with_converted_type(ConvertedType::UTF8)
.build()
.unwrap();

Arc::new(ColumnDescriptor::new(
Arc::new(t),
1,
0,
ColumnPath::new(vec![]),
))
}

/// Test helper: encodes `data` with the encoder selected by `encoding` and
/// returns the flushed output buffer.
///
/// The encoder is built against the descriptor returned by `column()`.
///
/// # Panics
///
/// Panics if the encoder cannot be created, or if writing or flushing the
/// values fails (every fallible step is unwrapped).
fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
let descriptor = column();
let mem_tracker = Arc::new(MemTracker::new());
let mut encoder =
get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();

// Write all values, then flush so the returned buffer holds the encoded output.
encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
}

#[test]
fn test_byte_array_decoder() {
let data: Vec<_> = vec!["hello", "world", "a", "b"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has been extracted into the shared `test_util` module (as `byte_array_all_encodings`) so that it can be reused

.into_iter()
.map(ByteArray::from)
.collect();

let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));

dict_encoder.put(&data).unwrap();
let encoded_rle = dict_encoder.flush_buffer().unwrap();
let encoded_dictionary = dict_encoder.write_dict().unwrap();

// A column chunk with all the encodings!
let pages = vec![
(Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
(
Encoding::DELTA_BYTE_ARRAY,
get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
),
(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
),
(Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
(Encoding::RLE_DICTIONARY, encoded_rle),
];
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

let column_desc = column();
let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
Expand All @@ -668,15 +617,9 @@ mod tests {
assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

let valid = vec![false, false, true, true, false, true, true, false, false];
let rev_position_iter = valid
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of how changing pad_nulls to take a packed bitmask makes the tests less verbose

.iter()
.enumerate()
.rev()
.filter_map(|(i, valid)| valid.then(|| i));

let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), rev_position_iter);
output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

Expand Down
Loading