diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 78f370c714cc..04cbf8b537b3 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -1122,7 +1122,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let expr = exec .expr() .iter() - .map(|expr| serialize_physical_expr(Arc::clone(&expr.0), extension_codec)) + .map(|expr| serialize_physical_expr(&expr.0, extension_codec)) .collect::>>()?; let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); return Ok(protobuf::PhysicalPlanNode { @@ -1163,7 +1163,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { protobuf::FilterExecNode { input: Some(Box::new(input)), expr: Some(serialize_physical_expr( - Arc::clone(exec.predicate()), + exec.predicate(), extension_codec, )?), default_filter_selectivity: exec.default_selectivity() as u32, @@ -1220,8 +1220,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .on() .iter() .map(|tuple| { - let l = serialize_physical_expr(tuple.0.to_owned(), extension_codec)?; - let r = serialize_physical_expr(tuple.1.to_owned(), extension_codec)?; + let l = serialize_physical_expr(&tuple.0, extension_codec)?; + let r = serialize_physical_expr(&tuple.1, extension_codec)?; Ok::<_, DataFusionError>(protobuf::JoinOn { left: Some(l), right: Some(r), @@ -1233,10 +1233,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .filter() .as_ref() .map(|f| { - let expression = serialize_physical_expr( - f.expression().to_owned(), - extension_codec, - )?; + let expression = + serialize_physical_expr(f.expression(), extension_codec)?; let column_indices = f .column_indices() .iter() @@ -1294,8 +1292,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .on() .iter() .map(|tuple| { - let l = serialize_physical_expr(tuple.0.to_owned(), extension_codec)?; - let r = serialize_physical_expr(tuple.1.to_owned(), extension_codec)?; + let l = serialize_physical_expr(&tuple.0, extension_codec)?; + let r = serialize_physical_expr(&tuple.1, extension_codec)?; Ok::<_, DataFusionError>(protobuf::JoinOn { left: Some(l), right: Some(r), @@ -1307,10 +1305,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .filter() .as_ref() .map(|f| { - let expression = serialize_physical_expr( - f.expression().to_owned(), - extension_codec, - )?; + let expression = + serialize_physical_expr(f.expression(), extension_codec)?; let column_indices = f .column_indices() .iter() @@ -1348,7 +1344,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .map(|expr| { Ok(protobuf::PhysicalSortExprNode { expr: Some(Box::new(serialize_physical_expr( - expr.expr.to_owned(), + &expr.expr, extension_codec, )?)), asc: !expr.options.descending, @@ -1368,7 +1364,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .map(|expr| { Ok(protobuf::PhysicalSortExprNode { expr: Some(Box::new(serialize_physical_expr( - expr.expr.to_owned(), + &expr.expr, extension_codec, )?)), asc: !expr.options.descending, @@ -1475,14 +1471,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .group_expr() .null_expr() .iter() - .map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec)) + .map(|expr| serialize_physical_expr(&expr.0, extension_codec)) .collect::>>()?; let group_expr = exec .group_expr() .expr() .iter() - .map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec)) + .map(|expr| serialize_physical_expr(&expr.0, extension_codec)) .collect::>>()?; let limit = exec.limit().map(|value| protobuf::AggLimit { @@ -1581,7 +1577,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { if let Some(exec) = plan.downcast_ref::() { let predicate = exec .predicate() - .map(|pred| serialize_physical_expr(Arc::clone(pred), extension_codec)) + .map(|pred| serialize_physical_expr(pred, extension_codec)) .transpose()?; return Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ParquetScan( @@ -1653,7 +1649,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .map(|expr| { let sort_expr = Box::new(protobuf::PhysicalSortExprNode { expr: Some(Box::new(serialize_physical_expr( - expr.expr.to_owned(), + &expr.expr, extension_codec, )?)), asc: !expr.options.descending, @@ -1722,7 +1718,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .map(|expr| { let sort_expr = Box::new(protobuf::PhysicalSortExprNode { expr: Some(Box::new(serialize_physical_expr( - expr.expr.to_owned(), + &expr.expr, extension_codec, )?)), asc: !expr.options.descending, @@ -1761,10 +1757,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .filter() .as_ref() .map(|f| { - let expression = serialize_physical_expr( - f.expression().to_owned(), - extension_codec, - )?; + let expression = + serialize_physical_expr(f.expression(), extension_codec)?; let column_indices = f .column_indices() .iter() @@ -1806,13 +1800,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let window_expr = exec .window_expr() .iter() - .map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec)) + .map(|e| serialize_physical_window_expr(e, extension_codec)) .collect::>>()?; let partition_keys = exec .partition_keys .iter() - .map(|e| serialize_physical_expr(Arc::clone(e), extension_codec)) + .map(|e| serialize_physical_expr(e, extension_codec)) .collect::>>()?; return Ok(protobuf::PhysicalPlanNode { @@ -1836,13 +1830,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let window_expr = exec .window_expr() .iter() - .map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec)) + .map(|e| serialize_physical_window_expr(e, extension_codec)) .collect::>>()?; let partition_keys = exec .partition_keys .iter() - .map(|e| serialize_physical_expr(Arc::clone(e), extension_codec)) + .map(|e| serialize_physical_expr(e, extension_codec)) .collect::>>()?; let input_order_mode = match &exec.input_order_mode { @@ -1886,7 +1880,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { let expr: PhysicalSortExpr = requirement.to_owned().into(); let sort_expr = protobuf::PhysicalSortExprNode { expr: Some(Box::new(serialize_physical_expr( - expr.expr.to_owned(), + &expr.expr, extension_codec, )?)), asc: !expr.options.descending, @@ -2025,7 +2019,7 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync { fn try_encode_expr( &self, - _node: Arc, + _node: &Arc, _buf: &mut Vec, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 555ad22a9bc1..25be7de61cc3 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -52,7 +52,7 @@ pub fn serialize_physical_aggr_expr( aggr_expr: Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { - let expressions = serialize_physical_exprs(aggr_expr.expressions(), codec)?; + let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?; let ordering_req = aggr_expr.order_bys().unwrap_or(&[]).to_vec(); let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?; @@ -96,7 +96,7 @@ fn serialize_physical_window_aggr_expr( } pub fn serialize_physical_window_expr( - window_expr: Arc, + window_expr: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { let expr = window_expr.as_any(); @@ -187,9 +187,8 @@ pub fn serialize_physical_window_expr( return not_impl_err!("WindowExpr not supported: {window_expr:?}"); }; - let args = serialize_physical_exprs(args, codec)?; - let partition_by = - serialize_physical_exprs(window_expr.partition_by().to_vec(), codec)?; + let args = serialize_physical_exprs(&args, codec)?; + let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?; let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?; let window_frame: protobuf::WindowFrame = window_frame .as_ref() @@ -225,7 +224,7 @@ pub fn serialize_physical_sort_expr( codec: &dyn PhysicalExtensionCodec, ) -> Result { let PhysicalSortExpr { expr, options } = sort_expr; - let expr = serialize_physical_expr(expr, codec)?; + let expr = serialize_physical_expr(&expr, codec)?; Ok(PhysicalSortExprNode { expr: Some(Box::new(expr)), asc: !options.descending, @@ -233,12 +232,12 @@ pub fn serialize_physical_sort_expr( }) } -pub fn serialize_physical_exprs( +pub fn serialize_physical_exprs<'a, I>( values: I, codec: &dyn PhysicalExtensionCodec, ) -> Result> where - I: IntoIterator>, + I: IntoIterator>, { values .into_iter() @@ -251,7 +250,7 @@ where /// If required, a [`PhysicalExtensionCodec`] can be provided which can handle /// serialization of udfs requiring specialized serialization (see [`PhysicalExtensionCodec::try_encode_udf`]) pub fn serialize_physical_expr( - value: Arc, + value: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { let expr = value.as_any(); @@ -267,14 +266,8 @@ pub fn serialize_physical_expr( }) } else if let Some(expr) = expr.downcast_ref::() { let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode { - l: Some(Box::new(serialize_physical_expr( - Arc::clone(expr.left()), - codec, - )?)), - r: Some(Box::new(serialize_physical_expr( - Arc::clone(expr.right()), - codec, - )?)), + l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)), + r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)), op: format!("{:?}", expr.op()), }); @@ -292,8 +285,7 @@ pub fn serialize_physical_expr( expr: expr .expr() .map(|exp| { - serialize_physical_expr(Arc::clone(exp), codec) - .map(Box::new) + serialize_physical_expr(exp, codec).map(Box::new) }) .transpose()?, when_then_expr: expr @@ -308,10 +300,7 @@ pub fn serialize_physical_expr( >>()?, else_expr: expr .else_expr() - .map(|a| { - serialize_physical_expr(Arc::clone(a), codec) - .map(Box::new) - }) + .map(|a| serialize_physical_expr(a, codec).map(Box::new)) .transpose()?, }, ), @@ -322,10 +311,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { - expr: Some(Box::new(serialize_physical_expr( - expr.arg().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), }, ))), }) @@ -333,10 +319,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { - expr: Some(Box::new(serialize_physical_expr( - expr.arg().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), }), )), }) @@ -344,10 +327,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { - expr: Some(Box::new(serialize_physical_expr( - expr.arg().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), }), )), }) @@ -355,11 +335,8 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { - expr: Some(Box::new(serialize_physical_expr( - expr.expr().to_owned(), - codec, - )?)), - list: serialize_physical_exprs(expr.list().to_vec(), codec)?, + expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)), + list: serialize_physical_exprs(expr.list(), codec)?, negated: expr.negated(), }, ))), @@ -368,10 +345,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { - expr: Some(Box::new(serialize_physical_expr( - expr.arg().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)), }, ))), }) @@ -385,10 +359,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( protobuf::PhysicalCastNode { - expr: Some(Box::new(serialize_physical_expr( - cast.expr().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)), arrow_type: Some(cast.cast_type().try_into()?), }, ))), @@ -397,10 +368,7 @@ pub fn serialize_physical_expr( Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( protobuf::PhysicalTryCastNode { - expr: Some(Box::new(serialize_physical_expr( - cast.expr().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)), arrow_type: Some(cast.cast_type().try_into()?), }, ))), @@ -412,7 +380,7 @@ pub fn serialize_physical_expr( expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( protobuf::PhysicalScalarUdfNode { name: expr.name().to_string(), - args: serialize_physical_exprs(expr.args().to_vec(), codec)?, + args: serialize_physical_exprs(expr.args(), codec)?, fun_definition: (!buf.is_empty()).then_some(buf), return_type: Some(expr.return_type().try_into()?), }, @@ -424,12 +392,9 @@ pub fn serialize_physical_expr( protobuf::PhysicalLikeExprNode { negated: expr.negated(), case_insensitive: expr.case_insensitive(), - expr: Some(Box::new(serialize_physical_expr( - expr.expr().to_owned(), - codec, - )?)), + expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)), pattern: Some(Box::new(serialize_physical_expr( - expr.pattern().to_owned(), + expr.pattern(), codec, )?)), }, @@ -437,12 +402,12 @@ pub fn serialize_physical_expr( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(Arc::clone(&value), &mut buf) { + match codec.try_encode_expr(value, &mut buf) { Ok(_) => { let inputs: Vec = value .children() .into_iter() - .map(|e| serialize_physical_expr(Arc::clone(e), codec)) + .map(|e| serialize_physical_expr(e, codec)) .collect::>()?; Ok(protobuf::PhysicalExprNode { expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( @@ -468,7 +433,7 @@ pub fn serialize_partitioning( )), }, Partitioning::Hash(exprs, partition_count) => { - let serialized_exprs = serialize_physical_exprs(exprs.clone(), codec)?; + let serialized_exprs = serialize_physical_exprs(exprs, codec)?; protobuf::Partitioning { partition_method: Some(protobuf::partitioning::PartitionMethod::Hash( protobuf::PhysicalHashRepartition { @@ -493,8 +458,8 @@ fn serialize_when_then_expr( codec: &dyn PhysicalExtensionCodec, ) -> Result { Ok(protobuf::PhysicalWhenThen { - when_expr: Some(serialize_physical_expr(Arc::clone(when_expr), codec)?), - then_expr: Some(serialize_physical_expr(Arc::clone(then_expr), codec)?), + when_expr: Some(serialize_physical_expr(when_expr, codec)?), + then_expr: Some(serialize_physical_expr(then_expr, codec)?), }) } @@ -608,7 +573,7 @@ pub fn serialize_maybe_filter( match expr { None => Ok(protobuf::MaybeFilter { expr: None }), Some(expr) => Ok(protobuf::MaybeFilter { - expr: Some(serialize_physical_expr(expr, codec)?), + expr: Some(serialize_physical_expr(&expr, codec)?), }), } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3e49dc24fd5a..b2ded88dfaf4 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -821,11 +821,10 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { fn try_encode_expr( &self, - node: Arc, + node: &Arc, buf: &mut Vec, ) -> Result<()> { if node - .as_ref() .as_any() .downcast_ref::() .is_some()