Skip to content

Commit

Permalink
First pass of apache#10296
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Apr 30, 2024
1 parent dd56837 commit d592275
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions datafusion/optimizer/src/eliminate_nested_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
//! [`EliminateNestedUnion`]: flattens nested `Union` to a single `Union`
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Result};
use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema;
use datafusion_expr::{Distinct, LogicalPlan, Union};
use std::sync::Arc;
Expand All @@ -37,49 +38,61 @@ impl EliminateNestedUnion {
impl OptimizerRule for EliminateNestedUnion {
fn try_optimize(
&self,
plan: &LogicalPlan,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called EliminateFilter::rewrite")
}

fn name(&self) -> &str {
"eliminate_nested_union"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Union(Union { inputs, schema }) => {
let inputs = inputs
.iter()
.flat_map(extract_plans_from_union)
.collect::<Vec<_>>();

Ok(Some(LogicalPlan::Union(Union {
Ok(Transformed::yes(LogicalPlan::Union(Union {
inputs,
schema: schema.clone(),
schema,
})))
}
LogicalPlan::Distinct(Distinct::All(plan)) => match plan.as_ref() {
LogicalPlan::Distinct(Distinct::All(ref nested_plan)) => match nested_plan.as_ref() {
LogicalPlan::Union(Union { inputs, schema }) => {
let inputs = inputs
.iter()
.map(extract_plan_from_distinct)
.flat_map(extract_plans_from_union)
.collect::<Vec<_>>();

Ok(Some(LogicalPlan::Distinct(Distinct::All(Arc::new(
LogicalPlan::Union(Union {
Ok(Transformed::yes(LogicalPlan::Distinct(Distinct::All(
Arc::new(LogicalPlan::Union(Union {
inputs,
schema: schema.clone(),
}),
)))))
})),
))))
}
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
},
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

fn name(&self) -> &str {
"eliminate_nested_union"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
}

fn extract_plans_from_union(plan: &Arc<LogicalPlan>) -> Vec<Arc<LogicalPlan>> {
Expand Down

0 comments on commit d592275

Please sign in to comment.