new statistics api

ritchie46 committed Apr 26, 2022
1 parent 06d6af0 commit 145ea36
Showing 9 changed files with 53 additions and 89 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
@@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false }
 # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
 # arrow = { package = "arrow2", version = "0.10", default-features = false, features = ["compute_concatenate"] }
 hashbrown = "0.12"
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -171,7 +171,7 @@ thiserror = "^1.0"
 package = "arrow2"
 git = "https://github.com/jorgecarleitao/arrow2"
 # git = "https://github.com/ritchie46/arrow2"
-rev = "ddede6c9063a843fd60b793ba9d761653d020e6a"
+rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d"
 # branch = "polars"
 version = "0.10"
 default-features = false
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
@@ -34,7 +34,7 @@ private = ["polars-time/private"]
 [dependencies]
 ahash = "0.7"
 anyhow = "1.0"
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false }
 # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
 # arrow = { package = "arrow2", version = "0.10", default-features = false }
 csv-core = { version = "0.1.10", optional = true }
103 changes: 30 additions & 73 deletions polars/polars-io/src/parquet/predicates.rs
@@ -1,8 +1,8 @@
 use crate::ArrowResult;
-use arrow::io::parquet::read::statistics::{
-    deserialize_statistics, PrimitiveStatistics, Statistics, Utf8Statistics,
-};
-use arrow::io::parquet::read::ColumnChunkMetaData;
+use arrow::array::{Array, ArrayRef};
+use arrow::compute::concatenate::concatenate;
+use arrow::io::parquet::read::statistics::{self, deserialize, Statistics};
+use arrow::io::parquet::read::RowGroupMetaData;
 use polars_core::prelude::*;

 /// The statistics for a column in a Parquet file
@@ -11,79 +11,37 @@ use polars_core::prelude::*;
 /// - min value
 /// - null_count
 #[cfg_attr(debug_assertions, derive(Debug))]
-pub struct ColumnStats(Box<dyn Statistics>);
+pub struct ColumnStats(Statistics, Field);

 impl ColumnStats {
     pub fn dtype(&self) -> DataType {
-        self.0.data_type().into()
+        self.1.data_type().clone()
     }

     pub fn null_count(&self) -> Option<usize> {
-        self.0.null_count().map(|v| v as usize)
+        match &self.0.null_count {
+            statistics::Count::Single(arr) => {
+                if arr.is_valid(0) {
+                    Some(arr.value(0) as usize)
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        }
     }

     pub fn to_min_max(&self) -> Option<Series> {
-        let name = "";
-        use DataType::*;
-        let s = match self.dtype() {
-            Float64 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<f64>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Float32 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<f32>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Int64 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<i64>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Int32 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<i32>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            UInt32 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<u32>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            UInt64 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<u64>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Utf8 => {
-                let stats = self.0.as_any().downcast_ref::<Utf8Statistics>().unwrap();
-                Series::new(
-                    name,
-                    [stats.min_value.as_deref(), stats.max_value.as_deref()],
-                )
-            }
-            _ => return None,
-        };
-        Some(s)
+        let max_val = &*self.0.max_value;
+        let min_val = &*self.0.min_value;
+
+        let dtype = DataType::from(min_val.data_type());
+        if dtype.is_numeric() || matches!(dtype, DataType::Utf8) {
+            let arr = concatenate(&[min_val, max_val]).unwrap();
+            Some(Series::try_from(("", Arc::from(arr) as ArrayRef)).unwrap())
+        } else {
+            None
+        }
     }
 }

@@ -105,17 +63,16 @@ impl BatchStats

 /// Collect the statistics in a column chunk.
 pub(crate) fn collect_statistics(
-    md: &[ColumnChunkMetaData],
+    md: &[RowGroupMetaData],
     arrow_schema: &ArrowSchema,
 ) -> ArrowResult<Option<BatchStats>> {
     let mut schema = Schema::with_capacity(arrow_schema.fields.len());
     let mut stats = vec![];

     for fld in &arrow_schema.fields {
-        for st in deserialize_statistics(fld, md)?.into_iter().flatten() {
-            schema.with_column(fld.name.to_string(), (&fld.data_type).into());
-            stats.push(ColumnStats(st));
-        }
+        let st = deserialize(fld, md)?;
+        schema.with_column(fld.name.to_string(), (&fld.data_type).into());
+        stats.push(ColumnStats(st, Field::from(fld)));
     }

     Ok(if stats.is_empty() {
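
For orientation, here is a minimal sketch of consuming the new statistics API, using only names visible in the diff above (`deserialize`, `Statistics`, `concatenate`); the standalone `min_max_series` helper and its exact signature are illustrative assumptions, not part of this commit:

```rust
use arrow::array::ArrayRef;
use arrow::compute::concatenate::concatenate;
use arrow::io::parquet::read::statistics::deserialize;
use arrow::io::parquet::read::RowGroupMetaData;
use polars_core::prelude::*;

// Hypothetical helper mirroring `ColumnStats::to_min_max`: build a
// two-value `[min, max]` Series for one field from row-group metadata.
fn min_max_series(
    fld: &arrow::datatypes::Field,
    row_groups: &[RowGroupMetaData],
) -> Option<Series> {
    // A single `deserialize` call replaces the old per-dtype
    // `deserialize_statistics` + `PrimitiveStatistics<T>` downcasts.
    let stats = deserialize(fld, row_groups).ok()?;
    // min/max come back as dynamically typed arrays, so one `concatenate`
    // covers every supported dtype.
    let arr = concatenate(&[&*stats.min_value, &*stats.max_value]).ok()?;
    Series::try_from(("", Arc::from(arr) as ArrayRef)).ok()
}
```

The old code needed one match arm per primitive type; because the new `Statistics` struct carries min/max as arrays, the dtype dispatch collapses into that single `concatenate` call.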
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read_impl.rs
@@ -62,7 +62,7 @@ pub fn read_parquet<R: MmapBytesReader>(
         let current_row_count = md.num_rows() as u32;
         if let Some(pred) = &predicate {
             if let Some(pred) = pred.as_stats_evaluator() {
-                if let Some(stats) = collect_statistics(md.columns(), schema)? {
+                if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? {
                     let should_read = pred.should_read(&stats);
                     // a parquet file may not have statistics of all columns
                     if matches!(should_read, Ok(false)) {
10 changes: 5 additions & 5 deletions polars/polars-io/src/parquet/write.rs
@@ -44,11 +44,11 @@ impl FallibleStreamingIterator for Bla {
 #[must_use]
 pub struct ParquetWriter<W> {
     writer: W,
-    compression: write::Compression,
+    compression: write::CompressionOptions,
     statistics: bool,
 }

-pub use write::Compression as ParquetCompression;
+pub use write::CompressionOptions as ParquetCompression;

 impl<W> ParquetWriter<W>
 where
@@ -61,13 +61,13 @@ where
     {
         ParquetWriter {
            writer,
-            compression: write::Compression::Snappy,
+            compression: write::CompressionOptions::Lz4Raw,
             statistics: false,
         }
     }

-    /// Set the compression used. Defaults to `Snappy`.
-    pub fn with_compression(mut self, compression: write::Compression) -> Self {
+    /// Set the compression used. Defaults to `Lz4Raw`.
+    pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self {
        self.compression = compression;
        self
    }
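
A usage sketch against the renamed API (the import paths, the `with_statistics` builder, and the `finish` signature are assumptions based on the surrounding code, not shown in this diff):

```rust
use polars_core::prelude::*;
use polars_io::parquet::{ParquetCompression, ParquetWriter};
use std::fs::File;

fn write_df(df: &mut DataFrame) -> Result<()> {
    let f = File::create("example.parquet")?; // hypothetical output path
    ParquetWriter::new(f)
        // The default changed from Snappy to Lz4Raw; opt back in explicitly
        // if Snappy is required for compatibility.
        .with_compression(ParquetCompression::Snappy)
        .with_statistics(true) // assumed builder for the `statistics` field
        .finish(df)?;
    Ok(())
}
```

Note that `CompressionOptions` variants can now carry parameters; `Zstd`, for example, takes an optional level (see the py-polars change below).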
13 changes: 10 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
@@ -419,9 +419,16 @@ mod stats {
         let fld_l = self.left.to_field(schema)?;
         let fld_r = self.right.to_field(schema)?;

-        debug_assert_eq!(fld_l.data_type(), fld_r.data_type(), "implementation error");
-        if fld_l.data_type() != fld_r.data_type() {
-            return Ok(true);
+        #[cfg(debug_assertions)]
+        {
+            match (fld_l.data_type(), fld_r.data_type()) {
+                #[cfg(feature = "dtype-categorical")]
+                (DataType::Utf8, DataType::Categorical(_)) => {}
+                #[cfg(feature = "dtype-categorical")]
+                (DataType::Categorical(_), DataType::Utf8) => {}
+                (l, r) if l != r => panic!("implementation error: {:?}, {:?}", l, r),
+                _ => {}
+            }
         }

         let dummy = DataFrame::new_no_checks(vec![]);
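
The relaxed check matters because a predicate can legitimately compare a Categorical column against a Utf8 literal, so equal dtypes cannot be asserted unconditionally. An illustrative query that produces exactly this pair (column name and data are made up):

```rust
use polars_core::prelude::*;
use polars_lazy::prelude::*;

fn filter_categorical(df: DataFrame) -> Result<DataFrame> {
    // `col("fruit")` resolves to Categorical while `lit("apple")` is Utf8;
    // with statistics-based pruning this pair must not trip the debug check.
    df.lazy().filter(col("fruit").eq(lit("apple"))).collect()
}
```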
6 changes: 3 additions & 3 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion py-polars/src/dataframe.rs
@@ -530,7 +530,7 @@ impl PyDataFrame {
             "lzo" => ParquetCompression::Lzo,
             "brotli" => ParquetCompression::Brotli,
             "lz4" => ParquetCompression::Lz4Raw,
-            "zstd" => ParquetCompression::Zstd,
+            "zstd" => ParquetCompression::Zstd(None),
             s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()),
         };
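
Only the `zstd` arm changes on the py-polars side: `CompressionOptions::Zstd` now takes an optional compression level, with `None` selecting the default. A standalone sketch of the mapping (the `parse_compression` helper is hypothetical; the arms mirror those above):

```rust
use polars_io::parquet::ParquetCompression;

// Hypothetical helper mirroring the match arms above.
fn parse_compression(s: &str) -> Result<ParquetCompression, String> {
    Ok(match s {
        "snappy" => ParquetCompression::Snappy,
        "lzo" => ParquetCompression::Lzo,
        "brotli" => ParquetCompression::Brotli,
        "lz4" => ParquetCompression::Lz4Raw,
        // `None` means "use zstd's default compression level".
        "zstd" => ParquetCompression::Zstd(None),
        s => return Err(format!("compression {} not supported", s)),
    })
}
```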
