Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR]: Simplify enforce_distribution, minor changes #7924

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 78 additions & 118 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! EnforceSorting optimizer rule inspects the physical plan with respect
//! to local sorting requirements and does the following:
//! - Adds a [SortExec] when a requirement is not met,
//! - Removes an already-existing [SortExec] if it is possible to prove
//! - Adds a [`SortExec`] when a requirement is not met,
//! - Removes an already-existing [`SortExec`] if it is possible to prove
//! that this sort is unnecessary
//! The rule can work on valid *and* invalid physical plans with respect to
//! sorting requirements, but always produces a valid physical plan in this sense.
Expand Down Expand Up @@ -496,9 +496,10 @@ fn ensure_sorting(
{
// This SortPreservingMergeExec is unnecessary, input already has a
// single partition.
sort_onwards.truncate(1);
return Ok(Transformed::Yes(PlanWithCorrespondingSort {
plan: children[0].clone(),
sort_onwards: vec![sort_onwards[0].clone()],
plan: children.swap_remove(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://doc.rust-lang.org/std/vec/struct.Vec.html#method.swap_remove says "does not preserve the order" -- is the idea here that there is only one element in child and this avoids a clone?

I think we have used the pop method to do something similar elsewhere in the code (like children.pop().unwrap())

Not that this needs to change, just an observation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly, your observation is correct -- in this case, children.pop().unwrap() and children.swap_remove(0) are equivalent.

sort_onwards,
}));
}
Ok(Transformed::Yes(PlanWithCorrespondingSort {
Expand Down Expand Up @@ -649,7 +650,7 @@ fn remove_corresponding_coalesce_in_sub_plan(
&& is_repartition(&new_plan)
&& is_repartition(parent)
{
new_plan = new_plan.children()[0].clone()
new_plan = new_plan.children().swap_remove(0)
}
new_plan
} else {
Expand Down Expand Up @@ -689,7 +690,7 @@ fn remove_corresponding_sort_from_sub_plan(
) -> Result<Arc<dyn ExecutionPlan>> {
// A `SortExec` is always at the bottom of the tree.
let mut updated_plan = if is_sort(&sort_onwards.plan) {
sort_onwards.plan.children()[0].clone()
sort_onwards.plan.children().swap_remove(0)
} else {
let plan = &sort_onwards.plan;
let mut children = plan.children();
Expand All @@ -703,12 +704,12 @@ fn remove_corresponding_sort_from_sub_plan(
}
// Replace with variants that do not preserve order.
if is_sort_preserving_merge(plan) {
children[0].clone()
children.swap_remove(0)
} else if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
{
Arc::new(
RepartitionExec::try_new(
children[0].clone(),
children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
Expand All @@ -730,7 +731,7 @@ fn remove_corresponding_sort_from_sub_plan(
updated_plan,
));
} else {
updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan.clone()));
updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan));
}
}
Ok(updated_plan)
Expand Down Expand Up @@ -777,8 +778,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::{col, NotExpr};
use datafusion_physical_expr::expressions::{col, Column, NotExpr};

fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::windows::PartitionSearchMode;

use async_trait::async_trait;

Expand Down Expand Up @@ -239,7 +240,7 @@ pub fn bounded_window_exec(
.unwrap()],
input.clone(),
vec![],
crate::physical_plan::windows::PartitionSearchMode::Sorted,
PartitionSearchMode::Sorted,
)
.unwrap(),
)
Expand Down
17 changes: 9 additions & 8 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,33 @@ use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
};
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_expr::{
AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunction,
};

use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

#[cfg(test)]
mod tests {
use super::*;
use datafusion::physical_plan::windows::PartitionSearchMode::{

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};

Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
};

use arrow::array::ArrayRef;
use arrow::compute;
use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::{Array, BooleanArray};
use arrow_schema::SortOptions;
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{DataFusionError, Result, ScalarValue};
Expand Down
71 changes: 58 additions & 13 deletions datafusion/physical-expr/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::intervals::Interval;
use crate::sort_properties::SortProperties;
use crate::utils::scatter;
Expand All @@ -27,11 +32,6 @@ use datafusion_common::utils::DataPtr;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::ColumnarValue;

use std::any::Any;
use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Expression that can be evaluated against a RecordBatch
/// A Physical expression knows its type, nullability and how to evaluate itself.
pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
Expand All @@ -54,13 +54,12 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
let tmp_batch = filter_record_batch(batch, selection)?;

let tmp_result = self.evaluate(&tmp_batch)?;
// All values from the `selection` filter are true.

if batch.num_rows() == tmp_batch.num_rows() {
return Ok(tmp_result);
}
if let ColumnarValue::Array(a) = tmp_result {
let result = scatter(selection, a.as_ref())?;
Ok(ColumnarValue::Array(result))
// All values from the `selection` filter are true.
Ok(tmp_result)
} else if let ColumnarValue::Array(a) = tmp_result {
scatter(selection, a.as_ref()).map(ColumnarValue::Array)
} else {
Ok(tmp_result)
}
Expand Down Expand Up @@ -216,8 +215,8 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
}
}

