From 35bbdd5a49ea16d3c971beac33542bb07a9b08ae Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 18 May 2022 17:59:23 +0800 Subject: [PATCH 1/8] support offset push down --- .../core/src/optimizer/limit_push_down.rs | 148 +++++++++++++++++- 1 file changed, 143 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 0c68f1761601..237e4015ce4e 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -24,6 +24,7 @@ use crate::logical_plan::plan::Projection; use crate::logical_plan::{Limit, TableScan}; use crate::logical_plan::{LogicalPlan, Union}; use crate::optimizer::optimizer::OptimizerRule; +use datafusion_expr::logical_plan::Offset; use std::sync::Arc; /// Optimization rule that tries pushes down LIMIT n @@ -43,18 +44,24 @@ fn limit_push_down( upper_limit: Option, plan: &LogicalPlan, _execution_props: &ExecutionProps, + is_offset: bool, ) -> Result { match (plan, upper_limit) { (LogicalPlan::Limit(Limit { n, input }), upper_limit) => { - let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n); + let new_limit: usize = if is_offset { + *n + } else { + upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n) + }; Ok(LogicalPlan::Limit(Limit { - n: smallest, + n: new_limit, // push down limit to plan (minimum of upper limit and current limit) input: Arc::new(limit_push_down( _optimizer, - Some(smallest), + Some(new_limit), input.as_ref(), _execution_props, + false, )?), })) } @@ -95,6 +102,7 @@ fn limit_push_down( upper_limit, input.as_ref(), _execution_props, + false, )?), schema: schema.clone(), alias: alias.clone(), @@ -119,6 +127,7 @@ fn limit_push_down( Some(upper_limit), x, _execution_props, + false, )?), })) }) @@ -129,6 +138,21 @@ fn limit_push_down( schema: schema.clone(), })) } + // offset 5 limit 10 then push limit 15 (5 + 10) + // Limit should always be Offset's input + (LogicalPlan::Offset(Offset { offset, input }), Some(upper_limit)) => { + let new_limit = offset + upper_limit; + Ok(LogicalPlan::Offset(Offset { + offset: *offset, + input: Arc::new(limit_push_down( + _optimizer, + Some(new_limit), + input.as_ref(), + _execution_props, + true, + )?), + })) + } // For other nodes we can't push down the limit // But try to recurse and find other limit nodes to push down _ => { @@ -138,7 +162,9 @@ fn limit_push_down( let inputs = plan.inputs(); let new_inputs = inputs .iter() - .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props)) + .map(|plan| { + limit_push_down(_optimizer, None, plan, _execution_props, false) + }) .collect::>>()?; utils::from_plan(plan, &expr, &new_inputs) @@ -152,7 +178,7 @@ impl OptimizerRule for LimitPushDown { plan: &LogicalPlan, execution_props: &ExecutionProps, ) -> Result { - limit_push_down(self, None, plan, execution_props) + limit_push_down(self, None, plan, execution_props, false) } fn name(&self) -> &str { @@ -278,4 +304,116 @@ mod test { Ok(()) } + + #[test] + fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .offset(10)? + .limit(1000)? + .build()?; + + // Should push the limit down to table provider + // When it has a select + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Projection: #test.a\ + \n TableScan: test projection=None, limit=1010"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_pushdown_with_offset_after_limit() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a")])? + .limit(1000)? + .offset(10)? + .build()?; + + // Not push the limit down to table provider + // When offset after limit + let expected = "Offset: 10\ + \n Limit: 1000\ + \n Projection: #test.a\ + \n TableScan: test projection=None, limit=1000"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .offset(10)? + .limit(1000)? + .limit(10)? + .build()?; + + // Should push down the smallest limit + // Towards table scan + // This rule doesn't replace multiple limits + let expected = "Limit: 10\ + \n Limit: 10\ + \n Offset: 10\ + \n TableScan: test projection=None, limit=20"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], vec![max(col("b"))])? + .offset(10)? + .limit(1000)? + .build()?; + + // Limit should *not* push down aggregate node + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_should_push_down_with_offset_union() -> Result<()> { + let table_scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(table_scan.clone()) + .union(LogicalPlanBuilder::from(table_scan).build()?)? + .offset(10)? + .limit(1000)? + .build()?; + + // Limit should push down through union + let expected = "Limit: 1000\ + \n Offset: 10\ + \n Union\ + \n Limit: 1010\ + \n TableScan: test projection=None, limit=1010\ + \n Limit: 1010\ + \n TableScan: test projection=None, limit=1010"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } } From a6aaed570f96a4ceb1afb503b4eda2ef4fd1e568 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 18 May 2022 18:28:59 +0800 Subject: [PATCH 2/8] change the limit and offset order --- datafusion/core/src/sql/planner.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 85c2d8f0cc15..34f62b971329 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -296,9 +296,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = self.order_by(plan, query.order_by)?; - let plan: LogicalPlan = self.offset(plan, query.offset)?; + let plan: LogicalPlan = self.limit(plan, query.limit)?; - self.limit(plan, query.limit) + //make limit as offset's input will enable limit push down simply + self.offset(plan, query.offset) } fn set_expr_to_plan( @@ -4825,8 +4826,8 @@ mod tests { #[test] fn test_offset_with_limit() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;"; - let expected = "Limit: 5\ - \n Offset: 0\ + let expected = "Offset: 0\ + \n Limit: 5\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None"; From 302fd4b1db48be714ffd6b13e50c93716cf1fdc9 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 18 May 2022 21:22:11 +0800 Subject: [PATCH 3/8] add test for join and subquery --- .../core/src/optimizer/limit_push_down.rs | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 237e4015ce4e..9189866e27bf 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -188,6 +188,8 @@ impl OptimizerRule for LimitPushDown { #[cfg(test)] mod test { + use datafusion_expr::exists; + use datafusion_expr::logical_plan::JoinType; use super::*; use crate::{ logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder}, @@ -416,4 +418,59 @@ mod test { Ok(()) } + + #[test] + fn limit_should_push_down_with_offset_join() -> Result<()> { + let table_scan_1 = test_table_scan()?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let plan = LogicalPlanBuilder::from(table_scan_1) + .join(&LogicalPlanBuilder::from(table_scan_2).build()?, JoinType::Left, (vec!["a"], vec!["a"]))? + .limit(1000)? + .offset(10)? + .build()?; + + // Limit pushdown Not supported in Join + let expected = "Offset: 10\ + \n Limit: 1000\ + \n Left Join: #test.a = #test2.a\ + \n TableScan: test projection=None\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } + + #[test] + fn limit_should_push_down_with_offset_sub_query() -> Result<()> { + + let table_scan_1 = test_table_scan_with_name("test1")?; + let table_scan_2 = test_table_scan_with_name("test2")?; + + let subquery = LogicalPlanBuilder::from(table_scan_1) + .project(vec![col("a")])? + .filter(col("a").eq(col("test1.a")))? + .build()?; + + let outer_query = LogicalPlanBuilder::from(table_scan_2) + .project(vec![col("a")])? + .filter(exists(Arc::new(subquery)))? + .limit(100)? + .offset(10)? + .build()?; + + // Limit pushdown Not supported in sub_query + let expected = "Offset: 10\ + \n Limit: 100\ + \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\ + \n Projection: #test1.a\ + \n TableScan: test1 projection=None)\ + \n Projection: #test2.a\ + \n TableScan: test2 projection=None"; + + assert_optimized_plan_eq(&outer_query, expected); + + Ok(()) + } } From 27879ccc7aaff50c906db34008ccd35d061beda4 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 18 May 2022 21:23:06 +0800 Subject: [PATCH 4/8] fmt --- datafusion/core/src/optimizer/limit_push_down.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 9189866e27bf..6dc26a2baf07 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -188,13 +188,13 @@ impl OptimizerRule for LimitPushDown { #[cfg(test)] mod test { - use datafusion_expr::exists; - use datafusion_expr::logical_plan::JoinType; use super::*; use crate::{ logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder}, test::*, }; + use datafusion_expr::exists; + use datafusion_expr::logical_plan::JoinType; fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let rule = LimitPushDown::new(); @@ -425,7 +425,11 @@ mod test { let table_scan_2 = test_table_scan_with_name("test2")?; let plan = LogicalPlanBuilder::from(table_scan_1) - .join(&LogicalPlanBuilder::from(table_scan_2).build()?, JoinType::Left, (vec!["a"], vec!["a"]))? + .join( + &LogicalPlanBuilder::from(table_scan_2).build()?, + JoinType::Left, + (vec!["a"], vec!["a"]), + )? .limit(1000)? .offset(10)? .build()?; @@ -444,7 +448,6 @@ mod test { #[test] fn limit_should_push_down_with_offset_sub_query() -> Result<()> { - let table_scan_1 = test_table_scan_with_name("test1")?; let table_scan_2 = test_table_scan_with_name("test2")?; From 365dbf041a063130d008261126cf7b05cf162a60 Mon Sep 17 00:00:00 2001 From: Yang Jiang <37145547+Ted-Jiang@users.noreply.github.com> Date: Thu, 19 May 2022 10:11:37 +0800 Subject: [PATCH 5/8] Apply suggestions from code review Co-authored-by: Andy Grove --- datafusion/core/src/optimizer/limit_push_down.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 6dc26a2baf07..4f7aa57df77a 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -420,7 +420,7 @@ mod test { } #[test] - fn limit_should_push_down_with_offset_join() -> Result<()> { + fn limit_should_not_push_down_with_offset_join() -> Result<()> { let table_scan_1 = test_table_scan()?; let table_scan_2 = test_table_scan_with_name("test2")?; @@ -447,7 +447,7 @@ mod test { } #[test] - fn limit_should_push_down_with_offset_sub_query() -> Result<()> { + fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> { let table_scan_1 = test_table_scan_with_name("test1")?; let table_scan_2 = test_table_scan_with_name("test2")?; From 20366bd32654ee0691bae3598f4f41be7960de48 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 20 May 2022 14:06:43 +0800 Subject: [PATCH 6/8] add sql test in planner.rs --- datafusion/core/src/sql/planner.rs | 39 +++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 34f62b971329..4ebbd6708acf 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -2648,6 +2648,9 @@ fn parse_sql_number(n: &str) -> Result { #[cfg(test)] mod tests { use crate::datasource::empty::EmptyTable; + use crate::execution::context::ExecutionProps; + use crate::optimizer::limit_push_down::LimitPushDown; + use crate::optimizer::optimizer::OptimizerRule; use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser}; use datafusion_expr::{ScalarFunctionImplementation, Volatility}; @@ -4376,6 +4379,17 @@ mod tests { assert_eq!(format!("{:?}", plan), expected); } + fn quick_test_with_limit_pushdown(sql: &str, expected: &str) { + let plan = logical_plan(sql).unwrap(); + assert_eq!(format!("{:?}", plan), expected); + let rule = LimitPushDown::new(); + let optimized_plan = rule + .optimize(&plan, &ExecutionProps::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(formatted_plan, expected); + } + struct MockContextProvider {} impl ContextProvider for MockContextProvider { @@ -4824,7 +4838,7 @@ mod tests { } #[test] - fn test_offset_with_limit() { + fn test_zero_offset_with_limit() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;"; let expected = "Offset: 0\ \n Limit: 5\ @@ -4848,6 +4862,29 @@ mod tests { quick_test(sql, expected); } + #[test] + fn test_offset_after_limit_with_limit_push() { + let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;"; + let expected = "Offset: 3\ + \n Limit: 5\ + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; + + quick_test_with_limit_pushdown(sql, expected); + } + + #[test] + fn test_offset_before_limit_with_limit_push() { + let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;"; + let expected = "Offset: 3\ + \n Limit: 5\ + \n Projection: #person.id\ + \n Filter: #person.id > Int64(100)\ + \n TableScan: person projection=None"; + quick_test_with_limit_pushdown(sql, expected); + } + fn assert_field_not_found(err: DataFusionError, name: &str) { match err { DataFusionError::SchemaError { .. } => { From 59a18137e8a827aecaa2d9c10f5e50fa6df72d28 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 20 May 2022 14:06:43 +0800 Subject: [PATCH 7/8] add sql test in planner.rs --- .../core/src/optimizer/limit_push_down.rs | 20 ++++++++++--------- datafusion/core/src/sql/planner.rs | 5 ++--- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 4f7aa57df77a..d8b1eb38d926 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -49,7 +49,7 @@ fn limit_push_down( match (plan, upper_limit) { (LogicalPlan::Limit(Limit { n, input }), upper_limit) => { let new_limit: usize = if is_offset { - *n + *n + upper_limit.unwrap_or(0) } else { upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n) }; @@ -140,8 +140,12 @@ fn limit_push_down( } // offset 5 limit 10 then push limit 15 (5 + 10) // Limit should always be Offset's input - (LogicalPlan::Offset(Offset { offset, input }), Some(upper_limit)) => { - let new_limit = offset + upper_limit; + (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => { + let new_limit: usize = if upper_limit.is_some() { + upper_limit.unwrap() + *offset + } else { + *offset + }; Ok(LogicalPlan::Offset(Offset { offset: *offset, input: Arc::new(limit_push_down( @@ -339,12 +343,10 @@ mod test { .offset(10)? .build()?; - // Not push the limit down to table provider - // When offset after limit let expected = "Offset: 10\ - \n Limit: 1000\ + \n Limit: 1010\ \n Projection: #test.a\ - \n TableScan: test projection=None, limit=1000"; + \n TableScan: test projection=None, limit=1010"; assert_optimized_plan_eq(&plan, expected); @@ -436,7 +438,7 @@ mod test { // Limit pushdown Not supported in Join let expected = "Offset: 10\ - \n Limit: 1000\ + \n Limit: 1010\ \n Left Join: #test.a = #test2.a\ \n TableScan: test projection=None\ \n TableScan: test2 projection=None"; @@ -465,7 +467,7 @@ mod test { // Limit pushdown Not supported in sub_query let expected = "Offset: 10\ - \n Limit: 100\ + \n Limit: 110\ \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\ \n Projection: #test1.a\ \n TableScan: test1 projection=None)\ diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs index 4ebbd6708acf..049f9b7d951d 100644 --- a/datafusion/core/src/sql/planner.rs +++ b/datafusion/core/src/sql/planner.rs @@ -4381,7 +4381,6 @@ mod tests { fn quick_test_with_limit_pushdown(sql: &str, expected: &str) { let plan = logical_plan(sql).unwrap(); - assert_eq!(format!("{:?}", plan), expected); let rule = LimitPushDown::new(); let optimized_plan = rule .optimize(&plan, &ExecutionProps::new()) @@ -4866,7 +4865,7 @@ mod tests { fn test_offset_after_limit_with_limit_push() { let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;"; let expected = "Offset: 3\ - \n Limit: 5\ + \n Limit: 8\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None"; @@ -4878,7 +4877,7 @@ mod tests { fn test_offset_before_limit_with_limit_push() { let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;"; let expected = "Offset: 3\ - \n Limit: 5\ + \n Limit: 8\ \n Projection: #person.id\ \n Filter: #person.id > Int64(100)\ \n TableScan: person projection=None"; From 613de258e003dce4503e7c3d08a9003818bfdfe9 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 20 May 2022 15:58:40 +0800 Subject: [PATCH 8/8] fix clippy --- datafusion/core/src/optimizer/limit_push_down.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index d8b1eb38d926..a52fd40df801 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -141,8 +141,8 @@ fn limit_push_down( // offset 5 limit 10 then push limit 15 (5 + 10) // Limit should always be Offset's input (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => { - let new_limit: usize = if upper_limit.is_some() { - upper_limit.unwrap() + *offset + let new_limit = if let Some(ul) = upper_limit { + ul + *offset } else { *offset };