-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support arbitrary expressions in LIMIT
plan
#13028
Changes from all commits
2789af7
58f7962
f4c8675
00f5d2a
9323702
61f34d8
b0bcf94
e6d1297
d6da97e
ec1d62c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,7 +49,8 @@ use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; | |
use datafusion_common::{ | ||
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, | ||
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence, | ||
FunctionalDependencies, ParamValues, Result, TableReference, UnnestOptions, | ||
FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference, | ||
UnnestOptions, | ||
}; | ||
use indexmap::IndexSet; | ||
|
||
|
@@ -960,11 +961,21 @@ impl LogicalPlan { | |
.map(LogicalPlan::SubqueryAlias) | ||
} | ||
LogicalPlan::Limit(Limit { skip, fetch, .. }) => { | ||
self.assert_no_expressions(expr)?; | ||
let old_expr_len = skip.iter().chain(fetch.iter()).count(); | ||
if old_expr_len != expr.len() { | ||
return internal_err!( | ||
"Invalid number of new Limit expressions: expected {}, got {}", | ||
old_expr_len, | ||
expr.len() | ||
); | ||
} | ||
// Pop order is same as the order returned by `LogicalPlan::expressions()` | ||
let new_skip = skip.as_ref().and(expr.pop()); | ||
let new_fetch = fetch.as_ref().and(expr.pop()); | ||
let input = self.only_input(inputs)?; | ||
Ok(LogicalPlan::Limit(Limit { | ||
skip: *skip, | ||
fetch: *fetch, | ||
skip: new_skip.map(Box::new), | ||
fetch: new_fetch.map(Box::new), | ||
input: Arc::new(input), | ||
})) | ||
} | ||
|
@@ -1339,7 +1350,10 @@ impl LogicalPlan { | |
LogicalPlan::RecursiveQuery(_) => None, | ||
LogicalPlan::Subquery(_) => None, | ||
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(), | ||
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch, | ||
LogicalPlan::Limit(limit) => match limit.get_fetch_type() { | ||
Ok(FetchType::Literal(s)) => s, | ||
_ => None, | ||
}, | ||
LogicalPlan::Distinct( | ||
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), | ||
) => input.max_rows(), | ||
|
@@ -1909,16 +1923,20 @@ impl LogicalPlan { | |
) | ||
} | ||
}, | ||
LogicalPlan::Limit(Limit { | ||
ref skip, | ||
ref fetch, | ||
.. | ||
}) => { | ||
LogicalPlan::Limit(limit) => { | ||
// Attempt to display `skip` and `fetch` as literals if possible, otherwise as expressions. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Display literals as before to avoid breaking too many tests. Maybe we could display them in the expr-style through a follow-up PR. For example, |
||
let skip_str = match limit.get_skip_type() { | ||
Ok(SkipType::Literal(n)) => n.to_string(), | ||
_ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()), | ||
}; | ||
let fetch_str = match limit.get_fetch_type() { | ||
Ok(FetchType::Literal(Some(n))) => n.to_string(), | ||
Ok(FetchType::Literal(None)) => "None".to_string(), | ||
_ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()) | ||
}; | ||
write!( | ||
f, | ||
"Limit: skip={}, fetch={}", | ||
skip, | ||
fetch.map_or_else(|| "None".to_string(), |x| x.to_string()) | ||
"Limit: skip={}, fetch={}", skip_str,fetch_str, | ||
) | ||
} | ||
LogicalPlan::Subquery(Subquery { .. }) => { | ||
|
@@ -2778,14 +2796,71 @@ impl PartialOrd for Extension { | |
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] | ||
pub struct Limit { | ||
/// Number of rows to skip before fetch | ||
pub skip: usize, | ||
pub skip: Option<Box<Expr>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the constraints on the expression that can be used here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to do the constant folding when building Limit node, so that logical plan structure remains intact? See also #12723 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it can be any integer expression, and can also contain column references. Both PostgreSQL and DuckDB support v1.1.1-dev319 af39bd0dcf
D create table t as values(1);
D select 1 limit (select max(col0) from t);
┌───────┐
│ 1 │
│ int32 │
├───────┤
│ 1 │
└───────┘ |
||
/// Maximum number of rows to fetch, | ||
/// None means fetching all rows | ||
pub fetch: Option<usize>, | ||
pub fetch: Option<Box<Expr>>, | ||
/// The logical plan | ||
pub input: Arc<LogicalPlan>, | ||
} | ||
|
||
/// Different types of skip expression in Limit plan. | ||
pub enum SkipType { | ||
/// The skip expression is a literal value. | ||
Literal(usize), | ||
/// Currently only supports expressions that can be folded into constants. | ||
UnsupportedExpr, | ||
Comment on lines
+2811
to
+2812
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Limit is a relational operator, so this will always need to be constant-foldable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but some expressions like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jonahgao good point. I also thought about this, but ignored, assuming we don't plan to support this. We could have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Constant folding requires the Another reason the Limit node needs to contain expressions is to support Prepare statements. Issue #12294 requires the Limit node to use |
||
} | ||
|
||
/// Different types of fetch expression in Limit plan. | ||
pub enum FetchType { | ||
/// The fetch expression is a literal value. | ||
/// `Literal(None)` means the fetch expression is not provided. | ||
Literal(Option<usize>), | ||
/// Currently only supports expressions that can be folded into constants. | ||
UnsupportedExpr, | ||
} | ||
|
||
impl Limit { | ||
/// Get the skip type from the limit plan. | ||
pub fn get_skip_type(&self) -> Result<SkipType> { | ||
match self.skip.as_deref() { | ||
Some(expr) => match *expr { | ||
Expr::Literal(ScalarValue::Int64(s)) => { | ||
// `skip = NULL` is equivalent to `skip = 0` | ||
let s = s.unwrap_or(0); | ||
if s >= 0 { | ||
Ok(SkipType::Literal(s as usize)) | ||
} else { | ||
plan_err!("OFFSET must be >=0, '{}' was provided", s) | ||
} | ||
} | ||
_ => Ok(SkipType::UnsupportedExpr), | ||
}, | ||
// `skip = None` is equivalent to `skip = 0` | ||
None => Ok(SkipType::Literal(0)), | ||
} | ||
} | ||
|
||
/// Get the fetch type from the limit plan. | ||
pub fn get_fetch_type(&self) -> Result<FetchType> { | ||
match self.fetch.as_deref() { | ||
Some(expr) => match *expr { | ||
Expr::Literal(ScalarValue::Int64(Some(s))) => { | ||
if s >= 0 { | ||
Ok(FetchType::Literal(Some(s as usize))) | ||
} else { | ||
plan_err!("LIMIT must be >= 0, '{}' was provided", s) | ||
} | ||
} | ||
Expr::Literal(ScalarValue::Int64(None)) => Ok(FetchType::Literal(None)), | ||
_ => Ok(FetchType::UnsupportedExpr), | ||
}, | ||
None => Ok(FetchType::Literal(None)), | ||
} | ||
} | ||
} | ||
|
||
/// Removes duplicate rows from the input | ||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] | ||
pub enum Distinct { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