Skip to content

Commit

Permalink
add lazy optimizer, so that simple projections are done single threaded
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 23, 2021
1 parent 1e96226 commit 4a2c0b6
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 3 deletions.
31 changes: 28 additions & 3 deletions polars/polars-core/src/frame/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,44 @@ impl<'a> Selection<'a, &str> for Vec<&'a str> {
}
}

/// A minimal stand-in for [`AsRef`] that this crate can implement for foreign types.
///
/// It exists so that an `Arc<String>` (as well as `String` and `&str`) can be viewed
/// as a `&str`.
pub trait AsRefPolars<T: ?Sized> {
    /// Borrows `self` as a `&T`.
    fn as_ref_p(&self) -> &T;
}

impl AsRefPolars<str> for std::sync::Arc<String> {
    /// Borrows the string slice of the `String` held by the `Arc`.
    fn as_ref_p(&self) -> &str {
        // Auto-deref walks `&Arc<String>` -> `String` before calling `as_str`.
        self.as_str()
    }
}

impl AsRefPolars<str> for String {
    /// Borrows the string slice backing this `String`.
    fn as_ref_p(&self) -> &str {
        self.as_str()
    }
}

impl AsRefPolars<str> for &str {
    /// Returns the string slice itself.
    fn as_ref_p(&self) -> &str {
        // `self` is `&&str`; a single dereference yields the underlying slice.
        *self
    }
}

impl<'a, T, S: 'a> Selection<'a, S> for &'a T
where
T: AsRef<[S]>,
S: AsRef<str>,
S: AsRefPolars<str>,
{
fn to_selection_vec(self) -> Vec<&'a str> {
self.as_ref().iter().map(|s| s.as_ref()).collect()
self.as_ref().iter().map(|s| s.as_ref_p()).collect()
}

fn single(&self) -> Option<&str> {
let a = self.as_ref();
match a.len() {
1 => Some(a[0].as_ref()),
1 => Some(a[0].as_ref_p()),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::logical_plan::optimizer::{
use crate::physical_plan::state::ExecutionState;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::fast_projection::FastProjection;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::utils::combine_predicates_expr;
use crate::{logical_plan::FETCH_ROWS, prelude::*};
Expand Down Expand Up @@ -538,6 +539,8 @@ impl LazyFrame {
rules.push(Box::new(opt));
}

rules.push(Box::new(FastProjection {}));

let opt = StackOptimizer {};
lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top);

Expand Down
79 changes: 79 additions & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/fast_projection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::logical_plan::alp::ALogicalPlan;
use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;
use polars_core::prelude::*;

/// Optimization rule that replaces a projection made up solely of plain column selections
/// with a single eager `DataFrame::select` call.
///
/// Projection in the physical plan is done by selecting an expression per thread.
/// In case of many projections and columns this can be expensive when the expressions are simple
/// column selections. These can be selected on a single thread. The single thread is faster, because
/// the eager selection algorithm hashes the column names, making the projection complexity linear
/// instead of quadratic.
///
/// It is important that this optimization is run after projection pushdown.
///
/// The schema reported after this optimization is also the schema of the projection node that
/// was replaced: it is handed over unchanged to the new node.
pub(crate) struct FastProjection {}

/// Tries to turn a projection into a single eager column selection.
///
/// * `input` - node the projection reads from; becomes the input of the new node.
/// * `expr` - the projected expressions, as nodes in `expr_arena`.
/// * `schema` - schema to attach to the new node.
/// * `expr_arena` - arena used to look up each expression node.
///
/// Returns `Some(ALogicalPlan::Udf { .. })` whose function calls `df.select` with the
/// collected column names when *every* expression is an `AExpr::Column`. Returns `None`
/// as soon as any other kind of expression is found.
fn impl_fast_projection(
    input: Node,
    expr: &[Node],
    schema: Option<SchemaRef>,
    expr_arena: &mut Arena<AExpr>,
) -> Option<ALogicalPlan> {
    // Collecting into `Option<Vec<_>>` stops at the first non-column expression and
    // yields `None`, which `?` propagates to the caller.
    let columns = expr
        .iter()
        .map(|node| match expr_arena.get(*node) {
            AExpr::Column(name) => Some(name.clone()),
            _ => None,
        })
        .collect::<Option<Vec<_>>>()?;

    // The closure owns the column names so it can outlive this call.
    let function = move |df: DataFrame| df.select(&columns);

    Some(ALogicalPlan::Udf {
        input,
        function: Arc::new(function),
        predicate_pd: true,
        projection_pd: true,
        schema,
    })
}

impl OptimizationRule for FastProjection {
    /// Attempts the fast-projection rewrite on the plan stored at `node`.
    ///
    /// Only `Projection` and `LocalProjection` nodes are considered; both are handled the
    /// same way by delegating to `impl_fast_projection` with the node's input, expressions
    /// and a clone of its schema. Every other node kind yields `None`
    /// (presumably meaning "no rewrite" to the optimizer driving this rule; confirm against
    /// the `OptimizationRule` contract).
    fn optimize_plan(
        &mut self,
        lp_arena: &mut Arena<ALogicalPlan>,
        expr_arena: &mut Arena<AExpr>,
        node: Node,
    ) -> Option<ALogicalPlan> {
        match lp_arena.get(node) {
            ALogicalPlan::Projection { input, expr, schema, .. }
            | ALogicalPlan::LocalProjection { input, expr, schema, .. } => {
                impl_fast_projection(*input, expr, Some(schema.clone()), expr_arena)
            }
            _ => None,
        }
    }
}
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use polars_core::{datatypes::PlHashMap, prelude::*};
pub(crate) mod aggregate_pushdown;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
pub(crate) mod aggregate_scan_projections;
pub(crate) mod fast_projection;
#[cfg(feature = "private")]
pub(crate) mod join_pruning;
pub(crate) mod predicate_pushdown;
Expand Down

0 comments on commit 4a2c0b6

Please sign in to comment.