Merge remote-tracking branch 'apache/main' into sgrebnov/improve-join-upstream

sgrebnov committed Oct 28, 2024
2 parents 936b076 + 1fd6116 commit e560ece
Showing 12 changed files with 201 additions and 53 deletions.
7 changes: 7 additions & 0 deletions datafusion/core/src/bin/print_functions_docs.rs
@@ -195,6 +195,13 @@ fn print_docs(
);
}

if let Some(alt_syntax) = &documentation.alternative_syntax {
let _ = writeln!(docs, "#### Alternative Syntax\n");
for syntax in alt_syntax {
let _ = writeln!(docs, "```sql\n{}\n```", syntax);
}
}

// next, aliases
if !f.get_aliases().is_empty() {
let _ = writeln!(docs, "#### Aliases");
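
A minimal, standalone sketch (plain Rust, no DataFusion types) of the rendering pattern added above, showing the markdown it emits for a function with a single alternative syntax entry; the input string mirrors the strpos change further down:

use std::fmt::Write;

fn main() {
    // Hypothetical documentation value, as strpos sets below.
    let alternative_syntax = Some(vec!["position(substr in origstr)".to_string()]);

    let mut docs = String::new();
    if let Some(alt_syntax) = &alternative_syntax {
        let _ = writeln!(docs, "#### Alternative Syntax\n");
        for syntax in alt_syntax {
            let _ = writeln!(docs, "```sql\n{}\n```", syntax);
        }
    }

    // Prints:
    // #### Alternative Syntax
    //
    // ```sql
    // position(substr in origstr)
    // ```
    print!("{docs}");
}
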
9 changes: 9 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -1482,6 +1482,15 @@ pub fn validate_unique_names<'a>(
/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
if left_plan.schema().fields().len() != right_plan.schema().fields().len() {
return plan_err!(
"UNION queries have different number of columns: \
left has {} columns whereas right has {} columns",
left_plan.schema().fields().len(),
right_plan.schema().fields().len()
);
}

// Temporarily use the schema from the left input and later rely on the analyzer to
// coerce the two schemas into a common one.

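
A self-contained sketch of the same guard outside the DataFusion types (the real check compares LogicalPlan schemas and returns a plan error via plan_err!), illustrating that a column-count mismatch is rejected before any schema coercion is attempted:

// Standalone mirror of the arity check above.
fn check_union_column_count(left_cols: usize, right_cols: usize) -> Result<(), String> {
    if left_cols != right_cols {
        return Err(format!(
            "UNION queries have different number of columns: \
             left has {left_cols} columns whereas right has {right_cols} columns"
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_union_column_count(3, 3).is_ok());
    // e.g. `SELECT a, b UNION ALL SELECT a, b, c` now fails up front.
    assert!(check_union_column_count(2, 3).is_err());
}
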
13 changes: 13 additions & 0 deletions datafusion/expr/src/udf_docs.rs
@@ -47,6 +47,8 @@ pub struct Documentation {
/// Left member of a pair is the argument name, right is a
/// description for the argument
pub arguments: Option<Vec<(String, String)>>,
/// A list of alternative syntax examples for a function
pub alternative_syntax: Option<Vec<String>>,
/// Related functions if any. Values should match the related
/// udf's name exactly. Related udf's must be of the same
/// UDF type (scalar, aggregate or window) for proper linking to
@@ -96,6 +98,7 @@ pub struct DocumentationBuilder {
pub syntax_example: Option<String>,
pub sql_example: Option<String>,
pub arguments: Option<Vec<(String, String)>>,
pub alternative_syntax: Option<Vec<String>>,
pub related_udfs: Option<Vec<String>>,
}

@@ -107,6 +110,7 @@ impl DocumentationBuilder {
syntax_example: None,
sql_example: None,
arguments: None,
alternative_syntax: None,
related_udfs: None,
}
}
@@ -172,6 +176,13 @@ impl DocumentationBuilder {
self.with_argument(arg_name, description)
}

pub fn with_alternative_syntax(mut self, syntax_name: impl Into<String>) -> Self {
let mut alternative_syntax_array = self.alternative_syntax.unwrap_or_default();
alternative_syntax_array.push(syntax_name.into());
self.alternative_syntax = Some(alternative_syntax_array);
self
}

pub fn with_related_udf(mut self, related_udf: impl Into<String>) -> Self {
let mut related = self.related_udfs.unwrap_or_default();
related.push(related_udf.into());
@@ -186,6 +197,7 @@
syntax_example,
sql_example,
arguments,
alternative_syntax,
related_udfs,
} = self;

@@ -205,6 +217,7 @@
syntax_example: syntax_example.unwrap(),
sql_example,
arguments,
alternative_syntax,
related_udfs,
})
}
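
A compact, standalone mirror of the builder pattern (hypothetical DocsBuilder type, not the real DocumentationBuilder, which also carries the description, syntax example, arguments, and related UDFs): each call lazily creates the Vec and appends one entry, so a function can advertise several alternative spellings:

#[derive(Default)]
struct DocsBuilder {
    alternative_syntax: Option<Vec<String>>,
}

impl DocsBuilder {
    // Same shape as DocumentationBuilder::with_alternative_syntax above.
    fn with_alternative_syntax(mut self, syntax: impl Into<String>) -> Self {
        let mut entries = self.alternative_syntax.unwrap_or_default();
        entries.push(syntax.into());
        self.alternative_syntax = Some(entries);
        self
    }
}

fn main() {
    let builder = DocsBuilder::default()
        .with_alternative_syntax("position(substr in origstr)")
        // Hypothetical second entry, purely for illustration.
        .with_alternative_syntax("another_form(origstr, substr)");
    assert_eq!(builder.alternative_syntax.as_ref().map(Vec::len), Some(2));
}
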
1 change: 1 addition & 0 deletions datafusion/functions/src/unicode/strpos.rs
@@ -97,6 +97,7 @@ fn get_strpos_doc() -> &'static Documentation {
```"#)
.with_standard_argument("str", Some("String"))
.with_argument("substr", "Substring expression to search for.")
.with_alternative_syntax("position(substr in origstr)")
.build()
.unwrap()
})
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
@@ -56,6 +56,7 @@ strum = { version = "0.26.1", features = ["derive"] }
ctor = { workspace = true }
datafusion-functions = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true }
datafusion-functions-window = { workspace = true }
env_logger = { workspace = true }
paste = "^1.0"
35 changes: 34 additions & 1 deletion datafusion/sql/src/unparser/expr.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::expr::Unnest;
use sqlparser::ast::Value::SingleQuotedString;
use sqlparser::ast::{
self, BinaryOperator, Expr as AstExpr, Function, Ident, Interval, ObjectName,
@@ -466,7 +467,7 @@ impl Unparser<'_> {
Ok(ast::Expr::Value(ast::Value::Placeholder(p.id.to_string())))
}
Expr::OuterReferenceColumn(_, col) => self.col_to_sql(col),
Expr::Unnest(_) => not_impl_err!("Unsupported Expr conversion: {expr:?}"),
Expr::Unnest(unnest) => self.unnest_to_sql(unnest),
}
}

@@ -1340,6 +1341,29 @@ impl Unparser<'_> {
}
}

/// Converts an UNNEST operation to an AST expression by wrapping it as a function call,
/// since there is no direct representation for UNNEST in the AST.
fn unnest_to_sql(&self, unnest: &Unnest) -> Result<ast::Expr> {
let args = self.function_args_to_sql(std::slice::from_ref(&unnest.expr))?;

Ok(ast::Expr::Function(Function {
name: ast::ObjectName(vec![Ident {
value: "UNNEST".to_string(),
quote_style: None,
}]),
args: ast::FunctionArguments::List(ast::FunctionArgumentList {
duplicate_treatment: None,
args,
clauses: vec![],
}),
filter: None,
null_treatment: None,
over: None,
within_group: vec![],
parameters: ast::FunctionArguments::None,
}))
}

fn arrow_dtype_to_ast_dtype(&self, data_type: &DataType) -> Result<ast::DataType> {
match data_type {
DataType::Null => {
@@ -1855,6 +1879,15 @@ mod tests {
}),
r#"CAST(a AS DECIMAL(12,0))"#,
),
(
Expr::Unnest(Unnest {
expr: Box::new(Expr::Column(Column {
relation: Some(TableReference::partial("schema", "table")),
name: "array_col".to_string(),
})),
}),
r#"UNNEST("schema"."table".array_col)"#,
),
];

for (expr, expected) in tests {
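
A hedged usage sketch mirroring the new test case: after this change, unparsing an Expr::Unnest no longer returns a not-implemented error but renders as a function call. It assumes the public expr_to_sql helper in datafusion_sql::unparser; exact identifier quoting depends on the dialect:

use datafusion_common::{Column, Result};
use datafusion_expr::expr::Unnest;
use datafusion_expr::Expr;
use datafusion_sql::unparser::expr_to_sql;

fn main() -> Result<()> {
    let expr = Expr::Unnest(Unnest {
        expr: Box::new(Expr::Column(Column::from_name("array_col"))),
    });
    let ast = expr_to_sql(&expr)?;
    // Expected to print something like: UNNEST(array_col)
    println!("{ast}");
    Ok(())
}
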
54 changes: 42 additions & 12 deletions datafusion/sql/src/unparser/plan.rs
@@ -26,9 +26,9 @@ use super::{
subquery_alias_inner_query_and_columns, TableAliasRewriter,
},
utils::{
find_agg_node_within_select, find_window_nodes_within_select,
try_transform_to_simple_table_scan_with_filters, unproject_sort_expr,
unproject_window_exprs,
find_agg_node_within_select, find_unnest_node_within_select,
find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
},
Unparser,
};
@@ -174,15 +174,24 @@
p: &Projection,
select: &mut SelectBuilder,
) -> Result<()> {
let mut exprs = p.expr.clone();

// If an Unnest node is found within the select, find and unproject the unnest column
if let Some(unnest) = find_unnest_node_within_select(plan) {
exprs = exprs
.into_iter()
.map(|e| unproject_unnest_expr(e, unnest))
.collect::<Result<Vec<_>>>()?;
};

match (
find_agg_node_within_select(plan, true),
find_window_nodes_within_select(plan, None, true),
) {
(Some(agg), window) => {
let window_option = window.as_deref();
let items = p
.expr
.iter()
let items = exprs
.into_iter()
.map(|proj_expr| {
let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
self.select_item_to_sql(&unproj)
@@ -199,9 +208,8 @@
));
}
(None, Some(window)) => {
let items = p
.expr
.iter()
let items = exprs
.into_iter()
.map(|proj_expr| {
let unproj = unproject_window_exprs(proj_expr, &window)?;
self.select_item_to_sql(&unproj)
@@ -211,8 +219,7 @@
select.projection(items);
}
_ => {
let items = p
.expr
let items = exprs
.iter()
.map(|e| self.select_item_to_sql(e))
.collect::<Result<Vec<_>>>()?;
@@ -319,7 +326,8 @@ impl Unparser<'_> {
if let Some(agg) =
find_agg_node_within_select(plan, select.already_projected())
{
let unprojected = unproject_agg_exprs(&filter.predicate, agg, None)?;
let unprojected =
unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
let filter_expr = self.expr_to_sql(&unprojected)?;
select.having(Some(filter_expr));
} else {
@@ -652,6 +660,28 @@ impl Unparser<'_> {
Ok(())
}
LogicalPlan::Extension(_) => not_impl_err!("Unsupported operator: {plan:?}"),
LogicalPlan::Unnest(unnest) => {
if !unnest.struct_type_columns.is_empty() {
return internal_err!(
"Struct type columns are not currently supported in UNNEST: {:?}",
unnest.struct_type_columns
);
}

// In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
// Otherwise, there will be a duplicate SELECT clause.
// | Projection: table.col1, UNNEST(table.col2)
// | Unnest: UNNEST(table.col2)
// | Projection: table.col1, table.col2 AS UNNEST(table.col2)
// | Filter: table.col3 = Int64(3)
// | TableScan: table projection=None
if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
// continue with projection input
self.select_to_sql_recursively(&p.input, query, select, relation)
} else {
internal_err!("Unnest input is not a Projection: {unnest:?}")
}
}
_ => not_impl_err!("Unsupported operator: {plan:?}"),
}
}
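
A toy mirror (hypothetical Plan enum, not DataFusion's LogicalPlan) of the traversal rule added above: when the unparser reaches an Unnest node it requires a Projection directly beneath it and resumes from that Projection's input, so the SELECT list already emitted for the outer Projection is not produced a second time:

enum Plan {
    Projection { input: Box<Plan> },
    Unnest { input: Box<Plan> },
    TableScan,
}

// Returns the node that unparsing should continue from.
fn next_node(plan: &Plan) -> Result<&Plan, String> {
    match plan {
        // Skip the duplicate Projection sitting directly under Unnest.
        Plan::Unnest { input } => match input.as_ref() {
            Plan::Projection { input } => Ok(input.as_ref()),
            _ => Err("Unnest input is not a Projection".to_string()),
        },
        other => Ok(other),
    }
}

fn main() {
    let plan = Plan::Unnest {
        input: Box::new(Plan::Projection {
            input: Box::new(Plan::TableScan),
        }),
    };
    assert!(matches!(next_node(&plan), Ok(Plan::TableScan)));
}
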