diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 0c68f1761601..a52fd40df801 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -24,6 +24,7 @@ use crate::logical_plan::plan::Projection; use crate::logical_plan::{Limit, TableScan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; +use datafusion_expr::logical_plan::Offset; use std::sync::Arc; /// Optimization rule that tries pushes down LIMIT n @@ -43,18 +44,24 @@ fn limit_push_down( upper_limit: Option, plan: &LogicalPlan, _execution_props: &ExecutionProps, + is_offset: bool, ) -> Result { match (plan, upper_limit) { (LogicalPlan::Limit(Limit { n, input }), upper_limit) => { - let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n); + let new_limit: usize = if is_offset { + *n + upper_limit.unwrap_or(0) + } else { + upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n) + }; Ok(LogicalPlan::Limit(Limit { - n: smallest, + n: new_limit, // push down limit to plan (minimum of upper limit and current limit) input: Arc::new(limit_push_down( _optimizer, - Some(smallest), + Some(new_limit), input.as_ref(), _execution_props, + false, )?), })) } @@ -95,6 +102,7 @@ fn limit_push_down( upper_limit, input.as_ref(), _execution_props, + false, )?), schema: schema.clone(), alias: alias.clone(), @@ -119,6 +127,7 @@ fn limit_push_down( Some(upper_limit), x, _execution_props, + false, )?), })) }) @@ -129,6 +138,25 @@ fn limit_push_down( schema: schema.clone(), })) } + // offset 5 limit 10 then push limit 15 (5 + 10) + // Limit should always be Offset's input + (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => { + let new_limit = if let Some(ul) = upper_limit { + ul + *offset + } else { + *offset + }; + Ok(LogicalPlan::Offset(Offset { + offset: *offset, + input: Arc::new(limit_push_down( + _optimizer, + Some(new_limit), + 
input.as_ref(), + _execution_props, + true, + )?), + })) + } // For other nodes we can't push down the limit // But try to recurse and find other limit nodes to push down _ => { @@ -138,7 +166,9 @@ fn limit_push_down( let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props)) + .map(|plan| { + limit_push_down(_optimizer, None, plan, _execution_props, false) + }) .collect::>>()?; utils::from_plan(plan, &expr, &new_inputs) @@ -152,7 +182,7 @@ impl OptimizerRule for LimitPushDown { plan: &LogicalPlan, execution_props: &ExecutionProps, ) -> Result { - limit_push_down(self, None, plan, execution_props) + limit_push_down(self, None, plan, execution_props, false) } fn name(&self) -> &str { @@ -167,6 +197,8 @@ mod test { logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder}, test::*, }; + use datafusion_expr::exists; + use datafusion_expr::logical_plan::JoinType; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = LimitPushDown::new(); @@ -278,4 +310,172 @@ mod test { Ok(()) } + + #[test] + fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .offset(10)? + .limit(1000)? + .build()?; + + // Should push the limit down to table provider + // When it has a select + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Projection: #test.a\ + \n TableScan: test projection=None, limit=1010"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_pushdown_with_offset_after_limit() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .limit(1000)? + .offset(10)? 
+ .build()?; + + let expected = "Offset: 10\ + \n Limit: 1010\ + \n Projection: #test.a\ + \n TableScan: test projection=None, limit=1010"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .offset(10)? + .limit(1000)? + .limit(10)? + .build()?; + + // Should push down the smallest limit + // Towards table scan + // This rule doesn't replace multiple limits + let expected = "Limit: 10\ + \n Limit: 10\ + \n Offset: 10\ + \n TableScan: test projection=None, limit=20"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], vec![max(col("b"))])? + .offset(10)? + .limit(1000)? + .build()?; + + // Limit should *not* push down aggregate node + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_should_push_down_with_offset_union() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan.clone()) + .union(LogicalPlanBuilder::from(table_scan).build()?)? + .offset(10)? + .limit(1000)? 
+ .build()?; + + // Limit should push down through union + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Union\ + \n Limit: 1010\ + \n TableScan: test projection=None, limit=1010\ + \n Limit: 1010\ + \n TableScan: test projection=None, limit=1010"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_should_not_push_down_with_offset_join() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .join( + &LogicalPlanBuilder::from(table_scan_2).build()?, + JoinType::Left, + (vec!["a"], vec!["a"]), + )? + .limit(1000)? + .offset(10)? + .build()?; + + // Limit pushdown Not supported in Join + let expected = "Offset: 10\ + \n Limit: 1010\ + \n Left Join: #test.a = #test2.a\ + \n TableScan: test projection=None\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> { + let table_scan_1 = test_table_scan_with_name("test1")?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let subquery = LogicalPlanBuilder::from(table_scan_1) + .project(vec![col("a")])? + .filter(col("a").eq(col("test1.a")))? + .build()?; + + let outer_query = LogicalPlanBuilder::from(table_scan_2) + .project(vec![col("a")])? + .filter(exists(Arc::new(subquery)))? + .limit(100)? + .offset(10)? 
+ .build()?; + + // Limit pushdown Not supported in sub_query + let expected = "Offset: 10\ + \n Limit: 110\ + \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\ + \n Projection: #test1.a\ + \n TableScan: test1 projection=None)\ + \n Projection: #test2.a\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&outer_query, expected); + + Ok(()) + } } diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 85c2d8f0cc15..049f9b7d951d 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -296,9 +296,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = self.order_by(plan, query.order_by)?; - let plan: LogicalPlan = self.offset(plan, query.offset)?; + let plan: LogicalPlan = self.limit(plan, query.limit)?; - self.limit(plan, query.limit) + // Apply LIMIT first so that Limit is Offset's input; this keeps limit push-down simple + self.offset(plan, query.offset) } fn set_expr_to_plan( @@ -2647,6 +2648,9 @@ fn parse_sql_number(n: &str) -> Result { #[cfg(test)] mod tests { use crate::datasource::empty::EmptyTable; + use crate::execution::context::ExecutionProps; + use crate::optimizer::limit_push_down::LimitPushDown; + use crate::optimizer::optimizer::OptimizerRule; use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser}; use datafusion_expr::{ScalarFunctionImplementation, Volatility}; @@ -4375,6 +4379,16 @@ mod tests { assert_eq!(format!("{:?}", plan), expected); } + fn quick_test_with_limit_pushdown(sql: &str, expected: &str) { + let plan = logical_plan(sql).unwrap(); + let rule = LimitPushDown::new(); + let optimized_plan = rule + .optimize(&plan, &ExecutionProps::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(formatted_plan, expected); + } + struct MockContextProvider {} impl ContextProvider for MockContextProvider { @@ -4823,10 +4837,10 @@ mod tests { } #[test] - fn test_offset_with_limit() { + fn 
test_zero_offset_with_limit() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;"; - let expected = "Limit: 5\ - \n Offset: 0\ + let expected = "Offset: 0\ + \n Limit: 5\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None"; @@ -4847,6 +4861,29 @@ mod tests { quick_test(sql, expected); } + #[test] + fn test_offset_after_limit_with_limit_push() { + let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;"; + let expected = "Offset: 3\ + \n Limit: 8\ + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; + + quick_test_with_limit_pushdown(sql, expected); + } + + #[test] + fn test_offset_before_limit_with_limit_push() { + let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;"; + let expected = "Offset: 3\ + \n Limit: 8\ + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; + quick_test_with_limit_pushdown(sql, expected); + } + fn assert_field_not_found(err: DataFusionError, name: &str) { match err { DataFusionError::SchemaError { .. } => {