Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several projection improvements #2795

Merged
merged 4 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions polars/clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
disallowed-types = ["std::collections::HashMap", "std::collections::HashSet"]
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use ahash::RandomState;
use hashbrown::hash_map::{Entry, RawEntryMut};
use hashbrown::HashMap;
use rayon::prelude::*;
use std::collections::HashSet;
use std::fmt::Debug;
use std::hash::{BuildHasher, Hash, Hasher};

Expand Down Expand Up @@ -1017,7 +1016,7 @@ impl DataFrame {
mut df_right: DataFrame,
suffix: Option<String>,
) -> Result<DataFrame> {
let mut left_names = HashSet::with_capacity_and_hasher(df_left.width(), RandomState::new());
let mut left_names = PlHashSet::with_capacity(df_left.width());

df_left.columns.iter().for_each(|series| {
left_names.insert(series.name());
Expand All @@ -1036,6 +1035,7 @@ impl DataFrame {
df_right.rename(&name, &format!("{}{}", name, suffix))?;
}

drop(left_names);
df_left.hstack_mut(&df_right.columns)?;
Ok(df_left)
}
Expand Down
23 changes: 9 additions & 14 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! DataFrame module.
use std::borrow::Cow;
use std::collections::HashSet;
use std::iter::{FromIterator, Iterator};
use std::mem;
use std::ops;
Expand Down Expand Up @@ -33,7 +32,6 @@ use crate::vector_hasher::boost_hash_combine;
#[cfg(feature = "row_hash")]
use crate::vector_hasher::df_rows_to_hashes_threaded;
use crate::POOL;
use hashbrown::HashMap;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::hash::{BuildHasher, Hash, Hasher};
Expand Down Expand Up @@ -155,14 +153,6 @@ impl DataFrame {
}
}

fn hash_names(&self) -> HashSet<String, RandomState> {
let mut set = HashSet::with_capacity_and_hasher(self.columns.len(), RandomState::default());
for s in &self.columns {
set.insert(s.name().to_string());
}
set
}

/// Create a DataFrame from a Vector of Series.
///
/// # Example
Expand Down Expand Up @@ -673,7 +663,11 @@ impl DataFrame {
/// }
/// ```
pub fn hstack_mut(&mut self, columns: &[Series]) -> Result<&mut Self> {
let mut names = self.hash_names();
let mut names = PlHashSet::with_capacity(self.columns.len());
for s in &self.columns {
names.insert(s.name());
}

let height = self.height();
// first loop check validity. We don't do this in a single pass otherwise
// this DataFrame is already modified when an error occurs.
Expand All @@ -693,8 +687,9 @@ impl DataFrame {
.into(),
));
}
names.insert(name.to_string());
names.insert(name);
}
drop(names);
Ok(self.hstack_mut_no_checks(columns))
}

