Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of projection #1028

Merged
merged 2 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::prelude::sort::prepare_argsort;
#[cfg(feature = "row_hash")]
use crate::vector_hasher::df_rows_to_hashes_threaded;
use crate::POOL;
use hashbrown::HashMap;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -581,10 +582,34 @@ impl DataFrame {
S: Selection<'a, J>,
{
let cols = selection.to_selection_vec();
let selected = cols
.iter()
.map(|c| self.column(c).map(|s| s.clone()))
.collect::<Result<Vec<_>>>()?;
self.select_series_impl(&cols)
}

/// A non generic implementation to reduce compiler bloat.
fn select_series_impl(&self, cols: &[&str]) -> Result<Vec<Series>> {
let selected = if cols.len() > 1 && self.columns.len() > 300 {
// we hash, because there are user that having millions of columns.
// # https://github.com/pola-rs/polars/issues/1023
let name_to_idx: HashMap<&str, usize> = self
.columns
.iter()
.enumerate()
.map(|(i, s)| (s.name(), i))
.collect();
cols.iter()
.map(|&name| {
let idx = *name_to_idx
.get(name)
.ok_or_else(|| PolarsError::NotFound(name.into()))?;
Ok(self.select_at_idx(idx).unwrap().clone())
})
.collect::<Result<Vec<_>>>()?
} else {
cols.iter()
.map(|c| self.column(c).map(|s| s.clone()))
.collect::<Result<Vec<_>>>()?
};

Ok(selected)
}

Expand Down
31 changes: 28 additions & 3 deletions polars/polars-core/src/frame/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,44 @@ impl<'a> Selection<'a, &str> for Vec<&'a str> {
}
}

/// Similar to AsRef
/// Needed to go from Arc<String> to &str
pub trait AsRefPolars<T: ?Sized> {
/// Performs the conversion.
fn as_ref_p(&self) -> &T;
}

impl AsRefPolars<str> for std::sync::Arc<String> {
fn as_ref_p(&self) -> &str {
&**self
}
}

impl AsRefPolars<str> for String {
fn as_ref_p(&self) -> &str {
&**self
}
}

impl AsRefPolars<str> for &str {
fn as_ref_p(&self) -> &str {
self
}
}

impl<'a, T, S: 'a> Selection<'a, S> for &'a T
where
T: AsRef<[S]>,
S: AsRef<str>,
S: AsRefPolars<str>,
{
fn to_selection_vec(self) -> Vec<&'a str> {
self.as_ref().iter().map(|s| s.as_ref()).collect()
self.as_ref().iter().map(|s| s.as_ref_p()).collect()
}

fn single(&self) -> Option<&str> {
let a = self.as_ref();
match a.len() {
1 => Some(a[0].as_ref()),
1 => Some(a[0].as_ref_p()),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::logical_plan::optimizer::{
use crate::physical_plan::state::ExecutionState;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::fast_projection::FastProjection;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::utils::combine_predicates_expr;
use crate::{logical_plan::FETCH_ROWS, prelude::*};
Expand Down Expand Up @@ -538,6 +539,8 @@ impl LazyFrame {
rules.push(Box::new(opt));
}

rules.push(Box::new(FastProjection {}));

let opt = StackOptimizer {};
lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top);

Expand Down
79 changes: 79 additions & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/fast_projection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::logical_plan::alp::ALogicalPlan;
use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;
use polars_core::prelude::*;

/// Projection in the physical plan is done by selecting an expression per thread.
/// In case of many projections and columns this can be expensive when the expressions are simple
/// column selections. These can be selected on a single thread. The single thread is faster, because
/// the eager selection algorithm hashes the column names, making the projection complexity linear
/// instead of quadratic.
///
/// It is important that this optimization is ran after projection pushdown.
///
/// The schema reported after this optimization is also
pub(crate) struct FastProjection {}

fn impl_fast_projection(
input: Node,
expr: &[Node],
schema: Option<SchemaRef>,
expr_arena: &mut Arena<AExpr>,
) -> Option<ALogicalPlan> {
let mut columns = Vec::with_capacity(expr.len());
for node in expr.iter() {
if let AExpr::Column(name) = expr_arena.get(*node) {
columns.push(name.clone())
} else {
break;
}
}
if columns.len() == expr.len() {
let function = move |df: DataFrame| df.select(&columns);

let lp = ALogicalPlan::Udf {
input,
function: Arc::new(function),
predicate_pd: true,
projection_pd: true,
schema,
};

Some(lp)
} else {
None
}
}

impl OptimizationRule for FastProjection {
fn optimize_plan(
&mut self,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
node: Node,
) -> Option<ALogicalPlan> {
let lp = lp_arena.get(node);

match lp {
ALogicalPlan::Projection {
input,
expr,
schema,
..
} => {
let schema = Some(schema.clone());
impl_fast_projection(*input, expr, schema, expr_arena)
}
ALogicalPlan::LocalProjection {
input,
expr,
schema,
..
} => {
let schema = Some(schema.clone());
impl_fast_projection(*input, expr, schema, expr_arena)
}
_ => None,
}
}
}
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use polars_core::{datatypes::PlHashMap, prelude::*};
pub(crate) mod aggregate_pushdown;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
pub(crate) mod aggregate_scan_projections;
pub(crate) mod fast_projection;
#[cfg(feature = "private")]
pub(crate) mod join_pruning;
pub(crate) mod predicate_pushdown;
Expand Down