-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mismatch in MemTable of Select Into when projecting on aggregate window functions #6566
Changes from 3 commits
15903d4
19bc990
c19381a
b2804fa
2504a80
b6a5ee4
7f2c641
a27ac77
f9ab83f
22fac6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,15 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use std::fmt::Write; | ||
|
||
use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; | ||
use crate::utils::{ | ||
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs, | ||
resolve_columns, resolve_positions_to_exprs, | ||
}; | ||
use datafusion_common::{DataFusionError, Result}; | ||
use datafusion_expr::expr::*; | ||
use datafusion_expr::expr_rewriter::{ | ||
normalize_col, normalize_col_with_schemas_and_ambiguity_check, | ||
}; | ||
|
@@ -31,7 +34,8 @@ use datafusion_expr::utils::{ | |
}; | ||
use datafusion_expr::Expr::Alias; | ||
use datafusion_expr::{ | ||
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, | ||
BinaryExpr, Cast, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, | ||
Partitioning, TryCast, | ||
}; | ||
|
||
use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, WindowType}; | ||
|
@@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { | |
.iter() | ||
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan)) | ||
.collect::<Result<Vec<Expr>>>()?; | ||
|
||
if select.into.is_some() { | ||
for expr in select_exprs_post_aggr.iter_mut() { | ||
if let Expr::Column(_) = expr.clone() { | ||
*expr = expr.clone().alias(physical_name(expr)?); | ||
} | ||
} | ||
} | ||
plan | ||
}; | ||
|
||
|
@@ -555,3 +565,288 @@ fn match_window_definitions( | |
} | ||
Ok(()) | ||
} | ||
|
||
fn create_function_physical_name( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this functionality need to remain in sync with the creation of physical names? |
||
fun: &str, | ||
distinct: bool, | ||
args: &[Expr], | ||
) -> Result<String> { | ||
let names: Vec<String> = args | ||
.iter() | ||
.map(|e| create_physical_name(e, false)) | ||
.collect::<Result<_>>()?; | ||
|
||
let distinct_str = match distinct { | ||
true => "DISTINCT ", | ||
false => "", | ||
}; | ||
Ok(format!("{}({}{})", fun, distinct_str, names.join(","))) | ||
} | ||
|
||
fn physical_name(e: &Expr) -> Result<String> { | ||
create_physical_name(e, true) | ||
} | ||
|
||
fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> { | ||
match e { | ||
Expr::Column(c) => { | ||
if is_first_expr { | ||
Ok(c.name.clone()) | ||
} else { | ||
Ok(c.flat_name()) | ||
} | ||
} | ||
Expr::Alias(_, name) => Ok(name.clone()), | ||
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")), | ||
Expr::Literal(value) => Ok(format!("{value:?}")), | ||
Expr::BinaryExpr(BinaryExpr { left, op, right }) => { | ||
let left = create_physical_name(left, false)?; | ||
let right = create_physical_name(right, false)?; | ||
Ok(format!("{left} {op} {right}")) | ||
} | ||
Expr::Case(case) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, this appears to the same code as https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/physical-expr/src/expressions/case.rs#L66-L77 If we ever changed the code in phsical-expr and did not change this code, would that cause problems? |
||
let mut name = "CASE ".to_string(); | ||
if let Some(e) = &case.expr { | ||
let _ = write!(name, "{e:?} "); | ||
} | ||
for (w, t) in &case.when_then_expr { | ||
let _ = write!(name, "WHEN {w:?} THEN {t:?} "); | ||
} | ||
if let Some(e) = &case.else_expr { | ||
let _ = write!(name, "ELSE {e:?} "); | ||
} | ||
name += "END"; | ||
Ok(name) | ||
} | ||
Expr::Cast(Cast { expr, .. }) => { | ||
// CAST does not change the expression name | ||
create_physical_name(expr, false) | ||
} | ||
Expr::TryCast(TryCast { expr, .. }) => { | ||
// CAST does not change the expression name | ||
create_physical_name(expr, false) | ||
} | ||
Expr::Not(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("NOT {expr}")) | ||
} | ||
Expr::Negative(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("(- {expr})")) | ||
} | ||
Expr::IsNull(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS NULL")) | ||
} | ||
Expr::IsNotNull(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS NOT NULL")) | ||
} | ||
Expr::IsTrue(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS TRUE")) | ||
} | ||
Expr::IsFalse(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS FALSE")) | ||
} | ||
Expr::IsUnknown(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS UNKNOWN")) | ||
} | ||
Expr::IsNotTrue(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS NOT TRUE")) | ||
} | ||
Expr::IsNotFalse(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS NOT FALSE")) | ||
} | ||
Expr::IsNotUnknown(expr) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr} IS NOT UNKNOWN")) | ||
} | ||
Expr::GetIndexedField(GetIndexedField { key, expr }) => { | ||
let expr = create_physical_name(expr, false)?; | ||
Ok(format!("{expr}[{key}]")) | ||
} | ||
Expr::ScalarFunction(func) => { | ||
create_function_physical_name(&func.fun.to_string(), false, &func.args) | ||
} | ||
Expr::ScalarUDF(ScalarUDF { fun, args }) => { | ||
create_function_physical_name(&fun.name, false, args) | ||
} | ||
Expr::WindowFunction(WindowFunction { fun, args, .. }) => { | ||
create_function_physical_name(&fun.to_string(), false, args) | ||
} | ||
Expr::AggregateFunction(AggregateFunction { | ||
fun, | ||
distinct, | ||
args, | ||
.. | ||
}) => create_function_physical_name(&fun.to_string(), *distinct, args), | ||
Expr::AggregateUDF(AggregateUDF { | ||
fun, | ||
args, | ||
filter, | ||
order_by, | ||
}) => { | ||
// TODO: Add support for filter and order by in AggregateUDF | ||
if filter.is_some() { | ||
return Err(DataFusionError::Execution( | ||
"aggregate expression with filter is not supported".to_string(), | ||
)); | ||
} | ||
if order_by.is_some() { | ||
return Err(DataFusionError::Execution( | ||
"aggregate expression with order_by is not supported".to_string(), | ||
)); | ||
} | ||
let mut names = Vec::with_capacity(args.len()); | ||
for e in args { | ||
names.push(create_physical_name(e, false)?); | ||
} | ||
Ok(format!("{}({})", fun.name, names.join(","))) | ||
} | ||
Expr::GroupingSet(grouping_set) => match grouping_set { | ||
GroupingSet::Rollup(exprs) => Ok(format!( | ||
"ROLLUP ({})", | ||
exprs | ||
.iter() | ||
.map(|e| create_physical_name(e, false)) | ||
.collect::<Result<Vec<_>>>()? | ||
.join(", ") | ||
)), | ||
GroupingSet::Cube(exprs) => Ok(format!( | ||
"CUBE ({})", | ||
exprs | ||
.iter() | ||
.map(|e| create_physical_name(e, false)) | ||
.collect::<Result<Vec<_>>>()? | ||
.join(", ") | ||
)), | ||
GroupingSet::GroupingSets(lists_of_exprs) => { | ||
let mut strings = vec![]; | ||
for exprs in lists_of_exprs { | ||
let exprs_str = exprs | ||
.iter() | ||
.map(|e| create_physical_name(e, false)) | ||
.collect::<Result<Vec<_>>>()? | ||
.join(", "); | ||
strings.push(format!("({exprs_str})")); | ||
} | ||
Ok(format!("GROUPING SETS ({})", strings.join(", "))) | ||
} | ||
}, | ||
|
||
Expr::InList(InList { | ||
expr, | ||
list, | ||
negated, | ||
}) => { | ||
let expr = create_physical_name(expr, false)?; | ||
let list = list.iter().map(|expr| create_physical_name(expr, false)); | ||
if *negated { | ||
Ok(format!("{expr} NOT IN ({list:?})")) | ||
} else { | ||
Ok(format!("{expr} IN ({list:?})")) | ||
} | ||
} | ||
Expr::Exists { .. } => Err(DataFusionError::NotImplemented( | ||
"EXISTS is not yet supported in the physical plan".to_string(), | ||
)), | ||
Expr::InSubquery(_) => Err(DataFusionError::NotImplemented( | ||
"IN subquery is not yet supported in the physical plan".to_string(), | ||
)), | ||
Expr::ScalarSubquery(_) => Err(DataFusionError::NotImplemented( | ||
"Scalar subqueries are not yet supported in the physical plan".to_string(), | ||
)), | ||
Expr::Between(Between { | ||
expr, | ||
negated, | ||
low, | ||
high, | ||
}) => { | ||
let expr = create_physical_name(expr, false)?; | ||
let low = create_physical_name(low, false)?; | ||
let high = create_physical_name(high, false)?; | ||
if *negated { | ||
Ok(format!("{expr} NOT BETWEEN {low} AND {high}")) | ||
} else { | ||
Ok(format!("{expr} BETWEEN {low} AND {high}")) | ||
} | ||
} | ||
Expr::Like(Like { | ||
negated, | ||
expr, | ||
pattern, | ||
escape_char, | ||
}) => { | ||
let expr = create_physical_name(expr, false)?; | ||
let pattern = create_physical_name(pattern, false)?; | ||
let escape = if let Some(char) = escape_char { | ||
format!("CHAR '{char}'") | ||
} else { | ||
"".to_string() | ||
}; | ||
if *negated { | ||
Ok(format!("{expr} NOT LIKE {pattern}{escape}")) | ||
} else { | ||
Ok(format!("{expr} LIKE {pattern}{escape}")) | ||
} | ||
} | ||
Expr::ILike(Like { | ||
negated, | ||
expr, | ||
pattern, | ||
escape_char, | ||
}) => { | ||
let expr = create_physical_name(expr, false)?; | ||
let pattern = create_physical_name(pattern, false)?; | ||
let escape = if let Some(char) = escape_char { | ||
format!("CHAR '{char}'") | ||
} else { | ||
"".to_string() | ||
}; | ||
if *negated { | ||
Ok(format!("{expr} NOT ILIKE {pattern}{escape}")) | ||
} else { | ||
Ok(format!("{expr} ILIKE {pattern}{escape}")) | ||
} | ||
} | ||
Expr::SimilarTo(Like { | ||
negated, | ||
expr, | ||
pattern, | ||
escape_char, | ||
}) => { | ||
let expr = create_physical_name(expr, false)?; | ||
let pattern = create_physical_name(pattern, false)?; | ||
let escape = if let Some(char) = escape_char { | ||
format!("CHAR '{char}'") | ||
} else { | ||
"".to_string() | ||
}; | ||
if *negated { | ||
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}")) | ||
} else { | ||
Ok(format!("{expr} SIMILAR TO {pattern}{escape}")) | ||
} | ||
} | ||
Expr::Sort { .. } => Err(DataFusionError::Internal( | ||
"Create physical name does not support sort expression".to_string(), | ||
)), | ||
Expr::Wildcard => Err(DataFusionError::Internal( | ||
"Create physical name does not support wildcard".to_string(), | ||
)), | ||
Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( | ||
"Create physical name does not support qualified wildcard".to_string(), | ||
)), | ||
Expr::Placeholder(_) => Err(DataFusionError::Internal( | ||
"Create physical name does not support placeholder".to_string(), | ||
)), | ||
Expr::OuterReferenceColumn(_, _) => Err(DataFusionError::Internal( | ||
"Create physical name does not support OuterReferenceColumn".to_string(), | ||
)), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sorry if my past comments have been confusing. Here is what I was trying to say earlier in #6566 (comment):
I ran this command to get some logs (with the extra debug in PR #6626):
Here is the content of debug.log: debug.log
From the log, here is the
LogialPlan
that shows theWindowAggr
declares it makes a column namedSUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
(yes that whole thing!)Here is the final
ExecutionPlan
, also showing the same giant column as the declared output name:However, looking at the logs, what the execution plan actually produces a column named
SUM(test_table.c1)
:Thus, what I was trying to say earlier is that I think the root of the problem is the mismatch between what the plans say the field name of the output is and what the field name that the WindowExec is actually producing.
So I think we should fix this bug by resolving the mismatch. Either:
BoundedWindowAggExec
actually producesBoundedWindowAggExec
to produce the field names declared by the `WindowAggExecThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alamb @berkaysynnada
Would you mind if I also open a small PR for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be very much appreciative, personally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply. I have tried both suggestions @alamb:
a- In the
project()
function in select.rs, the final schema will be constructed with a shortened form of the window function.is expanded with that arm:
b- In the
project()
function again, qualified wildcard columns are normalized. However, the column name is in the longer form, and the schema of the plan is in the shorter form. Therefore, we also change theexpr_as_column_expr()
function so that window function expressions are converted to column expressions with the shortened column name format, which can be copied from the schema.c-
PushDownProjection
rule again creates new column expressions withdisplay_name()
function (which returns the long format) in the window handling arm. These column names also need to be shortened to satisfy subsequent control.My opinion: Can we directly change the
display_name()
function for window functions such that only function name and arguments are returned? Thus we don't need to change any of what I mentioned above.new version:
create_window_expr()
such that it creates the window name withdisplay_name()
rather thanphysical_name()
, but it makes the plans longer, also lots of test change burden.I would like to wrap up this PR and any thoughts you have would be really helpful. Can you review the alternatives above when you get a chance? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@berkaysynnada thanks for checking that.
I was also working on that.
Changing
display_name
was the one I started with but in this case other scenarios will fail. When window plan created the DFS schema check name uniqueness fromdisplay_name
not considering aliases. So this query will failI'm still thinking how to overcome that without breaking changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is what I think we should do. If the overly verbose column (at the output) names are a problem, perhaps we can look into updating the planner to automatically add more reasonable aliases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, we need to get back on column naming convention as currently long names are not user friendly and not useful without aliases in nested queries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should also be considered that we cannot support more than one column with the same alias, if we intend to shorten the names at the output by realiasing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. We will go forward with that approach and @berkaysynnada will update you guys of the progress.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related issues: #6543 and #6758