From 128d7c6700bd6b8300cae86932b1e6d9bf74414d Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Thu, 26 Oct 2023 01:04:12 +0300 Subject: [PATCH] [MINOR]: Simplify enforce_distribution, minor changes (#7924) * Initial commit * Simplifications * Cleanup imports * Review --------- Co-authored-by: Mehmet Ozan Kabak --- .../enforce_distribution.rs | 196 +++++++----------- .../src/physical_optimizer/enforce_sorting.rs | 22 +- .../core/src/physical_optimizer/test_utils.rs | 3 +- .../core/tests/fuzz_cases/window_fuzz.rs | 17 +- .../physical-expr/src/aggregate/first_last.rs | 7 +- datafusion/physical-expr/src/physical_expr.rs | 71 +++++-- .../physical-expr/src/scalar_function.rs | 21 +- .../physical-plan/src/aggregates/mod.rs | 2 +- .../src/windows/bounded_window_agg_exec.rs | 12 +- datafusion/physical-plan/src/windows/mod.rs | 12 +- 10 files changed, 184 insertions(+), 179 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index d3fbc46a6659..072c3cb6d7a6 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -486,7 +486,7 @@ fn reorder_aggregate_keys( parent_required: &[Arc], agg_exec: &AggregateExec, ) -> Result { - let out_put_columns = agg_exec + let output_columns = agg_exec .group_by() .expr() .iter() @@ -494,44 +494,32 @@ fn reorder_aggregate_keys( .map(|(index, (_col, name))| Column::new(name, index)) .collect::>(); - let out_put_exprs = out_put_columns + let output_exprs = output_columns .iter() - .map(|c| Arc::new(c.clone()) as Arc) + .map(|c| Arc::new(c.clone()) as _) .collect::>(); - if parent_required.len() != out_put_exprs.len() + if parent_required.len() != output_exprs.len() || !agg_exec.group_by().null_expr().is_empty() - || expr_list_eq_strict_order(&out_put_exprs, parent_required) + || expr_list_eq_strict_order(&output_exprs, parent_required) { Ok(PlanWithKeyRequirements::new(agg_plan)) } else { - let new_positions = expected_expr_positions(&out_put_exprs, parent_required); + let new_positions = expected_expr_positions(&output_exprs, parent_required); match new_positions { None => Ok(PlanWithKeyRequirements::new(agg_plan)), Some(positions) => { let new_partial_agg = if let Some(agg_exec) = agg_exec.input().as_any().downcast_ref::() - /*AggregateExec { - mode, - group_by, - aggr_expr, - filter_expr, - order_by_expr, - input, - input_schema, - .. - }) = - */ { if matches!(agg_exec.mode(), &AggregateMode::Partial) { - let mut new_group_exprs = vec![]; - for idx in positions.iter() { - new_group_exprs - .push(agg_exec.group_by().expr()[*idx].clone()); - } + let group_exprs = agg_exec.group_by().expr(); + let new_group_exprs = positions + .into_iter() + .map(|idx| group_exprs[idx].clone()) + .collect(); let new_partial_group_by = PhysicalGroupBy::new_single(new_group_exprs); - // new Partial AggregateExec Some(Arc::new(AggregateExec::try_new( AggregateMode::Partial, new_partial_group_by, @@ -549,18 +537,13 @@ fn reorder_aggregate_keys( }; if let Some(partial_agg) = new_partial_agg { // Build new group expressions that correspond to the output of partial_agg - let new_final_group: Vec> = - partial_agg.output_group_expr(); + let group_exprs = partial_agg.group_expr().expr(); + let new_final_group = partial_agg.output_group_expr(); let new_group_by = PhysicalGroupBy::new_single( new_final_group .iter() .enumerate() - .map(|(i, expr)| { - ( - expr.clone(), - partial_agg.group_expr().expr()[i].1.clone(), - ) - }) + .map(|(idx, expr)| (expr.clone(), group_exprs[idx].1.clone())) .collect(), ); @@ -575,29 +558,29 @@ fn reorder_aggregate_keys( )?); // Need to create a new projection to change the expr ordering back - let mut proj_exprs = out_put_columns + let agg_schema = new_final_agg.schema(); + let mut proj_exprs = output_columns .iter() .map(|col| { + let name = col.name(); ( Arc::new(Column::new( - col.name(), - new_final_agg.schema().index_of(col.name()).unwrap(), + name, + agg_schema.index_of(name).unwrap(), )) as Arc, - col.name().to_owned(), + name.to_owned(), ) }) .collect::>(); let agg_schema = new_final_agg.schema(); let agg_fields = agg_schema.fields(); for (idx, field) in - agg_fields.iter().enumerate().skip(out_put_columns.len()) + agg_fields.iter().enumerate().skip(output_columns.len()) { - proj_exprs.push(( - Arc::new(Column::new(field.name().as_str(), idx)) - as Arc, - field.name().clone(), - )) + let name = field.name(); + proj_exprs + .push((Arc::new(Column::new(name, idx)) as _, name.clone())) } // TODO merge adjacent Projections if there are Ok(PlanWithKeyRequirements::new(Arc::new( @@ -615,15 +598,14 @@ fn shift_right_required( parent_required: &[Arc], left_columns_len: usize, ) -> Option>> { - let new_right_required: Vec> = parent_required + let new_right_required = parent_required .iter() .filter_map(|r| { if let Some(col) = r.as_any().downcast_ref::() { - if col.index() >= left_columns_len { - Some( - Arc::new(Column::new(col.name(), col.index() - left_columns_len)) - as Arc, - ) + let idx = col.index(); + if idx >= left_columns_len { + let result = Column::new(col.name(), idx - left_columns_len); + Some(Arc::new(result) as _) } else { None } @@ -634,11 +616,7 @@ fn shift_right_required( .collect::>(); // if the parent required are all comming from the right side, the requirements can be pushdown - if new_right_required.len() != parent_required.len() { - None - } else { - Some(new_right_required) - } + (new_right_required.len() == parent_required.len()).then_some(new_right_required) } /// When the physical planner creates the Joins, the ordering of join keys is from the original query. @@ -662,8 +640,8 @@ fn shift_right_required( /// In that case, the datasources/tables might be pre-partitioned and we can't adjust the key ordering of the datasources /// and then can't apply the Top-Down reordering process. pub(crate) fn reorder_join_keys_to_inputs( - plan: Arc, -) -> Result> { + plan: Arc, +) -> Result> { let plan_any = plan.as_any(); if let Some(HashJoinExec { left, @@ -676,41 +654,34 @@ pub(crate) fn reorder_join_keys_to_inputs( .. }) = plan_any.downcast_ref::() { - match mode { - PartitionMode::Partitioned => { - let join_key_pairs = extract_join_keys(on); - if let Some(( - JoinKeyPairs { - left_keys, - right_keys, - }, - new_positions, - )) = reorder_current_join_keys( - join_key_pairs, - Some(left.output_partitioning()), - Some(right.output_partitioning()), - &left.equivalence_properties(), - &right.equivalence_properties(), - ) { - if !new_positions.is_empty() { - let new_join_on = new_join_conditions(&left_keys, &right_keys); - Ok(Arc::new(HashJoinExec::try_new( - left.clone(), - right.clone(), - new_join_on, - filter.clone(), - join_type, - PartitionMode::Partitioned, - *null_equals_null, - )?)) - } else { - Ok(plan) - } - } else { - Ok(plan) + if matches!(mode, PartitionMode::Partitioned) { + let join_key_pairs = extract_join_keys(on); + if let Some(( + JoinKeyPairs { + left_keys, + right_keys, + }, + new_positions, + )) = reorder_current_join_keys( + join_key_pairs, + Some(left.output_partitioning()), + Some(right.output_partitioning()), + &left.equivalence_properties(), + &right.equivalence_properties(), + ) { + if !new_positions.is_empty() { + let new_join_on = new_join_conditions(&left_keys, &right_keys); + return Ok(Arc::new(HashJoinExec::try_new( + left.clone(), + right.clone(), + new_join_on, + filter.clone(), + join_type, + PartitionMode::Partitioned, + *null_equals_null, + )?)); } } - _ => Ok(plan), } } else if let Some(SortMergeJoinExec { left, @@ -742,23 +713,18 @@ pub(crate) fn reorder_join_keys_to_inputs( for idx in 0..sort_options.len() { new_sort_options.push(sort_options[new_positions[idx]]) } - Ok(Arc::new(SortMergeJoinExec::try_new( + return Ok(Arc::new(SortMergeJoinExec::try_new( left.clone(), right.clone(), new_join_on, *join_type, new_sort_options, *null_equals_null, - )?)) - } else { - Ok(plan) + )?)); } - } else { - Ok(plan) } - } else { - Ok(plan) } + Ok(plan) } /// Reorder the current join keys ordering based on either left partition or right partition @@ -886,12 +852,7 @@ fn expected_expr_positions( fn extract_join_keys(on: &[(Column, Column)]) -> JoinKeyPairs { let (left_keys, right_keys) = on .iter() - .map(|(l, r)| { - ( - Arc::new(l.clone()) as Arc, - Arc::new(r.clone()) as Arc, - ) - }) + .map(|(l, r)| (Arc::new(l.clone()) as _, Arc::new(r.clone()) as _)) .unzip(); JoinKeyPairs { left_keys, @@ -903,7 +864,7 @@ fn new_join_conditions( new_left_keys: &[Arc], new_right_keys: &[Arc], ) -> Vec<(Column, Column)> { - let new_join_on = new_left_keys + new_left_keys .iter() .zip(new_right_keys.iter()) .map(|(l_key, r_key)| { @@ -912,8 +873,7 @@ fn new_join_conditions( r_key.as_any().downcast_ref::().unwrap().clone(), ) }) - .collect::>(); - new_join_on + .collect::>() } /// Updates `dist_onward` such that, to keep track of @@ -977,10 +937,10 @@ fn add_roundrobin_on_top( // (determined by flag `config.optimizer.bounded_order_preserving_variants`) let should_preserve_ordering = input.output_ordering().is_some(); - let new_plan = Arc::new( - RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(n_target))? - .with_preserve_order(should_preserve_ordering), - ) as Arc; + let partitioning = Partitioning::RoundRobinBatch(n_target); + let repartition = RepartitionExec::try_new(input, partitioning)? + .with_preserve_order(should_preserve_ordering); + let new_plan = Arc::new(repartition) as Arc; // update distribution onward with new operator update_distribution_onward(new_plan.clone(), dist_onward, input_idx); @@ -1009,7 +969,7 @@ fn add_roundrobin_on_top( /// /// # Returns /// -/// A [Result] object that contains new execution plan, where desired distribution is +/// A [`Result`] object that contains new execution plan, where desired distribution is /// satisfied by adding Hash Repartition. fn add_hash_on_top( input: Arc, @@ -1053,10 +1013,10 @@ fn add_hash_on_top( } else { input }; - new_plan = Arc::new( - RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))? - .with_preserve_order(should_preserve_ordering), - ) as _; + let partitioning = Partitioning::Hash(hash_exprs, n_target); + let repartition = RepartitionExec::try_new(new_plan, partitioning)? + .with_preserve_order(should_preserve_ordering); + new_plan = Arc::new(repartition) as _; // update distribution onward with new operator update_distribution_onward(new_plan.clone(), dist_onward, input_idx); @@ -1146,7 +1106,7 @@ fn remove_dist_changing_operators( { // All of above operators have a single child. When we remove the top // operator, we take the first child. - plan = plan.children()[0].clone(); + plan = plan.children().swap_remove(0); distribution_onwards = get_children_exectrees(plan.children().len(), &distribution_onwards[0]); } @@ -1199,14 +1159,14 @@ fn replace_order_preserving_variants_helper( } if is_sort_preserving_merge(&exec_tree.plan) { return Ok(Arc::new(CoalescePartitionsExec::new( - updated_children[0].clone(), + updated_children.swap_remove(0), ))); } if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::() { if repartition.preserve_order() { return Ok(Arc::new( RepartitionExec::try_new( - updated_children[0].clone(), + updated_children.swap_remove(0), repartition.partitioning().clone(), )? .with_preserve_order(false), @@ -1427,7 +1387,7 @@ fn ensure_distribution( // Data Arc::new(InterleaveExec::try_new(new_children)?) } else { - plan.clone().with_new_children(new_children)? + plan.with_new_children(new_children)? }, distribution_onwards, }; @@ -1624,7 +1584,7 @@ impl PlanWithKeyRequirements { let length = child.children().len(); PlanWithKeyRequirements { plan: child, - required_key_ordering: from_parent.clone(), + required_key_ordering: from_parent, request_key_ordering: vec![None; length], } }) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 913dae07faa1..822a224d236a 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -17,8 +17,8 @@ //! EnforceSorting optimizer rule inspects the physical plan with respect //! to local sorting requirements and does the following: -//! - Adds a [SortExec] when a requirement is not met, -//! - Removes an already-existing [SortExec] if it is possible to prove +//! - Adds a [`SortExec`] when a requirement is not met, +//! - Removes an already-existing [`SortExec`] if it is possible to prove //! that this sort is unnecessary //! The rule can work on valid *and* invalid physical plans with respect to //! sorting requirements, but always produces a valid physical plan in this sense. @@ -496,9 +496,10 @@ fn ensure_sorting( { // This SortPreservingMergeExec is unnecessary, input already has a // single partition. + sort_onwards.truncate(1); return Ok(Transformed::Yes(PlanWithCorrespondingSort { - plan: children[0].clone(), - sort_onwards: vec![sort_onwards[0].clone()], + plan: children.swap_remove(0), + sort_onwards, })); } Ok(Transformed::Yes(PlanWithCorrespondingSort { @@ -649,7 +650,7 @@ fn remove_corresponding_coalesce_in_sub_plan( && is_repartition(&new_plan) && is_repartition(parent) { - new_plan = new_plan.children()[0].clone() + new_plan = new_plan.children().swap_remove(0) } new_plan } else { @@ -689,7 +690,7 @@ fn remove_corresponding_sort_from_sub_plan( ) -> Result> { // A `SortExec` is always at the bottom of the tree. let mut updated_plan = if is_sort(&sort_onwards.plan) { - sort_onwards.plan.children()[0].clone() + sort_onwards.plan.children().swap_remove(0) } else { let plan = &sort_onwards.plan; let mut children = plan.children(); @@ -703,12 +704,12 @@ fn remove_corresponding_sort_from_sub_plan( } // Replace with variants that do not preserve order. if is_sort_preserving_merge(plan) { - children[0].clone() + children.swap_remove(0) } else if let Some(repartition) = plan.as_any().downcast_ref::() { Arc::new( RepartitionExec::try_new( - children[0].clone(), + children.swap_remove(0), repartition.partitioning().clone(), )? .with_preserve_order(false), @@ -730,7 +731,7 @@ fn remove_corresponding_sort_from_sub_plan( updated_plan, )); } else { - updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan.clone())); + updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan)); } } Ok(updated_plan) @@ -777,8 +778,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::Result; use datafusion_expr::JoinType; - use datafusion_physical_expr::expressions::Column; - use datafusion_physical_expr::expressions::{col, NotExpr}; + use datafusion_physical_expr::expressions::{col, Column, NotExpr}; fn create_test_schema() -> Result { let nullable_column = Field::new("nullable_col", DataType::Int32, true); diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 53401751b67e..159ee5089075 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -44,6 +44,7 @@ use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_plan::windows::PartitionSearchMode; use async_trait::async_trait; @@ -239,7 +240,7 @@ pub fn bounded_window_exec( .unwrap()], input.clone(), vec![], - crate::physical_plan::windows::PartitionSearchMode::Sorted, + PartitionSearchMode::Sorted, ) .unwrap(), ) diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index 83c8e1f57896..db940a9794a1 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -22,9 +22,6 @@ use arrow::compute::{concat_batches, SortOptions}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; -use hashbrown::HashMap; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; @@ -32,22 +29,26 @@ use datafusion::physical_plan::windows::{ create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec, }; use datafusion::physical_plan::{collect, ExecutionPlan}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::type_coercion::aggregates::coerce_types; use datafusion_expr::{ AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction, }; - -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::type_coercion::aggregates::coerce_types; use datafusion_physical_expr::expressions::{cast, col, lit}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use test_utils::add_empty_batches; +use hashbrown::HashMap; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + #[cfg(test)] mod tests { use super::*; - use datafusion::physical_plan::windows::PartitionSearchMode::{ + + use datafusion_physical_plan::windows::PartitionSearchMode::{ Linear, PartiallySorted, Sorted, }; diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs index ce7a1daeec64..a4e0a6dc49a9 100644 --- a/datafusion/physical-expr/src/aggregate/first_last.rs +++ b/datafusion/physical-expr/src/aggregate/first_last.rs @@ -26,12 +26,9 @@ use crate::{ reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr, }; -use arrow::array::ArrayRef; -use arrow::compute; -use arrow::compute::{lexsort_to_indices, SortColumn}; +use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; +use arrow::compute::{self, lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field}; -use arrow_array::cast::AsArray; -use arrow_array::{Array, BooleanArray}; use arrow_schema::SortOptions; use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx}; use datafusion_common::{DataFusionError, Result, ScalarValue}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 0eff45b6b9f7..11fa6c899621 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -15,6 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::fmt::{Debug, Display}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + use crate::intervals::Interval; use crate::sort_properties::SortProperties; use crate::utils::scatter; @@ -27,11 +32,6 @@ use datafusion_common::utils::DataPtr; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; -use std::any::Any; -use std::fmt::{Debug, Display}; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; - /// Expression that can be evaluated against a RecordBatch /// A Physical expression knows its type, nullability and how to evaluate itself. pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { @@ -54,13 +54,12 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq { let tmp_batch = filter_record_batch(batch, selection)?; let tmp_result = self.evaluate(&tmp_batch)?; - // All values from the `selection` filter are true. + if batch.num_rows() == tmp_batch.num_rows() { - return Ok(tmp_result); - } - if let ColumnarValue::Array(a) = tmp_result { - let result = scatter(selection, a.as_ref())?; - Ok(ColumnarValue::Array(result)) + // All values from the `selection` filter are true. + Ok(tmp_result) + } else if let ColumnarValue::Array(a) = tmp_result { + scatter(selection, a.as_ref()).map(ColumnarValue::Array) } else { Ok(tmp_result) } @@ -216,8 +215,8 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { } } -/// It is similar to contains method of vector. -/// Finds whether `expr` is among `physical_exprs`. +/// This function is similar to the `contains` method of `Vec`. It finds +/// whether `expr` is among `physical_exprs`. pub fn physical_exprs_contains( physical_exprs: &[Arc], expr: &Arc, @@ -226,3 +225,49 @@ pub fn physical_exprs_contains( .iter() .any(|physical_expr| physical_expr.eq(expr)) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::expressions::{Column, Literal}; + use crate::physical_expr::{physical_exprs_contains, PhysicalExpr}; + + use datafusion_common::{Result, ScalarValue}; + + #[test] + fn test_physical_exprs_contains() -> Result<()> { + let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))) + as Arc; + let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false)))) + as Arc; + let lit4 = + Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; + let col_a_expr = Arc::new(Column::new("a", 0)) as Arc; + let col_b_expr = Arc::new(Column::new("b", 1)) as Arc; + let col_c_expr = Arc::new(Column::new("c", 2)) as Arc; + + // lit(true), lit(false), lit(4), lit(2), Col(a), Col(b) + let physical_exprs: Vec> = vec![ + lit_true.clone(), + lit_false.clone(), + lit4.clone(), + lit2.clone(), + col_a_expr.clone(), + col_b_expr.clone(), + ]; + // below expressions are inside physical_exprs + assert!(physical_exprs_contains(&physical_exprs, &lit_true)); + assert!(physical_exprs_contains(&physical_exprs, &lit2)); + assert!(physical_exprs_contains(&physical_exprs, &col_b_expr)); + + // below expressions are not inside physical_exprs + assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr)); + assert!(!physical_exprs_contains(&physical_exprs, &lit1)); + Ok(()) + } +} diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index dc48baa23ab3..43598ce56489 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -29,24 +29,25 @@ //! This module also has a set of coercion rules to improve user experience: if an argument i32 is passed //! to a function that supports f64, it is coerced to f64. +use std::any::Any; +use std::fmt::Debug; +use std::fmt::{self, Formatter}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + use crate::functions::out_ordering; use crate::physical_expr::down_cast_any_ref; use crate::sort_properties::SortProperties; use crate::utils::expr_list_eq_strict_order; use crate::PhysicalExpr; + use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::Result; -use datafusion_expr::expr_vec_fmt; -use datafusion_expr::BuiltinScalarFunction; -use datafusion_expr::ColumnarValue; -use datafusion_expr::FuncMonotonicity; -use datafusion_expr::ScalarFunctionImplementation; -use std::any::Any; -use std::fmt::Debug; -use std::fmt::{self, Formatter}; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; +use datafusion_expr::{ + expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity, + ScalarFunctionImplementation, +}; /// Physical expression of a scalar function pub struct ScalarFunctionExpr { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 1fa129680cea..4c612223178c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -228,7 +228,7 @@ impl PhysicalGroupBy { } /// Return grouping expressions as they occur in the output schema. - fn output_exprs(&self) -> Vec> { + pub fn output_exprs(&self) -> Vec> { self.expr .iter() .enumerate() diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index eab47886c764..f6ffe2e26795 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -31,6 +31,7 @@ use crate::expressions::PhysicalSortExpr; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, window_ordering_equivalence, + PartitionSearchMode, }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, @@ -68,17 +69,6 @@ use hashbrown::raw::RawTable; use indexmap::IndexMap; use log::debug; -#[derive(Debug, Clone, PartialEq)] -/// Specifies partition column properties in terms of input ordering -pub enum PartitionSearchMode { - /// None of the columns among the partition columns is ordered. - Linear, - /// Some columns of the partition columns are ordered but not all - PartiallySorted(Vec), - /// All Partition columns are ordered (Also empty case) - Sorted, -} - /// Window execution plan #[derive(Debug)] pub struct BoundedWindowAggExec { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index cc915e54af60..aff936499a5e 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -54,13 +54,23 @@ mod bounded_window_agg_exec; mod window_agg_exec; pub use bounded_window_agg_exec::BoundedWindowAggExec; -pub use bounded_window_agg_exec::PartitionSearchMode; pub use window_agg_exec::WindowAggExec; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; +#[derive(Debug, Clone, PartialEq)] +/// Specifies partition column properties in terms of input ordering +pub enum PartitionSearchMode { + /// None of the columns among the partition columns is ordered. + Linear, + /// Some columns of the partition columns are ordered but not all + PartiallySorted(Vec), + /// All Partition columns are ordered (Also empty case) + Sorted, +} + /// Create a physical expression for window function pub fn create_window_expr( fun: &WindowFunction,