From 4baea8f0a09a08667c1691396c367b58f7acd1a8 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Fri, 8 Nov 2024 13:45:53 +0100 Subject: [PATCH] Add stacker and recursive --- Cargo.toml | 1 + datafusion-cli/Cargo.lock | 46 +++++++++++++++++++ datafusion/common/Cargo.toml | 1 + datafusion/common/src/tree_node.rs | 7 +++ datafusion/expr/Cargo.toml | 1 + datafusion/expr/src/logical_plan/tree_node.rs | 7 +++ datafusion/optimizer/Cargo.toml | 1 + datafusion/optimizer/src/analyzer/subquery.rs | 2 + .../optimizer/src/common_subexpr_eliminate.rs | 2 + datafusion/sql/Cargo.toml | 1 + datafusion/sql/src/expr/mod.rs | 2 + datafusion/sql/src/query.rs | 3 ++ datafusion/sql/src/set_expr.rs | 2 + 13 files changed, 76 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 91f09102ce48..1ceab5860c68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,7 @@ prost = "0.13.1" prost-derive = "0.13.1" rand = "0.8" regex = "1.8" +recursive = "0.1.1" rstest = "0.23.0" serde_json = "1" sqlparser = { version = "0.51.0", features = ["visitor"] } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 5140ae7eb0dc..72cd4e7b65fd 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1306,6 +1306,7 @@ dependencies = [ "object_store", "parquet", "paste", + "recursive", "sqlparser", "tokio", ] @@ -1353,6 +1354,7 @@ dependencies = [ "datafusion-physical-expr-common", "indexmap", "paste", + "recursive", "serde_json", "sqlparser", "strum 0.26.3", @@ -1482,6 +1484,7 @@ dependencies = [ "itertools", "log", "paste", + "recursive", "regex-syntax", ] @@ -1582,6 +1585,7 @@ dependencies = [ "datafusion-expr", "indexmap", "log", + "recursive", "regex", "sqlparser", "strum 0.26.3", @@ -3041,6 +3045,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "psm" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa37f80ca58604976033fae9515a8a2989fc13797d953f7c04fb8fa36a11f205" +dependencies = [ + "cc", +] + [[package]] name = "quad-rand" version = "0.2.2" @@ -3155,6 +3168,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "recursive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0786a43debb760f491b1bc0269fe5e84155353c67482b9e60d0cfb596054b43e" +dependencies = [ + "recursive-proc-macro-impl", + "stacker", +] + +[[package]] +name = "recursive-proc-macro-impl" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "redox_syscall" version = "0.5.7" @@ -3706,6 +3739,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stacker" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799c883d55abdb5e98af1a7b3f23b9b6de8ecada0ecac058672d7635eb48ca7b" +dependencies = [ + "cc", + "cfg-if", + "libc", + "psm", + "windows-sys 0.59.0", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 0747672a18f6..55163639bb8a 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -61,6 +61,7 @@ libc = "0.2.140" num_cpus = { workspace = true } object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } +recursive = { workspace = true } paste = "1.0.15" pyo3 = { version = "0.22.0", optional = true } sqlparser = { workspace = true } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index a0ad1e80be9b..df8abf317809 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -17,6 +17,7 @@ //! [`TreeNode`] for visiting and rewriting expression and plan trees +use recursive::recursive; use std::sync::Arc; use crate::Result; @@ -123,6 +124,7 @@ pub trait TreeNode: Sized { /// TreeNodeVisitor::f_up(ChildNode2) /// TreeNodeVisitor::f_up(ParentNode) /// ``` + #[recursive] fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>( &'n self, visitor: &mut V, @@ -172,6 +174,7 @@ pub trait TreeNode: Sized { /// TreeNodeRewriter::f_up(ChildNode2) /// TreeNodeRewriter::f_up(ParentNode) /// ``` + #[recursive] fn rewrite>( self, rewriter: &mut R, @@ -194,6 +197,7 @@ pub trait TreeNode: Sized { &'n self, mut f: F, ) -> Result { + #[recursive] fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result>( node: &'n N, f: &mut F, @@ -228,6 +232,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { + #[recursive] fn transform_down_impl Result>>( node: N, f: &mut F, @@ -251,6 +256,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { + #[recursive] fn transform_up_impl Result>>( node: N, f: &mut F, @@ -365,6 +371,7 @@ pub trait TreeNode: Sized { mut f_down: FD, mut f_up: FU, ) -> Result> { + #[recursive] fn transform_down_up_impl< N: TreeNode, FD: FnMut(N) -> Result>, diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index d7dc1afe4d50..19cd5ed3158b 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -50,6 +50,7 @@ datafusion-functions-window-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } indexmap = { workspace = true } paste = "^1.0" +recursive = { workspace = true } serde_json = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index ff2c1ec1d58f..2fb589f15847 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -43,6 +43,7 @@ use crate::{ Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window, }; +use recursive::recursive; use std::ops::Deref; use std::sync::Arc; @@ -773,6 +774,7 @@ impl LogicalPlan { /// Visits a plan similarly to [`Self::visit`], including subqueries that /// may appear in expressions such as `IN (SELECT ...)`. + #[recursive] pub fn visit_with_subqueries TreeNodeVisitor<'n, Node = Self>>( &self, visitor: &mut V, @@ -789,6 +791,7 @@ impl LogicalPlan { /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. + #[recursive] pub fn rewrite_with_subqueries>( self, rewriter: &mut R, @@ -807,6 +810,7 @@ impl LogicalPlan { &self, mut f: F, ) -> Result { + #[recursive] fn apply_with_subqueries_impl< F: FnMut(&LogicalPlan) -> Result, >( @@ -842,6 +846,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { + #[recursive] fn transform_down_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -867,6 +872,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { + #[recursive] fn transform_up_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -894,6 +900,7 @@ impl LogicalPlan { mut f_down: FD, mut f_up: FU, ) -> Result> { + #[recursive] fn transform_down_up_with_subqueries_impl< FD: FnMut(LogicalPlan) -> Result>, FU: FnMut(LogicalPlan) -> Result>, diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 79a5bb24e918..d915076823e5 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -47,6 +47,7 @@ indexmap = { workspace = true } itertools = { workspace = true } log = { workspace = true } paste = "1.0.14" +recursive = { workspace = true } regex-syntax = "0.8.0" [dev-dependencies] diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index fa04835f0967..0b54b302c2df 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -17,6 +17,7 @@ use crate::analyzer::check_plan; use crate::utils::collect_subquery_cols; +use recursive::recursive; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{plan_err, Result}; @@ -128,6 +129,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> { } // Recursively check the unsupported outer references in the sub query plan. +#[recursive] fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> { if !can_contain_outer_ref && inner_plan.contains_outer_reference() { return plan_err!("Accessing outer reference columns is not allowed in the plan"); diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 53a0453d8001..6ba2b48fcc9a 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -22,6 +22,7 @@ use std::fmt::Debug; use std::sync::Arc; use crate::{OptimizerConfig, OptimizerRule}; +use recursive::recursive; use crate::optimizer::ApplyOrder; use crate::utils::NamePreserver; @@ -531,6 +532,7 @@ impl OptimizerRule for CommonSubexprEliminate { None } + #[recursive] fn rewrite( &self, plan: LogicalPlan, diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 1eef1b718ba6..94c3ce97a441 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -48,6 +48,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } indexmap = { workspace = true } log = { workspace = true } +recursive = { workspace = true } regex = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 432e8668c52e..149e094f3d6e 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -20,6 +20,7 @@ use arrow_schema::TimeUnit; use datafusion_expr::planner::{ PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr, }; +use recursive::recursive; use sqlparser::ast::{ BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, DictionaryField, Expr as SQLExpr, MapEntry, StructField, Subscript, TrimWhereField, Value, @@ -168,6 +169,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Internal implementation. Use /// [`Self::sql_expr_to_logical_expr`] to plan exprs. + #[recursive] fn sql_expr_to_logical_expr_internal( &self, sql: SQLExpr, diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 1ef009132f9e..4ae56a5a162c 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -59,6 +59,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.select_into(plan, select_into) } other => { + // TODO: check why set_expr_to_plan or the functions it calls need so huge + // minimum stack + recursive::set_minimum_stack_size(1024 * 1024); let plan = self.set_expr_to_plan(other, planner_context)?; let oby_exprs = to_order_by_exprs(query.order_by)?; let order_by_rex = self.order_by_to_sort_expr( diff --git a/datafusion/sql/src/set_expr.rs b/datafusion/sql/src/set_expr.rs index 248aad846996..e56ebb4d323f 100644 --- a/datafusion/sql/src/set_expr.rs +++ b/datafusion/sql/src/set_expr.rs @@ -18,9 +18,11 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{not_impl_err, Result}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; +use recursive::recursive; use sqlparser::ast::{SetExpr, SetOperator, SetQuantifier}; impl<'a, S: ContextProvider> SqlToRel<'a, S> { + #[recursive] pub(super) fn set_expr_to_plan( &self, set_expr: SetExpr,