update arrow #3181

Merged · 3 commits · Apr 26, 2022
4 changes: 2 additions & 2 deletions polars/polars-arrow/Cargo.toml
@@ -9,8 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
-# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "36b08249dd03c8b8f88f454158fcf3401c647a49", default-features = false }
-arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.10", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
8 changes: 4 additions & 4 deletions polars/polars-core/Cargo.toml
@@ -169,10 +169,10 @@ thiserror = "^1.0"

[dependencies.arrow]
package = "arrow2"
-# git = "https://github.com/jorgecarleitao/arrow2"
-git = "https://github.com/ritchie46/arrow2"
-# rev = "36b08249dd03c8b8f88f454158fcf3401c647a49"
-branch = "polars"
+git = "https://github.com/jorgecarleitao/arrow2"
+# git = "https://github.com/ritchie46/arrow2"
+rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d"
+# branch = "polars"
version = "0.10"
default-features = false
features = [
3 changes: 3 additions & 0 deletions polars/polars-core/src/chunked_array/object/mod.rs
@@ -143,6 +143,9 @@ where
arr.null_bitmap = validity;
Box::new(arr)
}
+    fn to_boxed(&self) -> Box<dyn Array> {
+        Box::new(self.clone())
+    }
}

impl<T> ObjectChunked<T>
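Context for the new `to_boxed` method: the pinned arrow2 revision appears to add `to_boxed` as a required method on its `Array` trait, and boxing a clone is the natural implementation for a `Clone` type. A minimal, self-contained sketch of the pattern (the trait and array type here are illustrative stand-ins, not arrow2's actual definitions):

```rust
// Illustrative stand-in for arrow2's `Array` trait; only the
// `to_boxed` requirement is modeled here.
trait Array {
    fn to_boxed(&self) -> Box<dyn Array>;
}

#[derive(Clone)]
struct MyArray {
    values: Vec<i64>,
}

impl Array for MyArray {
    // For a `Clone` type, boxing a clone of `self` satisfies the
    // trait without any manual buffer copying.
    fn to_boxed(&self) -> Box<dyn Array> {
        Box::new(self.clone())
    }
}

fn main() {
    let arr = MyArray { values: vec![1, 2, 3] };
    let _boxed: Box<dyn Array> = arr.to_boxed();
}
```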
4 changes: 2 additions & 2 deletions polars/polars-io/Cargo.toml
@@ -34,8 +34,8 @@ private = ["polars-time/private"]
[dependencies]
ahash = "0.7"
anyhow = "1.0"
-# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "36b08249dd03c8b8f88f454158fcf3401c647a49", default-features = false }
-arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.10", default-features = false }
csv-core = { version = "0.1.10", optional = true }
dirs = "4.0"
103 changes: 30 additions & 73 deletions polars/polars-io/src/parquet/predicates.rs
@@ -1,8 +1,8 @@
use crate::ArrowResult;
-use arrow::io::parquet::read::statistics::{
-    deserialize_statistics, PrimitiveStatistics, Statistics, Utf8Statistics,
-};
-use arrow::io::parquet::read::ColumnChunkMetaData;
+use arrow::array::{Array, ArrayRef};
+use arrow::compute::concatenate::concatenate;
+use arrow::io::parquet::read::statistics::{self, deserialize, Statistics};
+use arrow::io::parquet::read::RowGroupMetaData;
use polars_core::prelude::*;

/// The statistics for a column in a Parquet file
@@ -11,79 +11,37 @@ use polars_core::prelude::*;
/// - min value
/// - null_count
#[cfg_attr(debug_assertions, derive(Debug))]
-pub struct ColumnStats(Box<dyn Statistics>);
+pub struct ColumnStats(Statistics, Field);

impl ColumnStats {
pub fn dtype(&self) -> DataType {
-        self.0.data_type().into()
+        self.1.data_type().clone()
}

pub fn null_count(&self) -> Option<usize> {
-        self.0.null_count().map(|v| v as usize)
+        match &self.0.null_count {
+            statistics::Count::Single(arr) => {
+                if arr.is_valid(0) {
+                    Some(arr.value(0) as usize)
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        }
}

pub fn to_min_max(&self) -> Option<Series> {
let name = "";
use DataType::*;
let s = match self.dtype() {
Float64 => {
let stats = self
.0
.as_any()
.downcast_ref::<PrimitiveStatistics<f64>>()
.unwrap();
Series::new(name, [stats.min_value, stats.max_value])
}
Float32 => {
let stats = self
.0
.as_any()
.downcast_ref::<PrimitiveStatistics<f32>>()
.unwrap();
Series::new(name, [stats.min_value, stats.max_value])
}
Int64 => {
let stats = self
.0
.as_any()
.downcast_ref::<PrimitiveStatistics<i64>>()
.unwrap();
Series::new(name, [stats.min_value, stats.max_value])
}
Int32 => {
let stats = self
.0
.as_any()
.downcast_ref::<PrimitiveStatistics<i32>>()
.unwrap();
Series::new(name, [stats.min_value, stats.max_value])
}
UInt32 => {
let stats = self
.0
.as_any()
.downcast_ref::<PrimitiveStatistics<u32>>()
.unwrap();
Series::new(name, [stats.min_value, stats.max_value])
}
UInt64 => {
let stats = self
.0
.as_any()
.downcast_ref::<PrimitiveStatistics<u64>>()
.unwrap();
Series::new(name, [stats.min_value, stats.max_value])
}
Utf8 => {
let stats = self.0.as_any().downcast_ref::<Utf8Statistics>().unwrap();
Series::new(
name,
[stats.min_value.as_deref(), stats.max_value.as_deref()],
)
}
_ => return None,
};
Some(s)
let max_val = &*self.0.max_value;
let min_val = &*self.0.min_value;

let dtype = DataType::from(min_val.data_type());
if dtype.is_numeric() || matches!(dtype, DataType::Utf8) {
let arr = concatenate(&[min_val, max_val]).unwrap();
Some(Series::try_from(("", Arc::from(arr) as ArrayRef)).unwrap())
} else {
None
}
}
}

@@ -105,17 +63,16 @@ impl BatchStats {

/// Collect the statistics in a column chunk.
pub(crate) fn collect_statistics(
-    md: &[ColumnChunkMetaData],
+    md: &[RowGroupMetaData],
arrow_schema: &ArrowSchema,
) -> ArrowResult<Option<BatchStats>> {
let mut schema = Schema::with_capacity(arrow_schema.fields.len());
let mut stats = vec![];

for fld in &arrow_schema.fields {
-        for st in deserialize_statistics(fld, md)?.into_iter().flatten() {
-            schema.with_column(fld.name.to_string(), (&fld.data_type).into());
-            stats.push(ColumnStats(st));
-        }
+        let st = deserialize(fld, md)?;
+        schema.with_column(fld.name.to_string(), (&fld.data_type).into());
+        stats.push(ColumnStats(st, Field::from(fld)));
}

Ok(if stats.is_empty() {
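For reference, a sketch of how the new statistics API fits together, with signatures inferred from the diff above rather than from arrow2's documentation (inside polars the arrow2 crate is renamed to `arrow`): `deserialize` takes a field plus all row-group metadata and returns a single `Statistics` whose `min_value`/`max_value` are arrays with one entry per row group.

```rust
use arrow::array::Array;
use arrow::datatypes::Field;
use arrow::error::Result;
use arrow::io::parquet::read::statistics::{deserialize, Statistics};
use arrow::io::parquet::read::RowGroupMetaData;

// Per-column min/max arrays across all row groups; the element type of
// the returned arrays matches the column's logical type.
fn column_min_max(
    fld: &Field,
    row_groups: &[RowGroupMetaData],
) -> Result<(Box<dyn Array>, Box<dyn Array>)> {
    let st: Statistics = deserialize(fld, row_groups)?;
    Ok((st.min_value, st.max_value))
}
```

This is what replaces the per-type `downcast_ref` ladder above: `concatenate(&[min_val, max_val])` yields a two-element array that converts directly into a `Series`.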
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read_impl.rs
@@ -62,7 +62,7 @@ pub fn read_parquet<R: MmapBytesReader>(
let current_row_count = md.num_rows() as u32;
if let Some(pred) = &predicate {
if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(md.columns(), schema)? {
+            if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
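Note that the call site now hands `collect_statistics` the full `file_metadata.row_groups` slice instead of a single row group's column chunks, matching the new `deserialize` signature; the min/max arrays it produces then carry one entry per row group.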
22 changes: 13 additions & 9 deletions polars/polars-io/src/parquet/write.rs
@@ -44,11 +44,11 @@ impl FallibleStreamingIterator for Bla {
#[must_use]
pub struct ParquetWriter<W> {
writer: W,
-    compression: write::Compression,
+    compression: write::CompressionOptions,
statistics: bool,
}

-pub use write::Compression as ParquetCompression;
+pub use write::CompressionOptions as ParquetCompression;

impl<W> ParquetWriter<W>
where
@@ -61,13 +61,13 @@
{
ParquetWriter {
writer,
-            compression: write::Compression::Snappy,
+            compression: write::CompressionOptions::Lz4Raw,
statistics: false,
}
}

-    /// Set the compression used. Defaults to `Snappy`.
-    pub fn with_compression(mut self, compression: write::Compression) -> Self {
+    /// Set the compression used. Defaults to `Lz4Raw`.
+    pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self {
self.compression = compression;
self
}
@@ -111,8 +111,12 @@ where
.zip(parquet_schema.columns().par_iter())
.zip(encodings.par_iter())
.map(|((array, descriptor), encoding)| {
-                let encoded_pages =
-                    array_to_pages(array.as_ref(), descriptor.clone(), options, *encoding)?;
+                let encoded_pages = array_to_pages(
+                    array.as_ref(),
+                    descriptor.descriptor.clone(),
+                    options,
+                    *encoding,
+                )?;
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
@@ -133,8 +137,8 @@
// write the headers
writer.start()?;
for group in row_group_iter {
-            let (group, len) = group?;
-            writer.write(group, len)?;
+            let (group, _len) = group?;
+            writer.write(group)?;
}
let _ = writer.end(None)?;

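Since the default codec changes here, a minimal usage sketch of pinning the old behavior explicitly (method names `new`, `with_compression`, and `finish` are assumed from the polars-io API of this period, not verified against a specific release):

```rust
use polars_core::prelude::*;
use polars_io::parquet::{ParquetCompression, ParquetWriter};
use std::fs::File;

fn write_parquet(df: &DataFrame) -> Result<()> {
    let file = File::create("out.parquet").expect("could not create out.parquet");
    ParquetWriter::new(file)
        // The default moved from Snappy to Lz4Raw in this diff; request
        // Snappy explicitly to keep the previous on-disk format.
        .with_compression(ParquetCompression::Snappy)
        .finish(df)?;
    Ok(())
}
```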
13 changes: 10 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
@@ -419,9 +419,16 @@ mod stats {
let fld_l = self.left.to_field(schema)?;
let fld_r = self.right.to_field(schema)?;

-        debug_assert_eq!(fld_l.data_type(), fld_r.data_type(), "implementation error");
-        if fld_l.data_type() != fld_r.data_type() {
-            return Ok(true);
+        #[cfg(debug_assertions)]
+        {
+            match (fld_l.data_type(), fld_r.data_type()) {
+                #[cfg(feature = "dtype-categorical")]
+                (DataType::Utf8, DataType::Categorical(_)) => {}
+                #[cfg(feature = "dtype-categorical")]
+                (DataType::Categorical(_), DataType::Utf8) => {}
+                (l, r) if l != r => panic!("implementation error: {:?}, {:?}", l, r),
+                _ => {}
+            }
}

let dummy = DataFrame::new_no_checks(vec![]);
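The exempted `Utf8`/`Categorical` pairs presumably cover predicate pushdown comparing a categorical column against a string literal, where the dtype mismatch is expected; the old runtime fallback (`return Ok(true)`) is dropped, and any other mismatch now panics in debug builds with the offending pair in the message.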
39 changes: 11 additions & 28 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions py-polars/src/dataframe.rs
@@ -529,8 +529,8 @@ impl PyDataFrame {
"gzip" => ParquetCompression::Gzip,
"lzo" => ParquetCompression::Lzo,
"brotli" => ParquetCompression::Brotli,
"lz4" => ParquetCompression::Lz4,
"zstd" => ParquetCompression::Zstd,
"lz4" => ParquetCompression::Lz4Raw,
"zstd" => ParquetCompression::Zstd(None),
s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()),
};

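On the Python-facing mapping: `"lz4"` now maps to `Lz4Raw` because Parquet distinguishes the raw LZ4 block codec from the older framed variant, and `Zstd(None)` presumably selects the codec's default level, with the `Option` reserved for an explicit zstd compression level.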