From ad7befcc626d7f5c0acaa8d555c6565febfde7f3 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 19 Apr 2022 08:49:34 +0200 Subject: [PATCH 1/3] update arrow --- polars/polars-arrow/Cargo.toml | 4 +- polars/polars-core/Cargo.toml | 8 ++-- .../src/chunked_array/object/mod.rs | 3 ++ polars/polars-io/Cargo.toml | 4 +- polars/polars-io/src/parquet/write.rs | 12 ++++-- py-polars/Cargo.lock | 39 ++++++------------- 6 files changed, 30 insertions(+), 40 deletions(-) diff --git a/polars/polars-arrow/Cargo.toml b/polars/polars-arrow/Cargo.toml index 3db53f747480..c1f58895c006 100644 --- a/polars/polars-arrow/Cargo.toml +++ b/polars/polars-arrow/Cargo.toml @@ -9,8 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "36b08249dd03c8b8f88f454158fcf3401c647a49", default-features = false } -arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false } +# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.10", default-features = false, features = ["compute_concatenate"] } hashbrown = "0.12" num = "^0.4" diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml index cfa6f5331508..fa2fe1dacab5 100644 --- a/polars/polars-core/Cargo.toml +++ b/polars/polars-core/Cargo.toml @@ -169,10 +169,10 @@ thiserror = "^1.0" [dependencies.arrow] package = "arrow2" -# git = "https://github.com/jorgecarleitao/arrow2" -git = "https://github.com/ritchie46/arrow2" -# rev = "36b08249dd03c8b8f88f454158fcf3401c647a49" -branch = "polars" +git = "https://github.com/jorgecarleitao/arrow2" +# git = "https://github.com/ritchie46/arrow2" +rev = "ddede6c9063a843fd60b793ba9d761653d020e6a" +# branch = "polars" version = "0.10" default-features = false features = [ diff --git a/polars/polars-core/src/chunked_array/object/mod.rs b/polars/polars-core/src/chunked_array/object/mod.rs index 1f426e1e4c8e..06154c5ddea7 100644 --- a/polars/polars-core/src/chunked_array/object/mod.rs +++ b/polars/polars-core/src/chunked_array/object/mod.rs @@ -143,6 +143,9 @@ where arr.null_bitmap = validity; Box::new(arr) } + fn to_boxed(&self) -> Box { + Box::new(self.clone()) + } } impl ObjectChunked diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml index 00be3adee3d8..0b34f6845326 100644 --- a/polars/polars-io/Cargo.toml +++ b/polars/polars-io/Cargo.toml @@ -34,8 +34,8 @@ private = ["polars-time/private"] [dependencies] ahash = "0.7" anyhow = "1.0" -# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "36b08249dd03c8b8f88f454158fcf3401c647a49", default-features = false } -arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false } +# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.10", default-features = false } csv-core = { version = "0.1.10", optional = true } dirs = "4.0" diff --git a/polars/polars-io/src/parquet/write.rs b/polars/polars-io/src/parquet/write.rs index 9971ea35b49d..0afc11ff3f00 100644 --- a/polars/polars-io/src/parquet/write.rs +++ b/polars/polars-io/src/parquet/write.rs @@ -111,8 +111,12 @@ where .zip(parquet_schema.columns().par_iter()) .zip(encodings.par_iter()) .map(|((array, descriptor), encoding)| { - let encoded_pages = - array_to_pages(array.as_ref(), descriptor.clone(), options, *encoding)?; + let encoded_pages = array_to_pages( + array.as_ref(), + descriptor.descriptor.clone(), + options, + *encoding, + )?; encoded_pages .map(|page| { compress(page?, vec![], options.compression).map_err(|x| x.into()) @@ -133,8 +137,8 @@ where // write the headers writer.start()?; for group in row_group_iter { - let (group, len) = group?; - writer.write(group, len)?; + let (group, _len) = group?; + writer.write(group)?; } let _ = writer.end(None)?; diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 6c035251f0fd..ced3f3d1fd8b 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -74,7 +74,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.10.1" -source = "git+https://github.com/ritchie46/arrow2?branch=polars#da703ae38b5778b7642c3ea50763daddeb6d05b2" +source = "git+https://github.com/jorgecarleitao/arrow2?rev=ddede6c9063a843fd60b793ba9d761653d020e6a#ddede6c9063a843fd60b793ba9d761653d020e6a" dependencies = [ "arrow-format", "avro-schema", @@ -222,12 +222,6 @@ dependencies = [ "syn", ] -[[package]] -name = "byteorder" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" - [[package]] name = "cc" version = "1.0.73" @@ -1069,15 +1063,6 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" -[[package]] -name = "ordered-float" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" -dependencies = [ - "num-traits", -] - [[package]] name = "parking_lot" version = "0.12.0" @@ -1103,22 +1088,20 @@ dependencies = [ [[package]] name = "parquet-format-async-temp" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03abc2f9c83fe9ceec83f47c76cc071bfd56caba33794340330f35623ab1f544" +checksum = "488c8b5f43521d019fade4bcc0ce88cce5da5fd26eb1d38b933807041f5930bf" dependencies = [ "async-trait", - "byteorder", "futures", "integer-encoding", - "ordered-float", ] [[package]] name = "parquet2" -version = "0.10.3" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b085f9e78e4842865151b693f6d94bdf7b280af66daa6e3587adeb3106a07e9" +checksum = "770387177ba511db3839e8f243d82f483ecdfdc3619a67a41d29fdd23f355127" dependencies = [ "async-stream", "bitpacking", @@ -1825,18 +1808,18 @@ checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" [[package]] name = "zstd" -version = "0.10.0+zstd.1.5.2" +version = "0.11.1+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd" +checksum = "77a16b8414fde0414e90c612eba70985577451c4c504b99885ebed24762cb81a" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.4+zstd.1.5.2" +version = "5.0.1+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee" +checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae" dependencies = [ "libc", "zstd-sys", @@ -1844,9 +1827,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.3+zstd.1.5.2" +version = "2.0.1+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" +checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" dependencies = [ "cc", "libc", From 06d6af05606ec40c8ca72d570b4ec6aa6d8068cd Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 19 Apr 2022 10:24:14 +0200 Subject: [PATCH 2/3] use lz4 raw --- py-polars/src/dataframe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/src/dataframe.rs b/py-polars/src/dataframe.rs index bc93284d4ac7..610d9ff5881d 100644 --- a/py-polars/src/dataframe.rs +++ b/py-polars/src/dataframe.rs @@ -529,7 +529,7 @@ impl PyDataFrame { "gzip" => ParquetCompression::Gzip, "lzo" => ParquetCompression::Lzo, "brotli" => ParquetCompression::Brotli, - "lz4" => ParquetCompression::Lz4, + "lz4" => ParquetCompression::Lz4Raw, "zstd" => ParquetCompression::Zstd, s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()), }; From 145ea3621f626f5d55cede74aacef155e0e0ebbd Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 26 Apr 2022 15:15:44 +0200 Subject: [PATCH 3/3] new statistics api --- polars/polars-arrow/Cargo.toml | 2 +- polars/polars-core/Cargo.toml | 2 +- polars/polars-io/Cargo.toml | 2 +- polars/polars-io/src/parquet/predicates.rs | 103 +++++------------- polars/polars-io/src/parquet/read_impl.rs | 2 +- polars/polars-io/src/parquet/write.rs | 10 +- .../src/physical_plan/expressions/binary.rs | 13 ++- py-polars/Cargo.lock | 6 +- py-polars/src/dataframe.rs | 2 +- 9 files changed, 53 insertions(+), 89 deletions(-) diff --git a/polars/polars-arrow/Cargo.toml b/polars/polars-arrow/Cargo.toml index c1f58895c006..198b5724db87 100644 --- a/polars/polars-arrow/Cargo.toml +++ b/polars/polars-arrow/Cargo.toml @@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false } # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.10", default-features = false, features = ["compute_concatenate"] } hashbrown = "0.12" diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml index fa2fe1dacab5..98229b90bff7 100644 --- a/polars/polars-core/Cargo.toml +++ b/polars/polars-core/Cargo.toml @@ -171,7 +171,7 @@ thiserror = "^1.0" package = "arrow2" git = "https://github.com/jorgecarleitao/arrow2" # git = "https://github.com/ritchie46/arrow2" -rev = "ddede6c9063a843fd60b793ba9d761653d020e6a" +rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d" # branch = "polars" version = "0.10" default-features = false diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml index 0b34f6845326..19fb82f08349 100644 --- a/polars/polars-io/Cargo.toml +++ b/polars/polars-io/Cargo.toml @@ -34,7 +34,7 @@ private = ["polars-time/private"] [dependencies] ahash = "0.7" anyhow = "1.0" -arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "ddede6c9063a843fd60b793ba9d761653d020e6a", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false } # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.10", default-features = false } csv-core = { version = "0.1.10", optional = true } diff --git a/polars/polars-io/src/parquet/predicates.rs b/polars/polars-io/src/parquet/predicates.rs index c6e5927024c7..28dc89321b3d 100644 --- a/polars/polars-io/src/parquet/predicates.rs +++ b/polars/polars-io/src/parquet/predicates.rs @@ -1,8 +1,8 @@ use crate::ArrowResult; -use arrow::io::parquet::read::statistics::{ - deserialize_statistics, PrimitiveStatistics, Statistics, Utf8Statistics, -}; -use arrow::io::parquet::read::ColumnChunkMetaData; +use arrow::array::{Array, ArrayRef}; +use arrow::compute::concatenate::concatenate; +use arrow::io::parquet::read::statistics::{self, deserialize, Statistics}; +use arrow::io::parquet::read::RowGroupMetaData; use polars_core::prelude::*; /// The statistics for a column in a Parquet file @@ -11,79 +11,37 @@ use polars_core::prelude::*; /// - min value /// - null_count #[cfg_attr(debug_assertions, derive(Debug))] -pub struct ColumnStats(Box); +pub struct ColumnStats(Statistics, Field); impl ColumnStats { pub fn dtype(&self) -> DataType { - self.0.data_type().into() + self.1.data_type().clone() } pub fn null_count(&self) -> Option { - self.0.null_count().map(|v| v as usize) + match &self.0.null_count { + statistics::Count::Single(arr) => { + if arr.is_valid(0) { + Some(arr.value(0) as usize) + } else { + None + } + } + _ => None, + } } pub fn to_min_max(&self) -> Option { - let name = ""; - use DataType::*; - let s = match self.dtype() { - Float64 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Float32 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Int64 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Int32 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - UInt32 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - UInt64 => { - let stats = self - .0 - .as_any() - .downcast_ref::>() - .unwrap(); - Series::new(name, [stats.min_value, stats.max_value]) - } - Utf8 => { - let stats = self.0.as_any().downcast_ref::().unwrap(); - Series::new( - name, - [stats.min_value.as_deref(), stats.max_value.as_deref()], - ) - } - _ => return None, - }; - Some(s) + let max_val = &*self.0.max_value; + let min_val = &*self.0.min_value; + + let dtype = DataType::from(min_val.data_type()); + if dtype.is_numeric() || matches!(dtype, DataType::Utf8) { + let arr = concatenate(&[min_val, max_val]).unwrap(); + Some(Series::try_from(("", Arc::from(arr) as ArrayRef)).unwrap()) + } else { + None + } } } @@ -105,17 +63,16 @@ impl BatchStats { /// Collect the statistics in a column chunk. pub(crate) fn collect_statistics( - md: &[ColumnChunkMetaData], + md: &[RowGroupMetaData], arrow_schema: &ArrowSchema, ) -> ArrowResult> { let mut schema = Schema::with_capacity(arrow_schema.fields.len()); let mut stats = vec![]; for fld in &arrow_schema.fields { - for st in deserialize_statistics(fld, md)?.into_iter().flatten() { - schema.with_column(fld.name.to_string(), (&fld.data_type).into()); - stats.push(ColumnStats(st)); - } + let st = deserialize(fld, md)?; + schema.with_column(fld.name.to_string(), (&fld.data_type).into()); + stats.push(ColumnStats(st, Field::from(fld))); } Ok(if stats.is_empty() { diff --git a/polars/polars-io/src/parquet/read_impl.rs b/polars/polars-io/src/parquet/read_impl.rs index e3a2278b1aa4..45a8765c4ad1 100644 --- a/polars/polars-io/src/parquet/read_impl.rs +++ b/polars/polars-io/src/parquet/read_impl.rs @@ -62,7 +62,7 @@ pub fn read_parquet( let current_row_count = md.num_rows() as u32; if let Some(pred) = &predicate { if let Some(pred) = pred.as_stats_evaluator() { - if let Some(stats) = collect_statistics(md.columns(), schema)? { + if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? { let should_read = pred.should_read(&stats); // a parquet file may not have statistics of all columns if matches!(should_read, Ok(false)) { diff --git a/polars/polars-io/src/parquet/write.rs b/polars/polars-io/src/parquet/write.rs index 0afc11ff3f00..68f68458d4b6 100644 --- a/polars/polars-io/src/parquet/write.rs +++ b/polars/polars-io/src/parquet/write.rs @@ -44,11 +44,11 @@ impl FallibleStreamingIterator for Bla { #[must_use] pub struct ParquetWriter { writer: W, - compression: write::Compression, + compression: write::CompressionOptions, statistics: bool, } -pub use write::Compression as ParquetCompression; +pub use write::CompressionOptions as ParquetCompression; impl ParquetWriter where @@ -61,13 +61,13 @@ where { ParquetWriter { writer, - compression: write::Compression::Snappy, + compression: write::CompressionOptions::Lz4Raw, statistics: false, } } - /// Set the compression used. Defaults to `Snappy`. - pub fn with_compression(mut self, compression: write::Compression) -> Self { + /// Set the compression used. Defaults to `Lz4Raw`. + pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self { self.compression = compression; self } diff --git a/polars/polars-lazy/src/physical_plan/expressions/binary.rs b/polars/polars-lazy/src/physical_plan/expressions/binary.rs index afc2daae439e..b28ab005d242 100644 --- a/polars/polars-lazy/src/physical_plan/expressions/binary.rs +++ b/polars/polars-lazy/src/physical_plan/expressions/binary.rs @@ -419,9 +419,16 @@ mod stats { let fld_l = self.left.to_field(schema)?; let fld_r = self.right.to_field(schema)?; - debug_assert_eq!(fld_l.data_type(), fld_r.data_type(), "implementation error"); - if fld_l.data_type() != fld_r.data_type() { - return Ok(true); + #[cfg(debug_assertions)] + { + match (fld_l.data_type(), fld_r.data_type()) { + #[cfg(feature = "dtype-categorical")] + (DataType::Utf8, DataType::Categorical(_)) => {} + #[cfg(feature = "dtype-categorical")] + (DataType::Categorical(_), DataType::Utf8) => {} + (l, r) if l != r => panic!("implementation error: {:?}, {:?}", l, r), + _ => {} + } } let dummy = DataFrame::new_no_checks(vec![]); diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index ced3f3d1fd8b..ac70cd304dcc 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -74,7 +74,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.10.1" -source = "git+https://github.com/jorgecarleitao/arrow2?rev=ddede6c9063a843fd60b793ba9d761653d020e6a#ddede6c9063a843fd60b793ba9d761653d020e6a" +source = "git+https://github.com/jorgecarleitao/arrow2?rev=e478619ef1f07c472b4e5daa35bfa08dbd87591d#e478619ef1f07c472b4e5daa35bfa08dbd87591d" dependencies = [ "arrow-format", "avro-schema", @@ -1099,9 +1099,9 @@ dependencies = [ [[package]] name = "parquet2" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "770387177ba511db3839e8f243d82f483ecdfdc3619a67a41d29fdd23f355127" +checksum = "98f99f9724402d81faadd9cfa1e8dc78055fd0ddfdbefb7adab3a3a13e893408" dependencies = [ "async-stream", "bitpacking", diff --git a/py-polars/src/dataframe.rs b/py-polars/src/dataframe.rs index 610d9ff5881d..f0f2916b9a94 100644 --- a/py-polars/src/dataframe.rs +++ b/py-polars/src/dataframe.rs @@ -530,7 +530,7 @@ impl PyDataFrame { "lzo" => ParquetCompression::Lzo, "brotli" => ParquetCompression::Brotli, "lz4" => ParquetCompression::Lz4Raw, - "zstd" => ParquetCompression::Zstd, + "zstd" => ParquetCompression::Zstd(None), s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()), };