Decouple parquet fuzz tests from converter (apache#1661)
tustvold committed Aug 9, 2022
1 parent 80a6ef7 commit 1bce846
Showing 2 changed files with 88 additions and 185 deletions.
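
The change, in brief: the fuzz-test helpers previously took a `Converter` implementation plus the concrete output array type as extra generic parameters; they now accept any closure that builds an `ArrayRef` from the generated values. A minimal before/after sketch of the helper signature, distilled from the diff below (parameter lists elided; the `_old` name is mine, and the bounds assume the test module's existing imports):

    // Before: each test had to name a concrete array type `A` and a
    // converter `C`, tying the fuzz tests to the converter machinery.
    fn run_single_column_reader_tests_old<T, A, C, G>(/* rand_max, ... */)
    where
        T: DataType,
        G: RandGen<T>,
        A: Array + 'static,
        C: Converter<Vec<Option<T::T>>, A> + 'static,
    {
        /* ... */
    }

    // After: any closure turning generated values into an ArrayRef works,
    // so each test converts (and casts) its expected values however it likes.
    fn run_single_column_reader_tests<T, F, G>(/* rand_max, ... */)
    where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        /* ... */
    }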
198 changes: 87 additions & 111 deletions parquet/src/arrow/arrow_reader.rs
@@ -420,8 +420,7 @@ mod tests {
         ParquetRecordBatchReader, RowSelection,
     };
     use crate::arrow::buffer::converter::{
-        BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
-        IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
+        Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
     };
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
     use crate::arrow::{ArrowWriter, ProjectionMask};
@@ -517,29 +516,29 @@ mod tests {
 
     #[test]
     fn test_primitive_single_column_reader_test() {
-        run_single_column_reader_tests::<BoolType, BooleanArray, _, BoolType>(
+        run_single_column_reader_tests::<BoolType, _, BoolType>(
             2,
             ConvertedType::NONE,
             None,
-            &FromConverter::new(),
+            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
             &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
         );
-        run_single_column_reader_tests::<Int32Type, Int32Array, _, Int32Type>(
+        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
             2,
             ConvertedType::NONE,
             None,
-            &FromConverter::new(),
+            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
             &[
                 Encoding::PLAIN,
                 Encoding::RLE_DICTIONARY,
                 Encoding::DELTA_BINARY_PACKED,
             ],
         );
-        run_single_column_reader_tests::<Int64Type, Int64Array, _, Int64Type>(
+        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
             2,
             ConvertedType::NONE,
             None,
-            &FromConverter::new(),
+            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
             &[
                 Encoding::PLAIN,
                 Encoding::RLE_DICTIONARY,
@@ -561,33 +560,23 @@ mod tests {
     #[test]
     fn test_fixed_length_binary_column_reader() {
         let converter = FixedSizeArrayConverter::new(20);
-        run_single_column_reader_tests::<
-            FixedLenByteArrayType,
-            FixedSizeBinaryArray,
-            FixedSizeArrayConverter,
-            RandFixedLenGen,
-        >(
+        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
             20,
             ConvertedType::NONE,
             None,
-            &converter,
+            |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()),
             &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
         );
     }
 
     #[test]
     fn test_interval_day_time_column_reader() {
         let converter = IntervalDayTimeArrayConverter {};
-        run_single_column_reader_tests::<
-            FixedLenByteArrayType,
-            IntervalDayTimeArray,
-            IntervalDayTimeArrayConverter,
-            RandFixedLenGen,
-        >(
+        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
             12,
             ConvertedType::INTERVAL,
             None,
-            &converter,
+            |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()),
             &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
         );
     }
@@ -602,53 +591,52 @@ mod tests {
 
     #[test]
     fn test_utf8_single_column_reader_test() {
+        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
+            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
+                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
+            })))
+        }
+
         let encodings = &[
             Encoding::PLAIN,
             Encoding::RLE_DICTIONARY,
             Encoding::DELTA_LENGTH_BYTE_ARRAY,
             Encoding::DELTA_BYTE_ARRAY,
         ];
 
-        let converter = BinaryArrayConverter {};
-        run_single_column_reader_tests::<
-            ByteArrayType,
-            BinaryArray,
-            BinaryArrayConverter,
-            RandUtf8Gen,
-        >(2, ConvertedType::NONE, None, &converter, encodings);
-
-        let utf8_converter = Utf8ArrayConverter {};
-        run_single_column_reader_tests::<
-            ByteArrayType,
-            StringArray,
-            Utf8ArrayConverter,
-            RandUtf8Gen,
-        >(2, ConvertedType::UTF8, None, &utf8_converter, encodings);
-
-        run_single_column_reader_tests::<
-            ByteArrayType,
-            StringArray,
-            Utf8ArrayConverter,
-            RandUtf8Gen,
-        >(
-            2,
-            ConvertedType::UTF8,
-            Some(ArrowDataType::Utf8),
-            &utf8_converter,
-            encodings,
-        );
-
-        let large_utf8_converter = LargeUtf8ArrayConverter {};
-        run_single_column_reader_tests::<
-            ByteArrayType,
-            LargeStringArray,
-            LargeUtf8ArrayConverter,
-            RandUtf8Gen,
-        >(
-            2,
-            ConvertedType::UTF8,
-            Some(ArrowDataType::LargeUtf8),
-            &large_utf8_converter,
-            encodings,
-        );
+        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
+            2,
+            ConvertedType::NONE,
+            None,
+            |vals| {
+                Arc::new(BinaryArray::from_iter(
+                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
+                ))
+            },
+            encodings,
+        );
+
+        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
+            2,
+            ConvertedType::UTF8,
+            None,
+            string_converter::<i32>,
+            encodings,
+        );
+
+        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
+            2,
+            ConvertedType::UTF8,
+            Some(ArrowDataType::Utf8),
+            string_converter::<i32>,
+            encodings,
+        );
+
+        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
+            2,
+            ConvertedType::UTF8,
+            Some(ArrowDataType::LargeUtf8),
+            string_converter::<i64>,
+            encodings,
+        );
 
@@ -658,21 +646,21 @@ mod tests {
                 let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                 opts.encoding = *encoding;
 
+                let data_type = ArrowDataType::Dictionary(
+                    Box::new(key.clone()),
+                    Box::new(ArrowDataType::Utf8),
+                );
+
                 // Cannot run full test suite as keys overflow, run small test instead
-                single_column_reader_test::<
-                    ByteArrayType,
-                    StringArray,
-                    Utf8ArrayConverter,
-                    RandUtf8Gen,
-                >(
+                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                     opts,
                     2,
                     ConvertedType::UTF8,
-                    Some(ArrowDataType::Dictionary(
-                        Box::new(key.clone()),
-                        Box::new(ArrowDataType::Utf8),
-                    )),
-                    &utf8_converter,
+                    Some(data_type.clone()),
+                    move |vals| {
+                        let vals = string_converter::<i32>(vals);
+                        arrow::compute::cast(&vals, &data_type).unwrap()
+                    },
                 );
             }
         }
@@ -687,37 +675,37 @@ mod tests {
         ];
 
         for key in &key_types {
-            run_single_column_reader_tests::<
-                ByteArrayType,
-                StringArray,
-                Utf8ArrayConverter,
-                RandUtf8Gen,
-            >(
+            let data_type = ArrowDataType::Dictionary(
+                Box::new(key.clone()),
+                Box::new(ArrowDataType::Utf8),
+            );
+
+            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                 2,
                 ConvertedType::UTF8,
-                Some(ArrowDataType::Dictionary(
-                    Box::new(key.clone()),
-                    Box::new(ArrowDataType::Utf8),
-                )),
-                &utf8_converter,
+                Some(data_type.clone()),
+                move |vals| {
+                    let vals = string_converter::<i32>(vals);
+                    arrow::compute::cast(&vals, &data_type).unwrap()
+                },
                 encodings,
             );
 
             // https://github.com/apache/arrow-rs/issues/1179
-            // run_single_column_reader_tests::<
-            //     ByteArrayType,
-            //     LargeStringArray,
-            //     LargeUtf8ArrayConverter,
-            //     RandUtf8Gen,
-            // >(
+            // let data_type = ArrowDataType::Dictionary(
+            //     Box::new(key.clone()),
+            //     Box::new(ArrowDataType::LargeUtf8),
+            // );
+            //
+            // run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            //     2,
            //     ConvertedType::UTF8,
-            //     Some(ArrowDataType::Dictionary(
-            //         Box::new(key.clone()),
-            //         Box::new(ArrowDataType::LargeUtf8),
-            //     )),
-            //     &large_utf8_converter,
-            //     encodings
+            //     Some(data_type.clone()),
+            //     move |vals| {
+            //         let vals = string_converter::<i64>(vals);
+            //         arrow::compute::cast(&vals, &data_type).unwrap()
+            //     },
+            //     encodings,
            // );
         }
     }
@@ -1004,17 +992,16 @@ mod tests {
     ///
     /// `rand_max` represents the maximum size of value to pass to the
     /// value generator
-    fn run_single_column_reader_tests<T, A, C, G>(
+    fn run_single_column_reader_tests<T, F, G>(
         rand_max: i32,
         converted_type: ConvertedType,
         arrow_type: Option<ArrowDataType>,
-        converter: &C,
+        converter: F,
         encodings: &[Encoding],
     ) where
         T: DataType,
         G: RandGen<T>,
-        A: Array + 'static,
-        C: Converter<Vec<Option<T::T>>, A> + 'static,
+        F: Fn(&[Option<T::T>]) -> ArrayRef,
     {
         let mut all_options = vec![
             // choose record_batch_batch (15) so batches cross row
@@ -1090,12 +1077,12 @@ mod tests {
                 ..opts.clone()
             };
 
-            single_column_reader_test::<T, A, C, G>(
+            single_column_reader_test::<T, _, G>(
                 opts,
                 rand_max,
                 converted_type,
                 arrow_type.clone(),
-                converter,
+                &converter,
             )
         }
     }
@@ -1105,17 +1092,16 @@ mod tests {
     /// Create a parquet file and then read it using
     /// `ParquetFileArrowReader` using the parameters described in
     /// `opts`.
-    fn single_column_reader_test<T, A, C, G>(
+    fn single_column_reader_test<T, F, G>(
         opts: TestOptions,
         rand_max: i32,
         converted_type: ConvertedType,
         arrow_type: Option<ArrowDataType>,
-        converter: &C,
+        converter: F,
     ) where
         T: DataType,
         G: RandGen<T>,
-        A: Array + 'static,
-        C: Converter<Vec<Option<T::T>>, A> + 'static,
+        F: Fn(&[Option<T::T>]) -> ArrayRef,
     {
         // Print out options to facilitate debugging failures on CI
         println!(
@@ -1231,19 +1217,9 @@ mod tests {
             let batch = maybe_batch.unwrap().unwrap();
             assert_eq!(end - total_read, batch.num_rows());
 
-            let mut data = vec![];
-            data.extend_from_slice(&expected_data[total_read..end]);
+            let a = converter(&expected_data[total_read..end]);
+            let b = Arc::clone(batch.column(0));
 
-            let a = converter.convert(data).unwrap();
-            let mut b = Arc::clone(batch.column(0));
-
-            if let Some(arrow_type) = arrow_type.as_ref() {
-                assert_eq!(b.data_type(), arrow_type);
-                if let ArrowDataType::Dictionary(_, v) = arrow_type {
-                    assert_eq!(a.data_type(), v.as_ref());
-                    b = arrow::compute::cast(&b, v.as_ref()).unwrap()
-                }
-            }
             assert_eq!(a.data_type(), b.data_type());
             assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), b.data());
 
@@ -1282,12 +1258,12 @@ mod tests {
         def_levels: Option<&Vec<Vec<i16>>>,
         file: File,
         schema: TypePtr,
-        field: Option<arrow::datatypes::Field>,
+        field: Option<Field>,
         opts: &TestOptions,
     ) -> Result<parquet_format::FileMetaData> {
         let mut writer_props = opts.writer_props();
         if let Some(field) = field {
-            let arrow_schema = arrow::datatypes::Schema::new(vec![field]);
+            let arrow_schema = Schema::new(vec![field]);
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
         }
 
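A note on why the assertion code in `single_column_reader_test` shrinks (last hunk of arrow_reader.rs above): a test's closure can now cast the expected values to the exact Arrow type under test, so `a` and `b` already share a data type and the dictionary-specific cast-before-compare path is no longer needed. The pattern, as used by the dictionary tests in this diff:

    // Build the expected string array, then cast it to the dictionary type
    // under test; the comparison in single_column_reader_test is then direct.
    move |vals| {
        let vals = string_converter::<i32>(vals);
        arrow::compute::cast(&vals, &data_type).unwrap()
    }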
