diff --git a/polars/polars-arrow/Cargo.toml b/polars/polars-arrow/Cargo.toml index c1f58895c006..198b5724db87 100644 --- a/polars/polars-arrow/Cargo.toml +++ b/polars/polars-arrow/Cargo.toml @@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false } # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.10", default-features = false, features = ["compute_concatenate"] } hashbrown = "0.12" diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml index fa2fe1dacab5..98229b90bff7 100644 --- a/polars/polars-core/Cargo.toml +++ b/polars/polars-core/Cargo.toml @@ -171,7 +171,7 @@ thiserror = "^1.0" package = "arrow2" git = "https://github.com/jorgecarleitao/arrow2" # git = "https://github.com/ritchie46/arrow2" -rev = "ddede6c9063a843fd60b793ba9d761653d020e6a" +rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d" # branch = "polars" version = "0.10" default-features = false diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml index 0b34f6845326..19fb82f08349 100644 --- a/polars/polars-io/Cargo.toml +++ b/polars/polars-io/Cargo.toml @@ -34,7 +34,7 @@ private = ["polars-time/private"] [dependencies] ahash = "0.7" anyhow = "1.0" -arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false } # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.10", default-features = false } csv-core = { version = "0.1.10", optional = true } diff --git a/polars/polars-io/src/parquet/predicates.rs b/polars/polars-io/src/parquet/predicates.rs index c6e5927024c7..28dc89321b3d 100644 --- a/polars/polars-io/src/parquet/predicates.rs +++ b/polars/polars-io/src/parquet/predicates.rs @@ -1,8 +1,8 @@ use crate::ArrowResult; -use arrow::io::parquet::read::statistics::{ - deserialize_statistics, PrimitiveStatistics, Statistics, Utf8Statistics, -}; -use arrow::io::parquet::read::ColumnChunkMetaData; +use arrow::array::{Array, ArrayRef}; +use arrow::compute::concatenate::concatenate; +use arrow::io::parquet::read::statistics::{self, deserialize, Statistics}; +use arrow::io::parquet::read::RowGroupMetaData; use polars_core::prelude::*; /// The statistics for a column in a Parquet file @@ -11,79 +11,37 @@ use polars_core::prelude::*; /// - min value /// - null_count #[cfg_attr(debug_assertions, derive(Debug))] -pub struct ColumnStats(Box); +pub struct ColumnStats(Statistics, Field); impl ColumnStats { pub fn dtype(&self) -> DataType { - self.0.data_type().into() + self.1.data_type().clone() } pub fn null_count(&self) -> Option { - self.0.null_count().map(|v| v as usize) + match &self.0.null_count { + statistics::Count::Single(arr) => { + if arr.is_valid(0) { + Some(arr.value(0) as usize) + } else { + None + } + } + _ => None, + } } pub fn to_min_max(&self) -> Option { - let name = ""; - use DataType::*; - let s = match self.dtype() { - Float64 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Float32 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Int64 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Int32 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - UInt32 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - UInt64 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Utf8 => { - let stats = self.0.as_any().downcast_ref::().unwrap(); - Series::new( - name, - [stats.min_value.as_deref(), stats.max_value.as_deref()], - ) - } - _ => return None, - }; - Some(s) + let max_val = &*self.0.max_value; + let min_val = &*self.0.min_value; + + let dtype = DataType::from(min_val.data_type()); + if dtype.is_numeric() || matches!(dtype, DataType::Utf8) { + let arr = concatenate(&[min_val, max_val]).unwrap(); + Some(Series::try_from(("", Arc::from(arr) as ArrayRef)).unwrap()) + } else { + None + } } } @@ -105,17 +63,16 @@ impl BatchStats { /// Collect the statistics in a column chunk. pub(crate) fn collect_statistics( - md: &[ColumnChunkMetaData], + md: &[RowGroupMetaData], arrow_schema: &ArrowSchema, ) -> ArrowResult> { let mut schema = Schema::with_capacity(arrow_schema.fields.len()); let mut stats = vec![]; for fld in &arrow_schema.fields { - for st in deserialize_statistics(fld, md)?.into_iter().flatten() { - schema.with_column(fld.name.to_string(), (&fld.data_type).into()); - stats.push(ColumnStats(st)); - } + let st = deserialize(fld, md)?; + schema.with_column(fld.name.to_string(), (&fld.data_type).into()); + stats.push(ColumnStats(st, Field::from(fld))); } Ok(if stats.is_empty() { diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs index e3a2278b1aa4..45a8765c4ad1 100644 --- a/polars/polars-io/src/parquet/read_impl.rs +++ b/polars/polars-io/src/parquet/read_impl.rs @@ -62,7 +62,7 @@ pub fn read_parquet( let current_row_count = md.num_rows() as u32; if let Some(pred) = &predicate { if let Some(pred) = pred.as_stats_evaluator() { - if let Some(stats) = collect_statistics(md.columns(), schema)? { + if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? { let should_read = pred.should_read(&stats); // a parquet file may not have statistics of all columns if matches!(should_read, Ok(false)) { diff --git a/polars/polars-io/src/parquet/write.rs b/polars/polars-io/src/parquet/write.rs index 0afc11ff3f00..68f68458d4b6 100644 --- a/polars/polars-io/src/parquet/write.rs +++ b/polars/polars-io/src/parquet/write.rs @@ -44,11 +44,11 @@ impl FallibleStreamingIterator for Bla { #[must_use] pub struct ParquetWriter { writer: W, - compression: write::Compression, + compression: write::CompressionOptions, statistics: bool, } -pub use write::Compression as ParquetCompression; +pub use write::CompressionOptions as ParquetCompression; impl ParquetWriter where @@ -61,13 +61,13 @@ where { ParquetWriter { writer, - compression: write::Compression::Snappy, + compression: write::CompressionOptions::Lz4Raw, statistics: false, } } - /// Set the compression used. Defaults to `Snappy`. - pub fn with_compression(mut self, compression: write::Compression) -> Self { + /// Set the compression used. Defaults to `Lz4Raw`. + pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self { self.compression = compression; self } diff --git a/polars/polars-lazy/src/physical_plan/expressions/binary.rs b/polars/polars-lazy/src/physical_plan/expressions/binary.rs index afc2daae439e..b28ab005d242 100644 --- a/polars/polars-lazy/src/physical_plan/expressions/binary.rs +++ b/polars/polars-lazy/src/physical_plan/expressions/binary.rs @@ -419,9 +419,16 @@ mod stats { let fld_l = self.left.to_field(schema)?; let fld_r = self.right.to_field(schema)?; - debug_assert_eq!(fld_l.data_type(), fld_r.data_type(), "implementation error"); - if fld_l.data_type() != fld_r.data_type() { - return Ok(true); + #[cfg(debug_assertions)] + { + match (fld_l.data_type(), fld_r.data_type()) { + #[cfg(feature = "dtype-categorical")] + (DataType::Utf8, DataType::Categorical(_)) => {} + #[cfg(feature = "dtype-categorical")] + (DataType::Categorical(_), DataType::Utf8) => {} + (l, r) if l != r => panic!("implementation error: {:?}, {:?}", l, r), + _ => {} + } } let dummy = DataFrame::new_no_checks(vec![]); diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index ced3f3d1fd8b..ac70cd304dcc 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -74,7 +74,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.10.1" -source = "git+https://github.com/jorgecarleitao/arrow2?rev=ddede6c9063a843fd60b793ba9d761653d020e6a#ddede6c9063a843fd60b793ba9d761653d020e6a" +source = "git+https://github.com/jorgecarleitao/arrow2?rev=e478619ef1f07c472b4e5daa35bfa08dbd87591d#e478619ef1f07c472b4e5daa35bfa08dbd87591d" dependencies = [ "arrow-format", "avro-schema", @@ -1099,9 +1099,9 @@ dependencies = [ [[package]] name = "parquet2" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "770387177ba511db3839e8f243d82f483ecdfdc3619a67a41d29fdd23f355127" +checksum = "98f99f9724402d81faadd9cfa1e8dc78055fd0ddfdbefb7adab3a3a13e893408" dependencies = [ "async-stream", "bitpacking", diff --git a/py-polars/src/dataframe.rs b/py-polars/src/dataframe.rs index 610d9ff5881d..f0f2916b9a94 100644 --- a/py-polars/src/dataframe.rs +++ b/py-polars/src/dataframe.rs @@ -530,7 +530,7 @@ impl PyDataFrame { "lzo" => ParquetCompression::Lzo, "brotli" => ParquetCompression::Brotli, "lz4" => ParquetCompression::Lz4Raw, - "zstd" => ParquetCompression::Zstd, + "zstd" => ParquetCompression::Zstd(None), s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()), };