Skip to content

Commit

Permalink
Fixed statistics writing flag and correct null_count in dictionaries (j…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 5, 2023
1 parent b93d806 commit 9f48859
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 68 deletions.
165 changes: 98 additions & 67 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn serialize_keys<K: DictionaryKey>(
array: &DictionaryArray<K>,
type_: PrimitiveType,
nested: &[Nested],
statistics: ParquetStatistics,
statistics: Option<ParquetStatistics>,
options: WriteOptions,
) -> Result<Page> {
let mut buffer = vec![];
Expand Down Expand Up @@ -146,7 +146,7 @@ fn serialize_keys<K: DictionaryKey>(
array.null_count(),
repetition_levels_byte_length,
definition_levels_byte_length,
Some(statistics),
statistics,
type_,
options,
Encoding::RleDictionary,
Expand All @@ -159,8 +159,15 @@ macro_rules! dyn_prim {
let values = $array.values().as_any().downcast_ref().unwrap();

let buffer = primitive_encode_plain::<$from, $to>(values, false, vec![]);
let stats = primitive_build_statistics::<$from, $to>(values, $type_.clone());
let stats = serialize_statistics(&stats);

let stats: Option<ParquetStatistics> = if $options.write_statistics {
let mut stats = primitive_build_statistics::<$from, $to>(values, $type_.clone());
stats.null_count = Some($array.null_count() as i64);
let stats = serialize_statistics(&stats);
Some(stats)
} else {
None
};
(DictPage::new(buffer, values.len(), false), stats)
}};
}
Expand All @@ -175,69 +182,93 @@ pub fn array_to_pages<K: DictionaryKey>(
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
let (dict_page, statistics) = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array, options, type_),
DataType::Int16 => dyn_prim!(i16, i32, array, options, type_),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
dyn_prim!(i32, i32, array, options, type_)
}
DataType::Int64
| DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options, type_),
DataType::UInt8 => dyn_prim!(u8, i32, array, options, type_),
DataType::UInt16 => dyn_prim!(u16, i32, array, options, type_),
DataType::UInt32 => dyn_prim!(u32, i32, array, options, type_),
DataType::UInt64 => dyn_prim!(u64, i64, array, options, type_),
DataType::Float32 => dyn_prim!(f32, f32, array, options, type_),
DataType::Float64 => dyn_prim!(f64, f64, array, options, type_),
DataType::Utf8 => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
utf8_encode_plain::<i32>(array, false, &mut buffer);
let stats = utf8_build_statistics(array, type_.clone());
(DictPage::new(buffer, array.len(), false), stats)
}
DataType::LargeUtf8 => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
utf8_encode_plain::<i64>(array, false, &mut buffer);
let stats = utf8_build_statistics(array, type_.clone());
(DictPage::new(buffer, array.len(), false), stats)
}
DataType::Binary => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i32>(array, false, &mut buffer);
let stats = binary_build_statistics(array, type_.clone());
(DictPage::new(buffer, array.len(), false), stats)
}
DataType::LargeBinary => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i64>(array, false, &mut buffer);
let stats = binary_build_statistics(array, type_.clone());
(DictPage::new(buffer, array.len(), false), stats)
}
DataType::FixedSizeBinary(_) => {
let mut buffer = vec![];
let array = array.values().as_any().downcast_ref().unwrap();
fixed_binary_encode_plain(array, false, &mut buffer);
let stats = fixed_binary_build_statistics(array, type_.clone());
let stats = serialize_statistics(&stats);
(DictPage::new(buffer, array.len(), false), stats)
}
other => {
return Err(Error::NotYetImplemented(format!(
"Writing dictionary arrays to parquet only support data type {other:?}"
)))
}
};
let (dict_page, statistics): (_, Option<ParquetStatistics>) =
match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array, options, type_),
DataType::Int16 => dyn_prim!(i16, i32, array, options, type_),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
dyn_prim!(i32, i32, array, options, type_)
}
DataType::Int64
| DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options, type_),
DataType::UInt8 => dyn_prim!(u8, i32, array, options, type_),
DataType::UInt16 => dyn_prim!(u16, i32, array, options, type_),
DataType::UInt32 => dyn_prim!(u32, i32, array, options, type_),
DataType::UInt64 => dyn_prim!(u64, i64, array, options, type_),
DataType::Float32 => dyn_prim!(f32, f32, array, options, type_),
DataType::Float64 => dyn_prim!(f64, f64, array, options, type_),
DataType::Utf8 => {
    // Bind the dictionary values under their own name so `array` keeps
    // referring to the dictionary array (keys + validity).
    let values = array.values().as_any().downcast_ref().unwrap();

    let mut buffer = vec![];
    utf8_encode_plain::<i32>(values, false, &mut buffer);
    let stats = if options.write_statistics {
        let mut stats = utf8_build_statistics(values, type_.clone());
        // Report the dictionary array's null count, as the primitive and
        // LargeBinary arms do, rather than the null count of the values.
        stats.null_count = Some(array.null_count() as i64);
        Some(stats)
    } else {
        None
    };
    (DictPage::new(buffer, values.len(), false), stats)
}
DataType::LargeUtf8 => {
    // Bind the dictionary values under their own name so `array` keeps
    // referring to the dictionary array (keys + validity).
    let values = array.values().as_any().downcast_ref().unwrap();

    let mut buffer = vec![];
    utf8_encode_plain::<i64>(values, false, &mut buffer);
    let stats = if options.write_statistics {
        let mut stats = utf8_build_statistics(values, type_.clone());
        // Report the dictionary array's null count, as the primitive and
        // LargeBinary arms do, rather than the null count of the values.
        stats.null_count = Some(array.null_count() as i64);
        Some(stats)
    } else {
        None
    };
    (DictPage::new(buffer, values.len(), false), stats)
}
DataType::Binary => {
    // Bind the dictionary values under their own name so `array` keeps
    // referring to the dictionary array (keys + validity).
    let values = array.values().as_any().downcast_ref().unwrap();

    let mut buffer = vec![];
    binary_encode_plain::<i32>(values, false, &mut buffer);
    let stats = if options.write_statistics {
        let mut stats = binary_build_statistics(values, type_.clone());
        // Same override as the LargeBinary arm: the statistics describe the
        // dictionary array, so use its null count, not that of the values.
        stats.null_count = Some(array.null_count() as i64);
        Some(stats)
    } else {
        None
    };
    (DictPage::new(buffer, values.len(), false), stats)
}
DataType::LargeBinary => {
let values = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i64>(values, false, &mut buffer);
let stats = if options.write_statistics {
let mut stats = binary_build_statistics(values, type_.clone());
stats.null_count = Some(array.null_count() as i64);
Some(stats)
} else {
None
};
(DictPage::new(buffer, values.len(), false), stats)
}
DataType::FixedSizeBinary(_) => {
    // Do not shadow `array`: the null count below must come from the
    // dictionary array, not from its values. Previously `array` was rebound
    // to the values, so the override read the values' null count.
    let values = array.values().as_any().downcast_ref().unwrap();

    let mut buffer = vec![];
    fixed_binary_encode_plain(values, false, &mut buffer);
    let stats = if options.write_statistics {
        let mut stats = fixed_binary_build_statistics(values, type_.clone());
        // Report the dictionary array's null count, as the primitive and
        // LargeBinary arms do.
        stats.null_count = Some(array.null_count() as i64);
        Some(serialize_statistics(&stats))
    } else {
        None
    };
    (DictPage::new(buffer, values.len(), false), stats)
}
other => {
return Err(Error::NotYetImplemented(format!(
"Writing dictionary arrays to parquet only support data type {other:?}"
)))
}
};
let dict_page = Page::Dict(dict_page);

// write DataPage pointing to DictPage
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics {

Statistics {
distinct_count: UInt64Array::from([None]).boxed(),
null_count: UInt64Array::from([Some(0)]).boxed(),
null_count: UInt64Array::from([Some(1)]).boxed(),
min_value: new_dict(Box::new(Int32Array::from_slice([10]))),
max_value: new_dict(Box::new(Int32Array::from_slice([200]))),
}
Expand Down

0 comments on commit 9f48859

Please sign in to comment.