/// It is similar to contains method of vector.
/// Finds whether `expr` is among `physical_exprs`.
/// This function is similar to the `contains` method of `Vec`. It finds
/// whether `expr` is among `physical_exprs`.
pub fn physical_exprs_contains(
physical_exprs: &[Arc<dyn PhysicalExpr>],
expr: &Arc<dyn PhysicalExpr>,
Expand All @@ -226,3 +225,49 @@ pub fn physical_exprs_contains(
.iter()
.any(|physical_expr| physical_expr.eq(expr))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::expressions::{Column, Literal};
    use crate::physical_expr::{physical_exprs_contains, PhysicalExpr};

    use datafusion_common::{Result, ScalarValue};

    /// Verifies that `physical_exprs_contains` reports `true` for expressions
    /// that are present in the slice and `false` for expressions that are not.
    #[test]
    fn test_physical_exprs_contains() -> Result<()> {
        // Small constructors that erase the concrete expression type up front.
        let literal = |value: ScalarValue| -> Arc<dyn PhysicalExpr> {
            Arc::new(Literal::new(value))
        };
        let column = |name: &str, index: usize| -> Arc<dyn PhysicalExpr> {
            Arc::new(Column::new(name, index))
        };

        let bool_true = literal(ScalarValue::Boolean(Some(true)));
        let bool_false = literal(ScalarValue::Boolean(Some(false)));
        let int_four = literal(ScalarValue::Int32(Some(4)));
        let int_two = literal(ScalarValue::Int32(Some(2)));
        let int_one = literal(ScalarValue::Int32(Some(1)));
        let col_a = column("a", 0);
        let col_b = column("b", 1);
        let col_c = column("c", 2);

        // Search space: lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
        let haystack: Vec<Arc<dyn PhysicalExpr>> = vec![
            bool_true.clone(),
            bool_false.clone(),
            int_four.clone(),
            int_two.clone(),
            col_a.clone(),
            col_b.clone(),
        ];

        // These expressions are members of `haystack`.
        for present in [&bool_true, &int_two, &col_b] {
            assert!(physical_exprs_contains(&haystack, present));
        }

        // These expressions are not members of `haystack`.
        for absent in [&col_c, &int_one] {
            assert!(!physical_exprs_contains(&haystack, absent));
        }
        Ok(())
    }
}
21 changes: 11 additions & 10 deletions datafusion/physical-expr/src/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@
//! This module also has a set of coercion rules to improve user experience: if an argument i32 is passed
//! to a function that supports f64, it is coerced to f64.

use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::functions::out_ordering;
use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::utils::expr_list_eq_strict_order;
use crate::PhysicalExpr;

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::BuiltinScalarFunction;
use datafusion_expr::ColumnarValue;
use datafusion_expr::FuncMonotonicity;
use datafusion_expr::ScalarFunctionImplementation;
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use datafusion_expr::{
expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity,
ScalarFunctionImplementation,
};

/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl PhysicalGroupBy {
}

/// Return grouping expressions as they occur in the output schema.
fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
pub fn output_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.expr
.iter()
.enumerate()
Expand Down
12 changes: 1 addition & 11 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
calc_requirements, get_ordered_partition_by_indices, window_ordering_equivalence,
PartitionSearchMode,
};
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
Expand Down Expand Up @@ -68,17 +69,6 @@ use hashbrown::raw::RawTable;
use indexmap::IndexMap;
use log::debug;

#[derive(Debug, Clone, PartialEq)]
/// Specifies the properties of the partition columns in terms of the
/// ordering of the input.
pub enum PartitionSearchMode {
/// None of the partition columns is ordered.
Linear,
/// Some, but not all, of the partition columns are ordered.
/// NOTE(review): the `Vec<usize>` payload presumably holds the indices of
/// the ordered partition columns -- confirm against the code that
/// constructs this variant.
PartiallySorted(Vec<usize>),
/// All partition columns are ordered (this also covers the empty case).
Sorted,
}

/// Window execution plan
#[derive(Debug)]
pub struct BoundedWindowAggExec {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,23 @@ mod bounded_window_agg_exec;
mod window_agg_exec;

pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
pub use window_agg_exec::WindowAggExec;

pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};

#[derive(Debug, Clone, PartialEq)]
/// Specifies the properties of the partition columns in terms of the
/// ordering of the input.
pub enum PartitionSearchMode {
/// None of the partition columns is ordered.
Linear,
/// Some, but not all, of the partition columns are ordered.
/// NOTE(review): the `Vec<usize>` payload presumably holds the indices of
/// the ordered partition columns -- confirm against the code that
/// constructs this variant.
PartiallySorted(Vec<usize>),
/// All partition columns are ordered (this also covers the empty case).
Sorted,
}

/// Create a physical expression for window function
pub fn create_window_expr(
fun: &WindowFunction,
Expand Down