Skip to content

Commit

Permalink
refactor: ScalarVisitor refactor push_down_prewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Apr 11, 2024
1 parent 4c27f29 commit b81e141
Showing 1 changed file with 41 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ use crate::optimizer::rule::Rule;
use crate::optimizer::ColumnSet;
use crate::optimizer::RuleID;
use crate::optimizer::SExpr;
use crate::plans::AggregateFunction;
use crate::plans::BoundColumnRef;
use crate::plans::Filter;
use crate::plans::Prewhere;
use crate::plans::RelOp;
use crate::plans::ScalarExpr;
use crate::plans::Scan;
use crate::plans::SubqueryExpr;
use crate::plans::Visitor;
use crate::plans::WindowFunc;
use crate::IndexType;
use crate::MetadataRef;
use crate::Visibility;
Expand Down Expand Up @@ -58,44 +63,56 @@ impl RulePushDownPrewhere {
table_index: IndexType,
schema: &TableSchemaRef,
expr: &ScalarExpr,
columns: &mut ColumnSet,
) -> Result<()> {
match expr {
ScalarExpr::BoundColumnRef(column) => {
) -> Result<ColumnSet> {
struct ColumnVisitor {
table_index: IndexType,
schema: TableSchemaRef,
columns: ColumnSet,
}
impl<'a> Visitor<'a> for ColumnVisitor {
fn visit_bound_column_ref(&mut self, column: &'a BoundColumnRef) -> Result<()> {
if let Some(index) = &column.column.table_index {
if table_index == *index
if self.table_index == *index
&& (column.column.visibility == Visibility::InVisible
|| schema.index_of(column.column.column_name.as_str()).is_ok())
|| self
.schema
.index_of(column.column.column_name.as_str())
.is_ok())
{
columns.insert(column.column.index);
self.columns.insert(column.column.index);
return Ok(());
}
}
return Err(ErrorCode::Unimplemented("Column is not in the table"));
}
ScalarExpr::FunctionCall(func) => {
for arg in func.arguments.iter() {
Self::collect_columns_impl(table_index, schema, arg, columns)?;
}
}
ScalarExpr::CastExpr(cast) => {
Self::collect_columns_impl(table_index, schema, cast.argument.as_ref(), columns)?;
fn visit_window_function(&mut self, window: &'a WindowFunc) -> Result<()> {
return Err(ErrorCode::Unimplemented(format!(
"Prewhere don't support expr {:?}",
window
)));
}
ScalarExpr::ConstantExpr(_) => {}
ScalarExpr::UDFCall(udf) => {
for arg in udf.arguments.iter() {
Self::collect_columns_impl(table_index, schema, arg, columns)?;
}
fn visit_aggregate_function(&mut self, aggregate: &'a AggregateFunction) -> Result<()> {
return Err(ErrorCode::Unimplemented(format!(
"Prewhere don't support expr {:?}",
aggregate
)));
}
_ => {
// SubqueryExpr and AggregateFunction will not appear in Filter-LogicalGet
fn visit_subquery(&mut self, subquery: &'a SubqueryExpr) -> Result<()> {
return Err(ErrorCode::Unimplemented(format!(
"Prewhere don't support expr {:?}",
expr
subquery
)));
}
}
Ok(())

let mut column_visitor = ColumnVisitor {
table_index,
schema: schema.clone(),
columns: ColumnSet::new(),
};
// WindowFunc, SubqueryExpr and AggregateFunction will not appear in Filter-LogicalGet
column_visitor.visit(expr)?;
Ok(column_visitor.columns)
}

// analyze if the expression can be moved to prewhere
Expand All @@ -104,11 +121,7 @@ impl RulePushDownPrewhere {
schema: &TableSchemaRef,
expr: &ScalarExpr,
) -> Option<ColumnSet> {
let mut columns = ColumnSet::new();
// columns in subqueries are not considered
Self::collect_columns_impl(table_index, schema, expr, &mut columns).ok()?;

Some(columns)
Self::collect_columns_impl(table_index, schema, expr).ok()
}

pub fn prewhere_optimize(&self, s_expr: &SExpr) -> Result<SExpr> {
Expand Down

0 comments on commit b81e141

Please sign in to comment.