diff --git a/Cargo.toml b/Cargo.toml index 91f09102ce48..0b5c74e15d13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,6 +141,7 @@ pbjson = { version = "0.7.0" } prost = "0.13.1" prost-derive = "0.13.1" rand = "0.8" +recursive = "0.1.1" regex = "1.8" rstest = "0.23.0" serde_json = "1" diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 9983e247f9ac..a1154f58dcd9 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1306,6 +1306,7 @@ dependencies = [ "object_store", "parquet", "paste", + "recursive", "sqlparser", "tokio", ] @@ -1353,6 +1354,7 @@ dependencies = [ "datafusion-physical-expr-common", "indexmap", "paste", + "recursive", "serde_json", "sqlparser", "strum 0.26.3", @@ -1482,6 +1484,7 @@ dependencies = [ "itertools", "log", "paste", + "recursive", "regex", "regex-syntax", ] @@ -1537,6 +1540,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-plan", "itertools", + "recursive", ] [[package]] @@ -1583,6 +1587,7 @@ dependencies = [ "datafusion-expr", "indexmap", "log", + "recursive", "regex", "sqlparser", "strum 0.26.3", @@ -3042,6 +3047,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "psm" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa37f80ca58604976033fae9515a8a2989fc13797d953f7c04fb8fa36a11f205" +dependencies = [ + "cc", +] + [[package]] name = "quad-rand" version = "0.2.2" @@ -3156,6 +3170,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "recursive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0786a43debb760f491b1bc0269fe5e84155353c67482b9e60d0cfb596054b43e" +dependencies = [ + "recursive-proc-macro-impl", + "stacker", +] + +[[package]] +name = "recursive-proc-macro-impl" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "redox_syscall" version = "0.5.7" @@ -3707,6 +3741,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stacker" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799c883d55abdb5e98af1a7b3f23b9b6de8ecada0ecac058672d7635eb48ca7b" +dependencies = [ + "cc", + "cfg-if", + "libc", + "psm", + "windows-sys 0.59.0", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 0747672a18f6..34d6e2411952 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -63,6 +63,7 @@ object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } paste = "1.0.15" pyo3 = { version = "0.22.0", optional = true } +recursive = { workspace = true } sqlparser = { workspace = true } tokio = { workspace = true } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index a0ad1e80be9b..c8ec7f18339a 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -17,6 +17,7 @@ //! [`TreeNode`] for visiting and rewriting expression and plan trees +use recursive::recursive; use std::sync::Arc; use crate::Result; @@ -123,6 +124,7 @@ pub trait TreeNode: Sized { /// TreeNodeVisitor::f_up(ChildNode2) /// TreeNodeVisitor::f_up(ParentNode) /// ``` + #[recursive] fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>( &'n self, visitor: &mut V, @@ -172,6 +174,7 @@ pub trait TreeNode: Sized { /// TreeNodeRewriter::f_up(ChildNode2) /// TreeNodeRewriter::f_up(ParentNode) /// ``` + #[recursive] fn rewrite>( self, rewriter: &mut R, @@ -194,6 +197,7 @@ pub trait TreeNode: Sized { &'n self, mut f: F, ) -> Result { + #[recursive] fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result>( node: &'n N, f: &mut F, @@ -228,6 +232,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { + #[recursive] fn transform_down_impl Result>>( node: N, f: &mut F, @@ -251,6 +256,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { + #[recursive] fn transform_up_impl Result>>( node: N, f: &mut F, @@ -365,6 +371,7 @@ pub trait TreeNode: Sized { mut f_down: FD, mut f_up: FU, ) -> Result> { + #[recursive] fn transform_down_up_impl< N: TreeNode, FD: FnMut(N) -> Result>, @@ -2079,4 +2086,17 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn test_large_tree() { + let mut item = TestTreeNode::new_leaf("initial".to_string()); + for i in 0..3000 { + item = TestTreeNode::new(vec![item], format!("parent-{}", i)); + } + + let mut visitor = + TestVisitor::new(Box::new(visit_continue), Box::new(visit_continue)); + + item.visit(&mut visitor).unwrap(); + } } diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index d7dc1afe4d50..19cd5ed3158b 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -50,6 +50,7 @@ datafusion-functions-window-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } indexmap = { workspace = true } paste = "^1.0" +recursive = { workspace = true } serde_json = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index d16fe42098f5..e7dfe8791924 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -42,6 +42,7 @@ use crate::{ LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window, }; +use recursive::recursive; use std::ops::Deref; use std::sync::Arc; @@ -745,6 +746,7 @@ impl LogicalPlan { /// Visits a plan similarly to [`Self::visit`], including subqueries that /// may appear in expressions such as `IN (SELECT ...)`. + #[recursive] pub fn visit_with_subqueries TreeNodeVisitor<'n, Node = Self>>( &self, visitor: &mut V, @@ -761,6 +763,7 @@ impl LogicalPlan { /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. + #[recursive] pub fn rewrite_with_subqueries>( self, rewriter: &mut R, @@ -779,6 +782,7 @@ impl LogicalPlan { &self, mut f: F, ) -> Result { + #[recursive] fn apply_with_subqueries_impl< F: FnMut(&LogicalPlan) -> Result, >( @@ -814,6 +818,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { + #[recursive] fn transform_down_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -839,6 +844,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { + #[recursive] fn transform_up_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -866,6 +872,7 @@ impl LogicalPlan { mut f_down: FD, mut f_up: FU, ) -> Result> { + #[recursive] fn transform_down_up_with_subqueries_impl< FD: FnMut(LogicalPlan) -> Result>, FU: FnMut(LogicalPlan) -> Result>, diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 2ea3ebf337eb..34e35c66107a 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -47,6 +47,7 @@ indexmap = { workspace = true } itertools = { workspace = true } log = { workspace = true } paste = "1.0.14" +recursive = { workspace = true } regex = { workspace = true } regex-syntax = "0.8.0" diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index fa04835f0967..0b54b302c2df 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -17,6 +17,7 @@ use crate::analyzer::check_plan; use crate::utils::collect_subquery_cols; +use recursive::recursive; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{plan_err, Result}; @@ -128,6 +129,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> { } // Recursively check the unsupported outer references in the sub query plan. +#[recursive] fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> { if !can_contain_outer_ref && inner_plan.contains_outer_reference() { return plan_err!("Accessing outer reference columns is not allowed in the plan"); diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 71327ad3e21d..16a4fa6be38d 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -22,6 +22,7 @@ use std::fmt::Debug; use std::sync::Arc; use crate::{OptimizerConfig, OptimizerRule}; +use recursive::recursive; use crate::optimizer::ApplyOrder; use crate::utils::NamePreserver; @@ -531,6 +532,7 @@ impl OptimizerRule for CommonSubexprEliminate { None } + #[recursive] fn rewrite( &self, plan: LogicalPlan, diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 65ebac2106ad..32b7ce44a63a 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -16,9 +16,9 @@ // under the License. //! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available. -use std::sync::Arc; - use crate::{OptimizerConfig, OptimizerRule}; +use recursive::recursive; +use std::sync::Arc; use crate::join_key_set::JoinKeySet; use datafusion_common::tree_node::{Transformed, TreeNode}; @@ -80,6 +80,7 @@ impl OptimizerRule for EliminateCrossJoin { true } + #[recursive] fn rewrite( &self, plan: LogicalPlan, diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 04a523f9b115..b659e477f67e 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -19,11 +19,11 @@ mod required_indices; -use std::collections::HashSet; -use std::sync::Arc; - use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; +use recursive::recursive; +use std::collections::HashSet; +use std::sync::Arc; use datafusion_common::{ get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column, @@ -110,6 +110,7 @@ impl OptimizerRule for OptimizeProjections { /// columns. /// - `Ok(None)`: Signal that the given logical plan did not require any change. /// - `Err(error)`: An error occurred during the optimization process. +#[recursive] fn optimize_projections( plan: LogicalPlan, config: &dyn OptimizerConfig, diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index e7bf4a80fc45..04f01f8badb8 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -40,6 +40,7 @@ datafusion-expr-common = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } itertools = { workspace = true } +recursive = { workspace = true } [dev-dependencies] datafusion-functions-aggregate = { workspace = true } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 27870c7865f3..87077183110d 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -16,8 +16,6 @@ // under the License. //! Utilizing exact statistics from sources to avoid scanning data -use std::sync::Arc; - use datafusion_common::config::ConfigOptions; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; @@ -27,6 +25,8 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs}; use datafusion_physical_plan::{expressions, ExecutionPlan}; +use recursive::recursive; +use std::sync::Arc; use crate::PhysicalOptimizerRule; @@ -42,6 +42,7 @@ impl AggregateStatistics { } impl PhysicalOptimizerRule for AggregateStatistics { + #[recursive] fn optimize( &self, plan: Arc, diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 1eef1b718ba6..94c3ce97a441 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -48,6 +48,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } indexmap = { workspace = true } log = { workspace = true } +recursive = { workspace = true } regex = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index b68be90b03e1..72f88abcea99 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -20,6 +20,7 @@ use arrow_schema::TimeUnit; use datafusion_expr::planner::{ PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr, }; +use recursive::recursive; use sqlparser::ast::{ BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, DictionaryField, Expr as SQLExpr, MapEntry, StructField, Subscript, TrimWhereField, Value, @@ -168,16 +169,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Internal implementation. Use /// [`Self::sql_expr_to_logical_expr`] to plan exprs. + #[recursive] fn sql_expr_to_logical_expr_internal( &self, sql: SQLExpr, schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - // NOTE: This function is called recusively, so each match arm body should be as - // small as possible to avoid stack overflows in debug builds. Follow the - // common pattern of extracting into a separate function for non-trivial - // arms. See https://github.com/apache/datafusion/pull/12384 for more context. + // NOTE: This function is called recursively, so each match arm body should be as + // small as possible to decrease stack requirement. + // Follow the common pattern of extracting into a separate function for + // non-trivial arms. See https://github.com/apache/datafusion/pull/12384 for + // more context. match sql { SQLExpr::Value(value) => { self.parse_value(value, planner_context.prepare_param_data_types()) diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs index 7dc15de7ad71..1cf090aa64aa 100644 --- a/datafusion/sql/src/expr/value.rs +++ b/datafusion/sql/src/expr/value.rs @@ -133,8 +133,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ))) } - // IMPORTANT: Keep sql_array_literal's function body small to prevent stack overflow - // This function is recursively called, potentially leading to deep call stacks. pub(super) fn sql_array_literal( &self, elements: Vec, diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 1ef009132f9e..740f9ad3b42c 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -59,7 +59,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.select_into(plan, select_into) } other => { + // The functions called from `set_expr_to_plan()` need more than 128KB + // stack in debug builds as investigated in: + // https://github.com/apache/datafusion/pull/13310#discussion_r1836813902 + let min_stack_size = recursive::get_minimum_stack_size(); + recursive::set_minimum_stack_size(256 * 1024); let plan = self.set_expr_to_plan(other, planner_context)?; + recursive::set_minimum_stack_size(min_stack_size); let oby_exprs = to_order_by_exprs(query.order_by)?; let order_by_rex = self.order_by_to_sort_expr( oby_exprs, diff --git a/datafusion/sql/src/set_expr.rs b/datafusion/sql/src/set_expr.rs index 248aad846996..e56ebb4d323f 100644 --- a/datafusion/sql/src/set_expr.rs +++ b/datafusion/sql/src/set_expr.rs @@ -18,9 +18,11 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{not_impl_err, Result}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; +use recursive::recursive; use sqlparser::ast::{SetExpr, SetOperator, SetQuantifier}; impl<'a, S: ContextProvider> SqlToRel<'a, S> { + #[recursive] pub(super) fn set_expr_to_plan( &self, set_expr: SetExpr,