diff --git a/polars/polars-core/src/frame/mod.rs b/polars/polars-core/src/frame/mod.rs index d90ca43129e2..14d68506f9b4 100644 --- a/polars/polars-core/src/frame/mod.rs +++ b/polars/polars-core/src/frame/mod.rs @@ -35,6 +35,7 @@ use crate::prelude::sort::prepare_argsort; #[cfg(feature = "row_hash")] use crate::vector_hasher::df_rows_to_hashes_threaded; use crate::POOL; +use hashbrown::HashMap; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -581,10 +582,34 @@ impl DataFrame { S: Selection<'a, J>, { let cols = selection.to_selection_vec(); - let selected = cols - .iter() - .map(|c| self.column(c).map(|s| s.clone())) - .collect::>>()?; + self.select_series_impl(&cols) + } + + /// A non-generic implementation to reduce compiler bloat. + fn select_series_impl(&self, cols: &[&str]) -> Result> { + let selected = if cols.len() > 1 && self.columns.len() > 300 { + // we hash, because there are users that have millions of columns. + // # https://github.com/pola-rs/polars/issues/1023 + let name_to_idx: HashMap<&str, usize> = self + .columns + .iter() + .enumerate() + .map(|(i, s)| (s.name(), i)) + .collect(); + cols.iter() + .map(|&name| { + let idx = *name_to_idx + .get(name) + .ok_or_else(|| PolarsError::NotFound(name.into()))?; + Ok(self.select_at_idx(idx).unwrap().clone()) + }) + .collect::>>()? + } else { + cols.iter() + .map(|c| self.column(c).map(|s| s.clone())) + .collect::>>()? + }; + Ok(selected) } diff --git a/polars/polars-core/src/frame/select.rs b/polars/polars-core/src/frame/select.rs index c62b84f64baf..3d81212d0ad6 100644 --- a/polars/polars-core/src/frame/select.rs +++ b/polars/polars-core/src/frame/select.rs @@ -28,19 +28,44 @@ impl<'a> Selection<'a, &str> for Vec<&'a str> { } } +/// Similar to AsRef +/// Needed to go from Arc to &str +pub trait AsRefPolars { + /// Performs the conversion. 
+ fn as_ref_p(&self) -> &T; +} + +impl AsRefPolars for std::sync::Arc { + fn as_ref_p(&self) -> &str { + &**self + } +} + +impl AsRefPolars for String { + fn as_ref_p(&self) -> &str { + &**self + } +} + +impl AsRefPolars for &str { + fn as_ref_p(&self) -> &str { + self + } +} + impl<'a, T, S: 'a> Selection<'a, S> for &'a T where T: AsRef<[S]>, - S: AsRef, + S: AsRefPolars, { fn to_selection_vec(self) -> Vec<&'a str> { - self.as_ref().iter().map(|s| s.as_ref()).collect() + self.as_ref().iter().map(|s| s.as_ref_p()).collect() } fn single(&self) -> Option<&str> { let a = self.as_ref(); match a.len() { - 1 => Some(a[0].as_ref()), + 1 => Some(a[0].as_ref_p()), _ => None, } } diff --git a/polars/polars-lazy/src/frame.rs b/polars/polars-lazy/src/frame.rs index 251f12a22e42..f2b859de6033 100644 --- a/polars/polars-lazy/src/frame.rs +++ b/polars/polars-lazy/src/frame.rs @@ -17,6 +17,7 @@ use crate::logical_plan::optimizer::{ use crate::physical_plan::state::ExecutionState; #[cfg(any(feature = "parquet", feature = "csv-file"))] use crate::prelude::aggregate_scan_projections::agg_projection; +use crate::prelude::fast_projection::FastProjection; use crate::prelude::simplify_expr::SimplifyBooleanRule; use crate::utils::combine_predicates_expr; use crate::{logical_plan::FETCH_ROWS, prelude::*}; @@ -538,6 +539,8 @@ impl LazyFrame { rules.push(Box::new(opt)); } + rules.push(Box::new(FastProjection {})); + let opt = StackOptimizer {}; lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top); diff --git a/polars/polars-lazy/src/logical_plan/optimizer/fast_projection.rs b/polars/polars-lazy/src/logical_plan/optimizer/fast_projection.rs new file mode 100644 index 000000000000..cdebb1a7bf37 --- /dev/null +++ b/polars/polars-lazy/src/logical_plan/optimizer/fast_projection.rs @@ -0,0 +1,79 @@ +use crate::logical_plan::alp::ALogicalPlan; +use crate::prelude::stack_opt::OptimizationRule; +use crate::prelude::*; +use polars_core::prelude::*; + +/// Projection in the physical 
plan is done by selecting an expression per thread. +/// In case of many projections and columns this can be expensive when the expressions are simple +/// column selections. These can be selected on a single thread. The single thread is faster, because +/// the eager selection algorithm hashes the column names, making the projection complexity linear +/// instead of quadratic. +/// +/// It is important that this optimization is run after projection pushdown. +/// +/// The schema reported after this optimization is the one cloned from the matched projection node. +pub(crate) struct FastProjection {} + +fn impl_fast_projection( + input: Node, + expr: &[Node], + schema: Option, + expr_arena: &mut Arena, +) -> Option { + let mut columns = Vec::with_capacity(expr.len()); + for node in expr.iter() { + if let AExpr::Column(name) = expr_arena.get(*node) { + columns.push(name.clone()) + } else { + break; + } + } + if columns.len() == expr.len() { + let function = move |df: DataFrame| df.select(&columns); + + let lp = ALogicalPlan::Udf { + input, + function: Arc::new(function), + predicate_pd: true, + projection_pd: true, + schema, + }; + + Some(lp) + } else { + None + } +} + +impl OptimizationRule for FastProjection { + fn optimize_plan( + &mut self, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + node: Node, + ) -> Option { + let lp = lp_arena.get(node); + + match lp { + ALogicalPlan::Projection { + input, + expr, + schema, + .. + } => { + let schema = Some(schema.clone()); + impl_fast_projection(*input, expr, schema, expr_arena) + } + ALogicalPlan::LocalProjection { + input, + expr, + schema, + .. 
+ } => { + let schema = Some(schema.clone()); + impl_fast_projection(*input, expr, schema, expr_arena) + } + _ => None, + } + } +} diff --git a/polars/polars-lazy/src/logical_plan/optimizer/mod.rs b/polars/polars-lazy/src/logical_plan/optimizer/mod.rs index eaa08bfc7f14..5eca27dfa4d0 100644 --- a/polars/polars-lazy/src/logical_plan/optimizer/mod.rs +++ b/polars/polars-lazy/src/logical_plan/optimizer/mod.rs @@ -4,6 +4,7 @@ use polars_core::{datatypes::PlHashMap, prelude::*}; pub(crate) mod aggregate_pushdown; #[cfg(any(feature = "parquet", feature = "csv-file"))] pub(crate) mod aggregate_scan_projections; +pub(crate) mod fast_projection; #[cfg(feature = "private")] pub(crate) mod join_pruning; pub(crate) mod predicate_pushdown;