Expand Down Expand Up @@ -1280,10 +1275,10 @@ impl DataFrame {

/// A non generic implementation to reduce compiler bloat.
fn select_series_impl(&self, cols: &[String]) -> Result<Vec<Series>> {
let selected = if cols.len() > 1 && self.columns.len() > 300 {
let selected = if cols.len() > 1 && self.columns.len() > 10 {
// we hash, because there are users that have millions of columns.
// # https://github.com/pola-rs/polars/issues/1023
let name_to_idx: HashMap<&str, usize> = self
let name_to_idx: PlHashMap<&str, usize> = self
.columns
.iter()
.enumerate()
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ test = [
[dependencies]
ahash = "0.7"
glob = "0.3"
parking_lot = "0.12"
rayon = "1.5"
regex = { version = "1.5", optional = true }

Expand Down
11 changes: 4 additions & 7 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ use crate::logical_plan::ParquetOptions;
use crate::logical_plan::{det_melt_schema, Context, CsvParserOptions};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
use ahash::RandomState;
use polars_core::prelude::*;
use polars_utils::arena::{Arena, Node};
use std::collections::HashSet;
#[cfg(any(feature = "csv-file", feature = "parquet"))]
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -669,18 +667,16 @@ impl<'a> ALogicalPlanBuilder<'a> {
let schema_right = self.lp_arena.get(other).schema(self.lp_arena);

// column names of left table
let mut names: HashSet<&str, RandomState> = HashSet::with_capacity_and_hasher(
schema_left.len() + schema_right.len(),
Default::default(),
);
let mut names: PlHashSet<&str> =
PlHashSet::with_capacity(schema_left.len() + schema_right.len());
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());

for (name, dtype) in schema_left.iter() {
names.insert(name.as_str());
new_schema.with_column(name.to_string(), dtype.clone())
}

let right_names: HashSet<_, RandomState> = right_on
let right_names: PlHashSet<_> = right_on
.iter()
.map(|e| match self.expr_arena.get(*e) {
AExpr::Alias(_, name) => name.clone(),
Expand Down Expand Up @@ -710,6 +706,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
right_on,
options,
};
drop(names);
let root = self.lp_arena.add(lp);
Self::new(root, self.expr_arena, self.lp_arena)
}
Expand Down
9 changes: 4 additions & 5 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::logical_plan::projection::rewrite_projections;
use crate::prelude::*;
use crate::utils;
use crate::utils::{combine_predicates_expr, has_expr};
use ahash::RandomState;
use polars_core::prelude::*;
use polars_io::csv::CsvEncoding;
#[cfg(feature = "csv-file")]
Expand All @@ -17,7 +16,6 @@ use polars_io::{
csv::NullValues,
csv_core::utils::{get_reader_bytes, is_compressed},
};
use std::collections::HashSet;
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;

Expand Down Expand Up @@ -378,22 +376,22 @@ impl LogicalPlanBuilder {
let schema_right = other.schema();

// column names of left table
let mut names: HashSet<&String, RandomState> = HashSet::default();
let mut names: PlHashSet<&str> = PlHashSet::default();
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());

for (name, dtype) in schema_left.iter() {
names.insert(name);
new_schema.with_column(name.to_string(), dtype.clone())
}

let right_names: HashSet<_, RandomState> = right_on
let right_names: PlHashSet<_> = right_on
.iter()
.map(|e| utils::expr_output_name(e).expect("could not find name"))
.collect();

for (name, dtype) in schema_right.iter() {
if !right_names.iter().any(|s| s.as_ref() == name) {
if names.contains(name) {
if names.contains(&**name) {
let new_name = format!("{}{}", name, options.suffix.as_ref());
new_schema.with_column(new_name, dtype.clone())
} else {
Expand All @@ -404,6 +402,7 @@ impl LogicalPlanBuilder {

let schema = Arc::new(new_schema);

drop(names);
LogicalPlan::Join {
input_left: Box::new(self.0),
input_right: Box::new(other),
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct GroupByExec {
aggs: Vec<Arc<dyn PhysicalExpr>>,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
input_schema: SchemaRef,
}

impl GroupByExec {
Expand All @@ -22,13 +23,15 @@ impl GroupByExec {
aggs: Vec<Arc<dyn PhysicalExpr>>,
apply: Option<Arc<dyn DataFrameUdf>>,
maintain_order: bool,
input_schema: SchemaRef,
) -> Self {
Self {
input,
keys,
aggs,
apply,
maintain_order,
input_schema,
}
}
}
Expand All @@ -44,6 +47,7 @@ fn groupby_helper(
let gb = df.groupby_with_series(keys, true, maintain_order)?;

if let Some(f) = apply {
state.clear_schema_cache();
return gb.apply(|df| f.call_udf(df));
}

Expand Down Expand Up @@ -74,6 +78,7 @@ fn groupby_helper(
let agg_columns = agg_columns?;

columns.extend(agg_columns.into_iter().flatten());
state.clear_schema_cache();
DataFrame::new(columns)
}

Expand All @@ -83,6 +88,7 @@ impl Executor for GroupByExec {
eprintln!("aggregates are not partitionable: running default HASH AGGREGATION")
}
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());
let keys = self
.keys
.iter()
Expand Down Expand Up @@ -289,6 +295,7 @@ impl Executor for PartitionGroupByExec {
// MERGE phase
// merge and hash aggregate again
let df = accumulate_dataframes_vertical(dfs)?;
// the partitioned groupby has added columns so we must update the schema.
let key = self.key.evaluate(&df, state)?;

// first get mutable access and optionally sort
Expand Down Expand Up @@ -321,6 +328,7 @@ impl Executor for PartitionGroupByExec {
POOL.install(|| rayon::join(get_columns, get_agg));

columns.extend(agg_columns);
state.clear_schema_cache();

let df = DataFrame::new_no_checks(columns);
Ok(df)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ pub(crate) struct GroupByDynamicExec {
pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) options: DynamicGroupOptions,
pub(crate) input_schema: SchemaRef,
}

impl Executor for GroupByDynamicExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
#[cfg(feature = "dynamic_groupby")]
{
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());
let keys = self
.keys
.iter()
Expand Down Expand Up @@ -44,6 +46,7 @@ impl Executor for GroupByDynamicExec {
.collect::<Result<Vec<_>>>()
})?;

state.clear_schema_cache();
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
columns.extend(keys);
columns.push(time_key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ pub(crate) struct GroupByRollingExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) options: RollingGroupOptions,
pub(crate) input_schema: SchemaRef,
}

impl Executor for GroupByRollingExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
#[cfg(feature = "dynamic_groupby")]
{
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());

let (time_key, groups) = df.groupby_rolling(&self.options)?;

Expand All @@ -36,6 +38,7 @@ impl Executor for GroupByRollingExec {
.collect::<Result<Vec<_>>>()
})?;

state.clear_schema_cache();
let mut columns = Vec::with_capacity(agg_columns.len() + 1);
columns.push(time_key);
columns.extend(agg_columns.into_iter().flatten());
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub(crate) fn evaluate_physical_expressions(
.collect::<Result<_>>()
})?
};
state.clear_schema_cache();

check_expand_literals(selected_columns, zero_length)
}
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ pub struct ProjectionExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) has_windows: bool,
pub(crate) input_schema: SchemaRef,
#[cfg(test)]
pub(crate) schema: SchemaRef,
}

impl Executor for ProjectionExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());

let df = evaluate_physical_expressions(&df, &self.expr, state, self.has_windows);

Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl Executor for DataFrameExec {
// projection should be before selection as those are free
// TODO: this is only the case if we don't create new columns
if let Some(projection) = &self.projection {
state.may_set_schema(&df, projection.len());
df = evaluate_physical_expressions(&df, projection, state, self.has_windows)?;
}

Expand All @@ -274,6 +275,7 @@ impl Executor for DataFrameExec {
})?;
df = df.filter(mask)?;
}
state.clear_schema_cache();

if let Some(limit) = set_n_rows(None) {
Ok(df.head(Some(limit)))
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ pub struct StackExec {
pub(crate) input: Box<dyn Executor>,
pub(crate) has_windows: bool,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) input_schema: SchemaRef,
}

impl Executor for StackExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;

state.set_schema(self.input_schema.clone());
let res = if self.has_windows {
// we have a different run here
// to ensure the window functions run sequential and share caches
Expand All @@ -26,12 +28,13 @@ impl Executor for StackExec {
.collect::<Result<Vec<_>>>()
})?
};
state.clear_schema_cache();
state.clear_expr_cache();

for s in res {
df.with_column(s)?;
}

state.clear_expr_cache();
Ok(df)
}
}
18 changes: 10 additions & 8 deletions polars/polars-lazy/src/physical_plan/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ impl PhysicalExpr for ColumnExpr {
fn as_expression(&self) -> &Expr {
&self.1
}
fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
let column = match &*self.0 {
"" => df.select_at_idx(0).ok_or_else(|| {
PolarsError::NoData("could not select a column from an empty DataFrame".into())
})?,
_ => df.column(&self.0)?,
};
Ok(column.clone())
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
match state.get_schema() {
None => df.column(&self.0).cloned(),
Some(schema) => {
let (idx, _, _) = schema
.get_full(&self.0)
.ok_or_else(|| PolarsError::NotFound(self.0.to_string()))?;
Ok(df.get_columns()[idx].clone())
}
}
}
#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
Expand Down
Loading