From 1455800d6e524705e3e93919d01670b8bd3520a0 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Mon, 28 Feb 2022 15:53:28 +0100
Subject: [PATCH 1/4] cache schema in projection and use it

---
 polars/polars-lazy/Cargo.toml                   |  1 +
 .../src/physical_plan/executors/mod.rs          |  4 ++
 .../src/physical_plan/executors/stack.rs        |  4 +-
 .../src/physical_plan/expressions/column.rs     | 18 +++++----
 .../src/physical_plan/expressions/window.rs     |  8 ++--
 polars/polars-lazy/src/physical_plan/state.rs   | 37 +++++++++++++++----
 polars/polars-utils/Cargo.toml                  |  2 +-
 py-polars/Cargo.lock                            |  3 +-
 8 files changed, 54 insertions(+), 23 deletions(-)

diff --git a/polars/polars-lazy/Cargo.toml b/polars/polars-lazy/Cargo.toml
index 4a11de423c86..dc3aa280579a 100644
--- a/polars/polars-lazy/Cargo.toml
+++ b/polars/polars-lazy/Cargo.toml
@@ -86,6 +86,7 @@ test = [
 [dependencies]
 ahash = "0.7"
 glob = "0.3"
+parking_lot = "0.12"
 rayon = "1.5"
 regex = { version = "1.5", optional = true }
 
diff --git a/polars/polars-lazy/src/physical_plan/executors/mod.rs b/polars/polars-lazy/src/physical_plan/executors/mod.rs
index 21c468b47376..f44a4d0ca4d9 100644
--- a/polars/polars-lazy/src/physical_plan/executors/mod.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/mod.rs
@@ -132,6 +132,9 @@ pub(crate) fn evaluate_physical_expressions(
     state: &ExecutionState,
     has_windows: bool,
 ) -> Result<DataFrame> {
+    if exprs.len() > 1 && df.get_columns().len() > 10 {
+        state.set_schema(Arc::new(df.schema()));
+    }
     let zero_length = df.height() == 0;
     let selected_columns = if has_windows {
         execute_projection_cached_window_fns(df, exprs, state)?
@@ -143,6 +146,7 @@ pub(crate) fn evaluate_physical_expressions(
                 .collect::<Result<_>>()
         })?
     };
+    state.clear_schema_cache();
     check_expand_literals(selected_columns, zero_length)
 }
 
diff --git a/polars/polars-lazy/src/physical_plan/executors/stack.rs b/polars/polars-lazy/src/physical_plan/executors/stack.rs
index 1519a46764b2..ce25dfe5fcbb 100644
--- a/polars/polars-lazy/src/physical_plan/executors/stack.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/stack.rs
@@ -14,6 +14,7 @@ impl Executor for StackExec {
     fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
         let mut df = self.input.execute(state)?;
 
+        state.set_schema(Arc::new(df.schema()));
         let res = if self.has_windows {
             // we have a different run here
             // to ensure the window functions run sequentially and share caches
@@ -26,12 +27,13 @@ impl Executor for StackExec {
                     .collect::<Result<Vec<_>>>()
             })?
         };
+        state.clear_schema_cache();
+        state.clear_expr_cache();
 
         for s in res {
             df.with_column(s)?;
         }
-        state.clear_expr_cache();
 
         Ok(df)
     }
 }
diff --git a/polars/polars-lazy/src/physical_plan/expressions/column.rs b/polars/polars-lazy/src/physical_plan/expressions/column.rs
index c5020b770972..db503c9c9d72 100644
--- a/polars/polars-lazy/src/physical_plan/expressions/column.rs
+++ b/polars/polars-lazy/src/physical_plan/expressions/column.rs
@@ -17,14 +17,16 @@ impl PhysicalExpr for ColumnExpr {
     fn as_expression(&self) -> &Expr {
         &self.1
     }
-    fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> Result<Series> {
-        let column = match &*self.0 {
-            "" => df.select_at_idx(0).ok_or_else(|| {
-                PolarsError::NoData("could not select a column from an empty DataFrame".into())
-            })?,
-            _ => df.column(&self.0)?,
-        };
-        Ok(column.clone())
+    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
+        match state.get_schema() {
+            None => df.column(&self.0).cloned(),
+            Some(schema) => {
+                let (idx, _, _) = schema
+                    .get_full(&self.0)
+                    .ok_or_else(|| PolarsError::NotFound(self.0.to_string()))?;
+                Ok(df.get_columns()[idx].clone())
+            }
+        }
     }
     #[allow(clippy::ptr_arg)]
     fn evaluate_on_groups<'a>(
diff --git a/polars/polars-lazy/src/physical_plan/expressions/window.rs b/polars/polars-lazy/src/physical_plan/expressions/window.rs
index 2ca2213ffa4d..df918559a18d 100644
--- a/polars/polars-lazy/src/physical_plan/expressions/window.rs
+++ b/polars/polars-lazy/src/physical_plan/expressions/window.rs
@@ -248,7 +248,7 @@ impl PhysicalExpr for WindowExpr {
             cache_key.push_str(s.name());
         }
 
-        let mut gt_map = state.group_tuples.lock().unwrap();
+        let mut gt_map = state.group_tuples.lock();
         // we run sequentially and partitioned,
         // and in every partition run the cache should be empty, so we expect a max of 1.
         debug_assert!(gt_map.len() <= 1);
@@ -274,7 +274,7 @@ impl PhysicalExpr for WindowExpr {
         let cache_gb = |mut gb: GroupBy| {
             if state.cache_window {
                 let groups = std::mem::take(gb.get_groups_mut());
-                let mut gt_map = state.group_tuples.lock().unwrap();
+                let mut gt_map = state.group_tuples.lock();
                 gt_map.insert(cache_key.clone(), groups);
             } else {
                 // drop the group tuples to reduce allocated memory.
@@ -424,7 +424,7 @@ impl PhysicalExpr for WindowExpr {
 
         // try to get cached join_tuples
        let opt_join_tuples = if state.cache_window {
-            let mut jt_map = state.join_tuples.lock().unwrap();
+            let mut jt_map = state.join_tuples.lock();
             // we run sequentially and partitioned,
             // and in every partition run the cache should be empty, so we expect a max of 1.
             debug_assert!(jt_map.len() <= 1);
@@ -447,7 +447,7 @@ impl PhysicalExpr for WindowExpr {
         }
 
         if state.cache_window {
-            let mut jt_map = state.join_tuples.lock().unwrap();
+            let mut jt_map = state.join_tuples.lock();
             jt_map.insert(cache_key, opt_join_tuples);
         }
 
diff --git a/polars/polars-lazy/src/physical_plan/state.rs b/polars/polars-lazy/src/physical_plan/state.rs
index 7a802da3460c..2def649b8480 100644
--- a/polars/polars-lazy/src/physical_plan/state.rs
+++ b/polars/polars-lazy/src/physical_plan/state.rs
@@ -1,8 +1,9 @@
 use ahash::RandomState;
+use parking_lot::{Mutex, RwLock};
 use polars_core::frame::groupby::GroupsProxy;
 use polars_core::prelude::*;
 use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::ops::Deref;
 
 pub type JoinTuplesCache =
     Arc<Mutex<HashMap<String, Vec<(u32, Option<u32>)>, RandomState>>>;
@@ -12,6 +13,7 @@ pub type GroupsProxyCache = Arc<Mutex<HashMap<String, GroupsProxy, RandomState>>>;
 #[derive(Clone)]
 pub struct ExecutionState {
     df_cache: Arc<Mutex<HashMap<String, DataFrame, RandomState>>>,
+    pub schema_cache: Arc<RwLock<Option<SchemaRef>>>,
     /// Used by Window Expression to prevent redundant grouping
     pub(crate) group_tuples: GroupsProxyCache,
     /// Used by Window Expression to prevent redundant joins
@@ -24,6 +26,7 @@ impl ExecutionState {
     pub fn new() -> Self {
         Self {
             df_cache: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
+            schema_cache: Arc::new(RwLock::new(None)),
             group_tuples: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
             join_tuples: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
             verbose: std::env::var("POLARS_VERBOSE").is_ok(),
@@ -31,25 +34,43 @@ impl ExecutionState {
         }
     }
 
+    /// Set the schema. Typically at the start of a projection.
+    pub(crate) fn set_schema(&self, schema: SchemaRef) {
+        let mut opt = self.schema_cache.write();
+        *opt = Some(schema)
+    }
+
+    /// Clear the schema. Typically at the end of a projection.
+    pub(crate) fn clear_schema_cache(&self) {
+        let mut lock = self.schema_cache.write();
+        *lock = None;
+    }
+
+    /// Get the schema.
+    pub(crate) fn get_schema(&self) -> Option<SchemaRef> {
+        let opt = self.schema_cache.read();
+        opt.deref().clone()
+    }
+
     /// Check if we have DataFrame in cache
-    pub fn cache_hit(&self, key: &str) -> Option<DataFrame> {
-        let guard = self.df_cache.lock().unwrap();
+    pub(crate) fn cache_hit(&self, key: &str) -> Option<DataFrame> {
+        let guard = self.df_cache.lock();
         guard.get(key).cloned()
     }
 
     /// Store DataFrame in cache.
-    pub fn store_cache(&self, key: String, df: DataFrame) {
-        let mut guard = self.df_cache.lock().unwrap();
+    pub(crate) fn store_cache(&self, key: String, df: DataFrame) {
+        let mut guard = self.df_cache.lock();
         guard.insert(key, df);
     }
 
     /// Clear the cache used by the Window expressions
-    pub fn clear_expr_cache(&self) {
+    pub(crate) fn clear_expr_cache(&self) {
         {
-            let mut lock = self.group_tuples.lock().unwrap();
+            let mut lock = self.group_tuples.lock();
             lock.clear();
         }
-        let mut lock = self.join_tuples.lock().unwrap();
+        let mut lock = self.join_tuples.lock();
         lock.clear();
     }
 }
diff --git a/polars/polars-utils/Cargo.toml b/polars/polars-utils/Cargo.toml
index b8ff732d91ed..c8568738eb82 100644
--- a/polars/polars-utils/Cargo.toml
+++ b/polars/polars-utils/Cargo.toml
@@ -9,7 +9,7 @@ description = "private utils for the polars dataframe library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-parking_lot = "0.11"
+parking_lot = "0.12"
 rayon = "1.5"
 
 [features]
diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock
index 0017e5153571..2eb80e8a5700 100644
--- a/py-polars/Cargo.lock
+++ b/py-polars/Cargo.lock
@@ -1364,6 +1364,7 @@ version = "0.19.1"
 dependencies = [
  "ahash",
  "glob",
+ "parking_lot 0.12.0",
  "polars-arrow",
  "polars-core",
  "polars-io",
@@ -1386,7 +1387,7 @@ dependencies = [
 name = "polars-utils"
 version = "0.1.0"
 dependencies = [
- "parking_lot 0.11.2",
+ "parking_lot 0.12.0",
  "rayon",
 ]
 
From 6a79b5fc9ec8505bc67c421869bb6ac9c87e3aef Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Mon, 28 Feb 2022 15:29:01 +0100
Subject: [PATCH 2/4] don't allow std::collections::HashMap

---
 polars/clippy.toml                            |  1 +
 polars/polars-core/src/frame/hash_join/mod.rs |  4 ++--
 polars/polars-core/src/frame/mod.rs           | 23 ++++++++-----------
 polars/polars-lazy/src/logical_plan/alp.rs    | 11 ++++-----
 .../polars-lazy/src/logical_plan/builder.rs   |  9 ++++----
 .../polars-lazy/src/physical_plan/planner.rs  |  8 ++-----
 polars/polars-lazy/src/physical_plan/state.rs | 15 +++++-------
 polars/polars-lazy/src/utils.rs               |  4 +---
 8 files changed, 29 insertions(+), 46 deletions(-)
 create mode 100644 polars/clippy.toml

diff --git a/polars/clippy.toml b/polars/clippy.toml
new file mode 100644
index 000000000000..ad9bd114bed6
--- /dev/null
+++ b/polars/clippy.toml
@@ -0,0 +1 @@
+disallowed-types = ["std::collections::HashMap", "std::collections::HashSet"]
diff --git a/polars/polars-core/src/frame/hash_join/mod.rs b/polars/polars-core/src/frame/hash_join/mod.rs
index 2ac73b3fd5b8..db3e50ca92b1 100644
--- a/polars/polars-core/src/frame/hash_join/mod.rs
+++ b/polars/polars-core/src/frame/hash_join/mod.rs
@@ -16,7 +16,6 @@ use ahash::RandomState;
 use hashbrown::hash_map::{Entry, RawEntryMut};
 use hashbrown::HashMap;
 use rayon::prelude::*;
-use std::collections::HashSet;
 use std::fmt::Debug;
 use std::hash::{BuildHasher, Hash, Hasher};
 
@@ -1017,7 +1016,7 @@ impl DataFrame {
         mut df_right: DataFrame,
         suffix: Option<String>,
     ) -> Result<DataFrame> {
-        let mut left_names = HashSet::with_capacity_and_hasher(df_left.width(), RandomState::new());
+        let mut left_names = PlHashSet::with_capacity(df_left.width());
 
         df_left.columns.iter().for_each(|series| {
             left_names.insert(series.name());
@@ -1036,6 +1035,7 @@ impl DataFrame {
             df_right.rename(&name, &format!("{}{}", name, suffix))?;
         }
 
+        drop(left_names);
         df_left.hstack_mut(&df_right.columns)?;
         Ok(df_left)
     }
diff --git a/polars/polars-core/src/frame/mod.rs b/polars/polars-core/src/frame/mod.rs
index 5d3b682cbc36..4e49e9a7e67d 100644
--- a/polars/polars-core/src/frame/mod.rs
+++ b/polars/polars-core/src/frame/mod.rs
@@ -1,6 +1,5 @@
 //! DataFrame module.
 use std::borrow::Cow;
-use std::collections::HashSet;
 use std::iter::{FromIterator, Iterator};
 use std::mem;
 use std::ops;
@@ -33,7 +32,6 @@ use crate::vector_hasher::boost_hash_combine;
 #[cfg(feature = "row_hash")]
 use crate::vector_hasher::df_rows_to_hashes_threaded;
 use crate::POOL;
-use hashbrown::HashMap;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 use std::hash::{BuildHasher, Hash, Hasher};
@@ -155,14 +153,6 @@ impl DataFrame {
         }
     }
 
-    fn hash_names(&self) -> HashSet<String, RandomState> {
-        let mut set = HashSet::with_capacity_and_hasher(self.columns.len(), RandomState::default());
-        for s in &self.columns {
-            set.insert(s.name().to_string());
-        }
-        set
-    }
-
     /// Create a DataFrame from a Vector of Series.
     ///
     /// # Example
@@ -673,7 +663,11 @@ impl DataFrame {
     /// }
     /// ```
     pub fn hstack_mut(&mut self, columns: &[Series]) -> Result<&mut Self> {
-        let mut names = self.hash_names();
+        let mut names = PlHashSet::with_capacity(self.columns.len());
+        for s in &self.columns {
+            names.insert(s.name());
+        }
+
         let height = self.height();
         // first loop check validity. We don't do this in a single pass otherwise
         // this DataFrame is already modified when an error occurs.
@@ -693,8 +687,9 @@ impl DataFrame {
                     .into(),
                 ));
             }
-            names.insert(name.to_string());
+            names.insert(name);
         }
+        drop(names);
         Ok(self.hstack_mut_no_checks(columns))
     }
 
@@ -1280,10 +1275,10 @@ impl DataFrame {
 
     /// A non generic implementation to reduce compiler bloat.
     fn select_series_impl(&self, cols: &[String]) -> Result<Vec<Series>> {
-        let selected = if cols.len() > 1 && self.columns.len() > 300 {
+        let selected = if cols.len() > 1 && self.columns.len() > 10 {
             // we hash, because there are users that have millions of columns.
             // # https://github.com/pola-rs/polars/issues/1023
-            let name_to_idx: HashMap<&str, usize> = self
+            let name_to_idx: PlHashMap<&str, usize> = self
                 .columns
                 .iter()
                 .enumerate()
diff --git a/polars/polars-lazy/src/logical_plan/alp.rs b/polars/polars-lazy/src/logical_plan/alp.rs
index 735b7f49e375..2f5dbe336c72 100644
--- a/polars/polars-lazy/src/logical_plan/alp.rs
+++ b/polars/polars-lazy/src/logical_plan/alp.rs
@@ -5,10 +5,8 @@ use crate::logical_plan::ParquetOptions;
 use crate::logical_plan::{det_melt_schema, Context, CsvParserOptions};
 use crate::prelude::*;
 use crate::utils::{aexprs_to_schema, PushNode};
-use ahash::RandomState;
 use polars_core::prelude::*;
 use polars_utils::arena::{Arena, Node};
-use std::collections::HashSet;
 #[cfg(any(feature = "csv-file", feature = "parquet"))]
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -669,10 +667,8 @@ impl<'a> ALogicalPlanBuilder<'a> {
         let schema_right = self.lp_arena.get(other).schema(self.lp_arena);
 
         // column names of left table
-        let mut names: HashSet<&str, RandomState> = HashSet::with_capacity_and_hasher(
-            schema_left.len() + schema_right.len(),
-            Default::default(),
-        );
+        let mut names: PlHashSet<&str> =
+            PlHashSet::with_capacity(schema_left.len() + schema_right.len());
         let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());
 
         for (name, dtype) in schema_left.iter() {
@@ -680,7 +676,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
             new_schema.with_column(name.to_string(), dtype.clone())
         }
 
-        let right_names: HashSet<_, RandomState> = right_on
+        let right_names: PlHashSet<_> = right_on
             .iter()
             .map(|e| match self.expr_arena.get(*e) {
                 AExpr::Alias(_, name) => name.clone(),
@@ -710,6 +706,7 @@ impl<'a> ALogicalPlanBuilder<'a> {
             right_on,
             options,
         };
+        drop(names);
         let root = self.lp_arena.add(lp);
         Self::new(root, self.expr_arena, self.lp_arena)
     }
diff --git a/polars/polars-lazy/src/logical_plan/builder.rs b/polars/polars-lazy/src/logical_plan/builder.rs
index 95651b2d6ebf..6462bea9ead9 100644
--- a/polars/polars-lazy/src/logical_plan/builder.rs
+++ b/polars/polars-lazy/src/logical_plan/builder.rs
@@ -2,7 +2,6 @@ use crate::logical_plan::projection::rewrite_projections;
 use crate::prelude::*;
 use crate::utils;
 use crate::utils::{combine_predicates_expr, has_expr};
-use ahash::RandomState;
 use polars_core::prelude::*;
 use polars_io::csv::CsvEncoding;
 #[cfg(feature = "csv-file")]
@@ -17,7 +16,6 @@ use polars_io::{
     csv::NullValues,
     csv_core::utils::{get_reader_bytes, is_compressed},
 };
-use std::collections::HashSet;
 use std::io::{Read, Seek, SeekFrom};
 use std::path::PathBuf;
 
@@ -378,7 +376,7 @@ impl LogicalPlanBuilder {
         let schema_right = other.schema();
 
         // column names of left table
-        let mut names: HashSet<&String, RandomState> = HashSet::default();
+        let mut names: PlHashSet<&str> = PlHashSet::default();
         let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());
 
         for (name, dtype) in schema_left.iter() {
@@ -386,14 +384,14 @@ impl LogicalPlanBuilder {
             new_schema.with_column(name.to_string(), dtype.clone())
         }
 
-        let right_names: HashSet<_, RandomState> = right_on
+        let right_names: PlHashSet<_> = right_on
             .iter()
             .map(|e| utils::expr_output_name(e).expect("could not find name"))
             .collect();
 
         for (name, dtype) in schema_right.iter() {
             if !right_names.iter().any(|s| s.as_ref() == name) {
                if names.contains(name) {
-                if names.contains(name) {
+                if names.contains(&**name) {
                     let new_name = format!("{}{}", name, options.suffix.as_ref());
                     new_schema.with_column(new_name, dtype.clone())
                 } else {
@@ -404,6 +402,7 @@ impl LogicalPlanBuilder {
         let schema = Arc::new(new_schema);
 
+        drop(names);
         LogicalPlan::Join {
             input_left: Box::new(self.0),
             input_right: Box::new(other),
diff --git a/polars/polars-lazy/src/physical_plan/planner.rs b/polars/polars-lazy/src/physical_plan/planner.rs
index d07488a14d22..c259b4bd0342 100644
--- a/polars/polars-lazy/src/physical_plan/planner.rs
+++ b/polars/polars-lazy/src/physical_plan/planner.rs
@@ -13,12 +13,10 @@ use crate::{
     logical_plan::iterator::ArenaExprIter,
     utils::{aexpr_to_root_names, aexpr_to_root_nodes, agg_source_paths, has_aexpr},
 };
-use ahash::RandomState;
 use polars_core::prelude::*;
 use polars_core::{frame::groupby::GroupByMethod, utils::parallel_op_series};
 #[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
 use polars_io::aggregations::ScanAggregation;
-use std::collections::HashSet;
 use std::sync::Arc;
 
 #[cfg(any(feature = "parquet", feature = "csv-file"))]
@@ -430,11 +428,9 @@ impl DefaultPlanner {
                     // check if two DataFrames come from a separate source.
                     // If they don't we can parallelize,
                     // Otherwise it is in cache.
-                    let mut sources_left =
-                        HashSet::with_capacity_and_hasher(16, RandomState::default());
+                    let mut sources_left = PlHashSet::with_capacity(16);
                     agg_source_paths(input_left, &mut sources_left, lp_arena);
-                    let mut sources_right =
-                        HashSet::with_capacity_and_hasher(16, RandomState::default());
+                    let mut sources_right = PlHashSet::with_capacity(16);
                     agg_source_paths(input_right, &mut sources_right, lp_arena);
                     sources_left.intersection(&sources_right).next().is_none()
                 } else {
diff --git a/polars/polars-lazy/src/physical_plan/state.rs b/polars/polars-lazy/src/physical_plan/state.rs
index 2def649b8480..af4c8d42d021 100644
--- a/polars/polars-lazy/src/physical_plan/state.rs
+++ b/polars/polars-lazy/src/physical_plan/state.rs
@@ -1,18 +1,15 @@
-use ahash::RandomState;
 use parking_lot::{Mutex, RwLock};
 use polars_core::frame::groupby::GroupsProxy;
 use polars_core::prelude::*;
-use std::collections::HashMap;
 use std::ops::Deref;
 
-pub type JoinTuplesCache =
-    Arc<Mutex<HashMap<String, Vec<(u32, Option<u32>)>, RandomState>>>;
-pub type GroupsProxyCache = Arc<Mutex<HashMap<String, GroupsProxy, RandomState>>>;
+pub type JoinTuplesCache = Arc<Mutex<PlHashMap<String, Vec<(u32, Option<u32>)>>>>;
+pub type GroupsProxyCache = Arc<Mutex<PlHashMap<String, GroupsProxy>>>;
 
 /// State/cache that is maintained during the execution of the physical plan.
 #[derive(Clone)]
 pub struct ExecutionState {
-    df_cache: Arc<Mutex<HashMap<String, DataFrame, RandomState>>>,
+    df_cache: Arc<Mutex<PlHashMap<String, DataFrame>>>,
     pub schema_cache: Arc<RwLock<Option<SchemaRef>>>,
     /// Used by Window Expression to prevent redundant grouping
     pub(crate) group_tuples: GroupsProxyCache,
     /// Used by Window Expression to prevent redundant joins
@@ -25,10 +22,10 @@ pub struct ExecutionState {
 impl ExecutionState {
     pub fn new() -> Self {
         Self {
-            df_cache: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
+            df_cache: Arc::new(Mutex::new(PlHashMap::default())),
             schema_cache: Arc::new(RwLock::new(None)),
-            group_tuples: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
-            join_tuples: Arc::new(Mutex::new(HashMap::with_hasher(RandomState::default()))),
+            group_tuples: Arc::new(Mutex::new(PlHashMap::default())),
+            join_tuples: Arc::new(Mutex::new(PlHashMap::default())),
             verbose: std::env::var("POLARS_VERBOSE").is_ok(),
             cache_window: true,
         }
diff --git a/polars/polars-lazy/src/utils.rs b/polars/polars-lazy/src/utils.rs
index 2e8d410a08c8..6557d680dd7d 100644
--- a/polars/polars-lazy/src/utils.rs
+++ b/polars/polars-lazy/src/utils.rs
@@ -1,9 +1,7 @@
 use crate::logical_plan::iterator::{ArenaExprIter, ArenaLpIter};
 use crate::logical_plan::Context;
 use crate::prelude::*;
-use ahash::RandomState;
 use polars_core::prelude::*;
-use std::collections::HashSet;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
@@ -235,7 +233,7 @@ pub(crate) fn expressions_to_schema(expr: &[Expr], schema: &Schema, ctxt: Context)
 /// Get a set of the data source paths in this LogicalPlan
 pub(crate) fn agg_source_paths(
     root_lp: Node,
-    paths: &mut HashSet<PathBuf, RandomState>,
+    paths: &mut PlHashSet<PathBuf>,
     lp_arena: &Arena<ALogicalPlan>,
 ) {
     lp_arena.iter(root_lp).for_each(|(_, lp)| {

From 99086b3f66f8087869c81b05c1d77c90fa8ced6a Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Mon, 28 Feb 2022 16:17:45 +0100
Subject: [PATCH 3/4] set schema in groupby

---
 .../polars-lazy/src/physical_plan/executors/groupby.rs  | 8 ++++++++
 .../src/physical_plan/executors/groupby_dynamic.rs      | 2 ++
 .../src/physical_plan/executors/groupby_rolling.rs      | 2 ++
 polars/polars-lazy/src/physical_plan/executors/mod.rs   | 4 +---
 polars/polars-lazy/src/physical_plan/executors/stack.rs | 2 +-
 polars/polars-lazy/src/physical_plan/state.rs           | 9 ++++++---
 6 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby.rs b/polars/polars-lazy/src/physical_plan/executors/groupby.rs
index 310f331761d0..5b233632e882 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby.rs
@@ -44,6 +44,7 @@ fn groupby_helper(
     let gb = df.groupby_with_series(keys, true, maintain_order)?;
 
     if let Some(f) = apply {
+        state.clear_schema_cache();
         return gb.apply(|df| f.call_udf(df));
     }
 
@@ -74,6 +75,7 @@ fn groupby_helper(
     let agg_columns = agg_columns?;
 
     columns.extend(agg_columns.into_iter().flatten());
+    state.clear_schema_cache();
     DataFrame::new(columns)
 }
 
@@ -83,6 +85,7 @@ impl Executor for GroupByExec {
             eprintln!("aggregates are not partitionable: running default HASH AGGREGATION")
         }
         let df = self.input.execute(state)?;
+        state.set_schema(&df, self.aggs.len() + self.keys.len());
         let keys = self
             .keys
             .iter()
@@ -215,6 +218,7 @@ fn sample_cardinality(key: &Series, sample_size: usize) -> f32 {
 impl Executor for PartitionGroupByExec {
     fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
         let original_df = self.input.execute(state)?;
+        state.set_schema(&original_df, self.aggs.len() + 1);
 
         // already get the keys. This is the very last moment we decide which groupby method to use
         // If the column is a categorical, we know the number of groups we have and can decide to continue
@@ -289,6 +293,9 @@ impl Executor for PartitionGroupByExec {
         // MERGE phase
         // merge and hash aggregate again
         let df = accumulate_dataframes_vertical(dfs)?;
+        // the partitioned groupby has added columns so we must update the schema.
+        state.clear_schema_cache();
+        state.set_schema(&df, self.aggs.len() + 1);
         let key = self.key.evaluate(&df, state)?;
 
         // first get mutable access and optionally sort
@@ -321,6 +328,7 @@ impl Executor for PartitionGroupByExec {
             POOL.install(|| rayon::join(get_columns, get_agg));
 
         columns.extend(agg_columns);
+        state.clear_schema_cache();
 
         let df = DataFrame::new_no_checks(columns);
         Ok(df)
diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
index 17061a0e3a2e..1034bf7a4d0e 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
@@ -17,6 +17,7 @@ impl Executor for GroupByDynamicExec {
         #[cfg(feature = "dynamic_groupby")]
         {
             let df = self.input.execute(state)?;
+            state.set_schema(&df, self.keys.len() + self.aggs.len());
             let keys = self
                 .keys
                 .iter()
@@ -44,6 +45,7 @@ impl Executor for GroupByDynamicExec {
                     .collect::<Result<Vec<_>>>()
             })?;
 
+            state.clear_schema_cache();
             let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
             columns.extend(keys);
             columns.push(time_key);
diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
index 257844feceaf..9aac9d8534ef 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
@@ -14,6 +14,7 @@ impl Executor for GroupByRollingExec {
         #[cfg(feature = "dynamic_groupby")]
         {
             let df = self.input.execute(state)?;
+            state.set_schema(&df, self.aggs.len());
 
             let (time_key, groups) = df.groupby_rolling(&self.options)?;
 
@@ -36,6 +37,7 @@ impl Executor for GroupByRollingExec {
                    .collect::<Result<Vec<_>>>()
             })?;
 
+            state.clear_schema_cache();
             let mut columns = Vec::with_capacity(agg_columns.len() + 1);
             columns.push(time_key);
             columns.extend(agg_columns.into_iter().flatten());
diff --git a/polars/polars-lazy/src/physical_plan/executors/mod.rs b/polars/polars-lazy/src/physical_plan/executors/mod.rs
index f44a4d0ca4d9..2431302b8ddf 100644
--- a/polars/polars-lazy/src/physical_plan/executors/mod.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/mod.rs
@@ -132,6 +132,9 @@ pub(crate) fn evaluate_physical_expressions(
     state: &ExecutionState,
     has_windows: bool,
 ) -> Result<DataFrame> {
-    if exprs.len() > 1 && df.get_columns().len() > 10 {
-        state.set_schema(Arc::new(df.schema()));
-    }
+    state.set_schema(df, exprs.len());
     let zero_length = df.height() == 0;
     let selected_columns = if has_windows {
         execute_projection_cached_window_fns(df, exprs, state)?
diff --git a/polars/polars-lazy/src/physical_plan/executors/stack.rs b/polars/polars-lazy/src/physical_plan/executors/stack.rs
index ce25dfe5fcbb..e91a3b97b8d2 100644
--- a/polars/polars-lazy/src/physical_plan/executors/stack.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/stack.rs
@@ -14,7 +14,7 @@ impl Executor for StackExec {
     fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
         let mut df = self.input.execute(state)?;
 
-        state.set_schema(Arc::new(df.schema()));
+        state.set_schema(&df, self.expr.len());
         let res = if self.has_windows {
             // we have a different run here
             // to ensure the window functions run sequentially and share caches
diff --git a/polars/polars-lazy/src/physical_plan/state.rs b/polars/polars-lazy/src/physical_plan/state.rs
index af4c8d42d021..0091b8afc1fb 100644
--- a/polars/polars-lazy/src/physical_plan/state.rs
+++ b/polars/polars-lazy/src/physical_plan/state.rs
@@ -32,9 +32,12 @@ impl ExecutionState {
     }
 
     /// Set the schema. Typically at the start of a projection.
-    pub(crate) fn set_schema(&self, schema: SchemaRef) {
-        let mut opt = self.schema_cache.write();
-        *opt = Some(schema)
+    pub(crate) fn set_schema(&self, df: &DataFrame, exprs_len: usize) {
+        if exprs_len > 1 && df.get_columns().len() > 10 {
+            let schema = Arc::new(df.schema());
+            let mut opt = self.schema_cache.write();
+            *opt = Some(schema)
+        }
     }
 
     /// Clear the schema. Typically at the end of a projection.

From 81d764191d05756a6419086adfd02daafd197b86 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Mon, 28 Feb 2022 16:53:15 +0100
Subject: [PATCH 4/4] use schema when known

---
 .../src/physical_plan/executors/groupby.rs           |  8 ++++----
 .../src/physical_plan/executors/groupby_dynamic.rs   |  3 ++-
 .../src/physical_plan/executors/groupby_rolling.rs   |  3 ++-
 .../polars-lazy/src/physical_plan/executors/mod.rs   |  1 -
 .../src/physical_plan/executors/projection.rs        |  2 ++
 .../src/physical_plan/executors/scan.rs              |  2 ++
 .../src/physical_plan/executors/stack.rs             |  3 ++-
 polars/polars-lazy/src/physical_plan/planner.rs      | 14 ++++++++++++--
 polars/polars-lazy/src/physical_plan/state.rs        |  9 ++++++---
 polars/tests/it/io/csv.rs                            |  1 +
 polars/tests/it/schema.rs                            |  2 +-
 11 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby.rs b/polars/polars-lazy/src/physical_plan/executors/groupby.rs
index 5b233632e882..abff13a36cd1 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby.rs
@@ -13,6 +13,7 @@ pub struct GroupByExec {
     aggs: Vec<Arc<dyn PhysicalExpr>>,
     apply: Option<Arc<dyn DataFrameUdf>>,
     maintain_order: bool,
+    input_schema: SchemaRef,
 }
 
 impl GroupByExec {
@@ -22,6 +23,7 @@ impl GroupByExec {
         aggs: Vec<Arc<dyn PhysicalExpr>>,
         apply: Option<Arc<dyn DataFrameUdf>>,
         maintain_order: bool,
+        input_schema: SchemaRef,
     ) -> Self {
         Self {
             input,
@@ -29,6 +31,7 @@ impl GroupByExec {
             aggs,
             apply,
             maintain_order,
+            input_schema,
         }
     }
 }
@@ -85,7 +88,7 @@ impl Executor for GroupByExec {
             eprintln!("aggregates are not partitionable: running default HASH AGGREGATION")
         }
         let df = self.input.execute(state)?;
-        state.set_schema(&df, self.aggs.len() + self.keys.len());
+        state.set_schema(self.input_schema.clone());
         let keys = self
             .keys
             .iter()
@@ -218,7 +221,6 @@ fn sample_cardinality(key: &Series, sample_size: usize) -> f32 {
 impl Executor for PartitionGroupByExec {
     fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
         let original_df = self.input.execute(state)?;
-        state.set_schema(&original_df, self.aggs.len() + 1);
 
         // already get the keys. This is the very last moment we decide which groupby method to use
         // If the column is a categorical, we know the number of groups we have and can decide to continue
@@ -294,8 +296,6 @@ impl Executor for PartitionGroupByExec {
         // merge and hash aggregate again
         let df = accumulate_dataframes_vertical(dfs)?;
         // the partitioned groupby has added columns so we must update the schema.
-        state.clear_schema_cache();
-        state.set_schema(&df, self.aggs.len() + 1);
         let key = self.key.evaluate(&df, state)?;
 
         // first get mutable access and optionally sort
diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
index 1034bf7a4d0e..9d993385ace0 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby_dynamic.rs
@@ -10,6 +10,7 @@ pub(crate) struct GroupByDynamicExec {
     pub(crate) keys: Vec<Arc<dyn PhysicalExpr>>,
     pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
     pub(crate) options: DynamicGroupOptions,
+    pub(crate) input_schema: SchemaRef,
 }
 
 impl Executor for GroupByDynamicExec {
@@ -17,7 +18,7 @@ impl Executor for GroupByDynamicExec {
         #[cfg(feature = "dynamic_groupby")]
         {
             let df = self.input.execute(state)?;
-            state.set_schema(&df, self.keys.len() + self.aggs.len());
+            state.set_schema(self.input_schema.clone());
             let keys = self
                 .keys
                 .iter()
diff --git a/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs b/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
index 9aac9d8534ef..1bdda6c373bd 100644
--- a/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/groupby_rolling.rs
@@ -7,6 +7,7 @@ pub(crate) struct GroupByRollingExec {
     pub(crate) input: Box<dyn Executor>,
     pub(crate) aggs: Vec<Arc<dyn PhysicalExpr>>,
     pub(crate) options: RollingGroupOptions,
+    pub(crate) input_schema: SchemaRef,
 }
 
 impl Executor for GroupByRollingExec {
@@ -14,7 +15,7 @@ impl Executor for GroupByRollingExec {
         #[cfg(feature = "dynamic_groupby")]
         {
             let df = self.input.execute(state)?;
-            state.set_schema(&df, self.aggs.len());
+            state.set_schema(self.input_schema.clone());
 
             let (time_key, groups) = df.groupby_rolling(&self.options)?;
 
diff --git a/polars/polars-lazy/src/physical_plan/executors/mod.rs b/polars/polars-lazy/src/physical_plan/executors/mod.rs
index 2431302b8ddf..cb7127382946 100644
--- a/polars/polars-lazy/src/physical_plan/executors/mod.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/mod.rs
@@ -132,7 +132,6 @@ pub(crate) fn evaluate_physical_expressions(
     state: &ExecutionState,
     has_windows: bool,
 ) -> Result<DataFrame> {
-    state.set_schema(df, exprs.len());
     let zero_length = df.height() == 0;
     let selected_columns = if has_windows {
         execute_projection_cached_window_fns(df, exprs, state)?
diff --git a/polars/polars-lazy/src/physical_plan/executors/projection.rs b/polars/polars-lazy/src/physical_plan/executors/projection.rs
index 385c46f5a0e2..4ab17edd753d 100644
--- a/polars/polars-lazy/src/physical_plan/executors/projection.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/projection.rs
@@ -9,6 +9,7 @@ pub struct ProjectionExec {
     pub(crate) input: Box<dyn Executor>,
     pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
     pub(crate) has_windows: bool,
+    pub(crate) input_schema: SchemaRef,
     #[cfg(test)]
     pub(crate) schema: SchemaRef,
 }
@@ -16,6 +17,7 @@ pub struct ProjectionExec {
 impl Executor for ProjectionExec {
     fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
         let df = self.input.execute(state)?;
+        state.set_schema(self.input_schema.clone());
 
         let df = evaluate_physical_expressions(&df, &self.expr, state, self.has_windows);
 
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan.rs b/polars/polars-lazy/src/physical_plan/executors/scan.rs
index c90d76fedd87..dcbf7748c68f 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan.rs
@@ -264,6 +264,7 @@ impl Executor for DataFrameExec {
         // projection should be before selection as those are free
         // TODO: this is only the case if we don't create new columns
         if let Some(projection) = &self.projection {
+            state.may_set_schema(&df, projection.len());
            df = evaluate_physical_expressions(&df, projection, state, self.has_windows)?;
         }
 
@@ -274,6 +275,7 @@ impl Executor for DataFrameExec {
             })?;
             df = df.filter(mask)?;
         }
+        state.clear_schema_cache();
 
         if let Some(limit) = set_n_rows(None) {
             Ok(df.head(Some(limit)))
diff --git a/polars/polars-lazy/src/physical_plan/executors/stack.rs b/polars/polars-lazy/src/physical_plan/executors/stack.rs
index e91a3b97b8d2..3dbdaa09efd6 100644
--- a/polars/polars-lazy/src/physical_plan/executors/stack.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/stack.rs
@@ -8,13 +8,14 @@ pub struct StackExec {
     pub(crate) input: Box<dyn Executor>,
     pub(crate) has_windows: bool,
     pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
+    pub(crate) input_schema: SchemaRef,
 }
 
 impl Executor for StackExec {
     fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
         let mut df = self.input.execute(state)?;
 
-        state.set_schema(&df, self.expr.len());
+        state.set_schema(self.input_schema.clone());
         let res = if self.has_windows {
             // we have a different run here
             // to ensure the window functions run sequentially and share caches
diff --git a/polars/polars-lazy/src/physical_plan/planner.rs b/polars/polars-lazy/src/physical_plan/planner.rs
index c259b4bd0342..6e8534e70226 100644
--- a/polars/polars-lazy/src/physical_plan/planner.rs
+++ b/polars/polars-lazy/src/physical_plan/planner.rs
@@ -195,6 +195,7 @@ impl DefaultPlanner {
                 schema: _schema,
                 ..
             } => {
+                let input_schema = lp_arena.get(input).schema(lp_arena).clone();
                 let has_windows = expr.iter().any(|node| has_window_aexpr(*node, expr_arena));
                 let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
                 let phys_expr =
@@ -203,6 +204,7 @@ impl DefaultPlanner {
                     input,
                     expr: phys_expr,
                     has_windows,
+                    input_schema,
                     #[cfg(test)]
                     schema: _schema,
                 }))
@@ -210,9 +212,12 @@ impl DefaultPlanner {
             LocalProjection {
                 expr,
                 input,
-                schema: _schema,
+                #[cfg(test)]
+                schema: _schema,
                 ..
             } => {
+                let input_schema = lp_arena.get(input).schema(lp_arena).clone();
+
                 let has_windows = expr.iter().any(|node| has_window_aexpr(*node, expr_arena));
                 let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
                 let phys_expr =
@@ -221,6 +226,7 @@ impl DefaultPlanner {
                     input,
                     expr: phys_expr,
                     has_windows,
+                    input_schema,
                     #[cfg(test)]
                     schema: _schema,
                 }))
@@ -301,7 +307,6 @@ impl DefaultPlanner {
                 maintain_order,
                 options,
             } => {
-                #[cfg(feature = "object")]
                 let input_schema = lp_arena.get(input).schema(lp_arena).clone();
 
                 let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
@@ -317,6 +322,7 @@ impl DefaultPlanner {
                         keys: phys_keys,
                         aggs: phys_aggs,
                         options,
+                        input_schema,
                     }));
                 }
 
@@ -325,6 +331,7 @@ impl DefaultPlanner {
                         input,
                         aggs: phys_aggs,
                         options,
+                        input_schema,
                     }));
                 }
 
@@ -411,6 +418,7 @@ impl DefaultPlanner {
                     phys_aggs,
                     apply,
                     maintain_order,
+                    input_schema,
                 )))
             }
         }
@@ -454,6 +462,7 @@ impl DefaultPlanner {
                 )))
             }
             HStack { input, exprs, .. } => {
+                let input_schema = lp_arena.get(input).schema(lp_arena).clone();
                 let has_windows = exprs.iter().any(|node| has_window_aexpr(*node, expr_arena));
                 let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
                 let phys_expr =
@@ -462,6 +471,7 @@ impl DefaultPlanner {
                     input,
                     has_windows,
                     expr: phys_expr,
+                    input_schema,
                 }))
             }
             Udf {
diff --git a/polars/polars-lazy/src/physical_plan/state.rs b/polars/polars-lazy/src/physical_plan/state.rs
index 0091b8afc1fb..3c9881866419 100644
--- a/polars/polars-lazy/src/physical_plan/state.rs
+++ b/polars/polars-lazy/src/physical_plan/state.rs
@@ -30,13 +30,16 @@ impl ExecutionState {
             cache_window: true,
         }
     }
+    pub(crate) fn set_schema(&self, schema: SchemaRef) {
+        let mut opt = self.schema_cache.write();
+        *opt = Some(schema)
+    }
 
     /// Set the schema. Typically at the start of a projection.
-    pub(crate) fn set_schema(&self, df: &DataFrame, exprs_len: usize) {
+    pub(crate) fn may_set_schema(&self, df: &DataFrame, exprs_len: usize) {
         if exprs_len > 1 && df.get_columns().len() > 10 {
             let schema = Arc::new(df.schema());
-            let mut opt = self.schema_cache.write();
-            *opt = Some(schema)
+            self.set_schema(schema);
         }
     }
 
diff --git a/polars/tests/it/io/csv.rs b/polars/tests/it/io/csv.rs
index 82ed2c28ac89..38c7fa7f61d0 100644
--- a/polars/tests/it/io/csv.rs
+++ b/polars/tests/it/io/csv.rs
@@ -146,6 +146,7 @@ fn test_tab_sep() {
         .with_ignore_parser_errors(true)
         .finish()
         .unwrap();
+    assert_eq!(df.shape(), (8, 26))
 }
 
 #[test]
diff --git a/polars/tests/it/schema.rs b/polars/tests/it/schema.rs
index d22670a4f852..c14d76000c4a 100644
--- a/polars/tests/it/schema.rs
+++ b/polars/tests/it/schema.rs
@@ -9,7 +9,7 @@ fn test_schema_rename() {
         Field::new("c", Int8),
     ]);
     schema.rename("a", "anton".to_string()).unwrap();
-    let mut expected = Schema::from([
+    let expected = Schema::from([
         Field::new("anton", UInt64),
         Field::new("b", Int32),
         Field::new("c", Int8),
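
The idea this series converges on is: cache the projection's input schema once in the shared ExecutionState, let every column expression resolve its column by index instead of a linear scan over column names, and clear the cache when the projection ends so later plan nodes never see a stale schema. The following is a minimal standalone sketch of that pattern, assuming a simplified Schema type and std's RwLock in place of parking_lot; all names here are hypothetical stand-ins, not the Polars API.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for polars' Schema/SchemaRef: a name -> position map.
struct Schema {
    index: HashMap<String, usize>,
}
type SchemaRef = Arc<Schema>;

// Shared state cloned into every executor, mirroring the patch's schema_cache field.
#[derive(Default, Clone)]
struct ExecutionState {
    schema_cache: Arc<RwLock<Option<SchemaRef>>>,
}

impl ExecutionState {
    // Set at the start of a projection (the patches use parking_lot's RwLock,
    // which has no poisoning, hence no unwrap there).
    fn set_schema(&self, schema: SchemaRef) {
        *self.schema_cache.write().unwrap() = Some(schema);
    }
    // Cleared at the end of the projection so later nodes never see a stale schema.
    fn clear_schema_cache(&self) {
        *self.schema_cache.write().unwrap() = None;
    }
    fn get_schema(&self) -> Option<SchemaRef> {
        self.schema_cache.read().unwrap().clone()
    }
}

// Column lookup: O(1) via the cached schema, linear name scan as the fallback.
fn resolve(cols: &[(String, Vec<i64>)], name: &str, state: &ExecutionState) -> Option<usize> {
    match state.get_schema() {
        Some(schema) => schema.index.get(name).copied(),
        None => cols.iter().position(|(n, _)| n == name),
    }
}

fn main() {
    let cols: Vec<(String, Vec<i64>)> = vec![
        ("a".to_string(), vec![1, 2]),
        ("b".to_string(), vec![3, 4]),
    ];
    let state = ExecutionState::default();

    // Projection start: build and cache the schema once.
    let index = cols.iter().enumerate().map(|(i, (n, _))| (n.clone(), i)).collect();
    state.set_schema(Arc::new(Schema { index }));

    // Every column expression in the projection now resolves by index.
    assert_eq!(resolve(&cols, "b", &state), Some(1));

    // Projection end: clear the cache.
    state.clear_schema_cache();
}

Note how patch 4 refines this further: instead of rebuilding the schema from the materialized DataFrame at execution time, the planner passes the statically known input_schema of each plan node down to the executor, making the cache free to populate.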