Skip to content

Commit

Permalink
Remove Sort expression (Expr::Sort) (#12177)
Browse files Browse the repository at this point in the history
* Take Sort (SortExpr) in file options

Part of effort to remove `Expr::Sort`.

* Return Sort from Expr.Sort

Part of effort to remove `Expr::Sort`.

* Accept Sort (SortExpr) in `LogicalPlanBuilder.sort`

Take `expr::Sort` in `LogicalPlanBuilder.sort`.
Accept any `Expr` in new function, `LogicalPlanBuilder.sort_by` which
apply default sort ordering.

Part of effort to remove `Expr::Sort`.

* Operate on `Sort` in to_substrait_sort_field / from_substrait_sorts

Part of effort to remove `Expr::Sort`.

* Take Sort (SortExpr) in tests' TopKPlanNode

Part of effort to remove `Expr::Sort`.

* Remove Sort expression (`Expr::Sort`)

Remove sort as an expression, i.e. remove `Expr::Sort` from `Expr` enum.
Use `expr::Sort` directly when sorting.

The sort expression was used in context of ordering (sort, topk, create
table, file sorting).  Those places  require their sort expression to be
of type Sort anyway and no other expression was allowed, so this change
improves static typing.  Sort as an expression was illegal in other
contexts.

* use assert_eq just like in LogicalPlan.with_new_exprs

* avoid clone in replace_sort_expressions

* reduce cloning in EliminateDuplicatedExpr

* restore SortExprWrapper

this commit is longer than advised in the review comment, but after
squashing the diff will be smaller

* shorthand SortExprWrapper struct definition
  • Loading branch information
findepi authored Aug 29, 2024
1 parent f5dcdf0 commit 85adb6c
Show file tree
Hide file tree
Showing 57 changed files with 704 additions and 772 deletions.
4 changes: 2 additions & 2 deletions datafusion-examples/examples/file_stream_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod non_windows {
use datafusion::datasource::TableProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::SortExpr;

// Number of lines written to FIFO
const TEST_BATCH_SIZE: usize = 5;
Expand All @@ -49,7 +49,7 @@ mod non_windows {
fn fifo_table(
schema: SchemaRef,
path: impl Into<PathBuf>,
sort: Vec<Vec<Expr>>,
sort: Vec<Vec<SortExpr>>,
) -> Arc<dyn TableProvider> {
let source = FileStreamProvider::new_file(schema, path.into())
.with_batch_size(TEST_BATCH_SIZE)
Expand Down
15 changes: 12 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
use datafusion_expr::{case, is_null, lit};
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
};
Expand Down Expand Up @@ -577,7 +577,7 @@ impl DataFrame {
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
sort_expr: Option<Vec<SortExpr>>,
) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.distinct_on(on_expr, select_expr, sort_expr)?
Expand Down Expand Up @@ -776,6 +776,15 @@ impl DataFrame {
})
}

/// Apply a sort by provided expressions with default direction
pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
self.sort(
expr.into_iter()
.map(|e| e.sort(true, false))
.collect::<Vec<SortExpr>>(),
)
}

/// Sort the DataFrame by the specified sorting expressions.
///
/// Note that any expression can be turned into
Expand All @@ -797,7 +806,7 @@ impl DataFrame {
/// # Ok(())
/// # }
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
Ok(DataFrame {
session_state: self.session_state,
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::datasource::{
};
use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use crate::logical_expr::Expr;

use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::TableOptions;
Expand All @@ -41,6 +40,7 @@ use datafusion_common::{
};

use async_trait::async_trait;
use datafusion_expr::SortExpr;

/// Options that control the reading of CSV files.
///
Expand Down Expand Up @@ -84,7 +84,7 @@ pub struct CsvReadOptions<'a> {
/// File compression type
pub file_compression_type: FileCompressionType,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for CsvReadOptions<'a> {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a> CsvReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down Expand Up @@ -231,7 +231,7 @@ pub struct ParquetReadOptions<'a> {
/// based on data in file.
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for ParquetReadOptions<'a> {
Expand Down Expand Up @@ -278,7 +278,7 @@ impl<'a> ParquetReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down Expand Up @@ -397,7 +397,7 @@ pub struct NdJsonReadOptions<'a> {
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for NdJsonReadOptions<'a> {
Expand Down Expand Up @@ -452,7 +452,7 @@ impl<'a> NdJsonReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
}

// TODO other expressions are not handled yet:
// - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
Expand Down
14 changes: 4 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::TableType;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};

use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
Expand Down Expand Up @@ -222,7 +222,7 @@ pub struct ListingOptions {
/// ordering (encapsulated by a `Vec<Expr>`). If there aren't
/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl ListingOptions {
Expand Down Expand Up @@ -385,7 +385,7 @@ impl ListingOptions {
///
/// assert_eq!(listing_options.file_sort_order, file_sort_order);
/// ```
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
Expand Down Expand Up @@ -909,8 +909,7 @@ impl TableProvider for ListingTable {
keep_partition_by_columns,
};

let unsorted: Vec<Vec<Expr>> = vec![];
let order_requirements = if self.options().file_sort_order != unsorted {
let order_requirements = if !self.options().file_sort_order.is_empty() {
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
let ordering = self
.try_create_output_ordering()?
Expand Down Expand Up @@ -1160,11 +1159,6 @@ mod tests {
// (file_sort_order, expected_result)
let cases = vec![
(vec![], Ok(vec![])),
// not a sort expr
(
vec![vec![col("string_col")]],
Err("Expected Expr::Sort in output_ordering, but got string_col"),
),
// sort expr, but non column
(
vec![vec![
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
Expand All @@ -64,7 +65,7 @@ pub struct MemTable {
column_defaults: HashMap<String, Expr>,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
/// inserting data into this table removes the order
pub sort_order: Arc<Mutex<Vec<Vec<Expr>>>>,
pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}

impl MemTable {
Expand Down Expand Up @@ -118,7 +119,7 @@ impl MemTable {
///
/// Note that multiple sort orders are supported, if some are known to be
/// equivalent,
pub fn with_sort_order(self, mut sort_order: Vec<Vec<Expr>>) -> Self {
pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
self
}
Expand Down
41 changes: 21 additions & 20 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,38 +50,39 @@ pub use statistics::get_statistics_with_limit;

use arrow_schema::{Schema, SortOptions};
use datafusion_common::{plan_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::{Expr, SortExpr};
use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};

fn create_ordering(
schema: &Schema,
sort_order: &[Vec<Expr>],
sort_order: &[Vec<SortExpr>],
) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];

for exprs in sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
for expr in exprs {
match expr {
Expr::Sort(sort) => match sort.expr.as_ref() {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Ok(expr) => {
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
// Cannot find expression in the projected_schema, stop iterating
// since rest of the orderings are violated
Err(_) => break,
for sort in exprs {
match sort.expr.as_ref() {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Ok(expr) => {
sort_exprs.push(PhysicalSortExpr {
expr,
options: SortOptions {
descending: !sort.asc,
nulls_first: sort.nulls_first,
},
});
}
expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
// Cannot find expression in the projected_schema, stop iterating
// since rest of the orderings are violated
Err(_) => break,
},
expr => {
return plan_err!(
"Expected single column references in output_ordering, got {expr}"
)
}
expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
}
}
if !sort_exprs.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ mod tests {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<datafusion_expr::Expr>,
sort: Vec<datafusion_expr::SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow_schema::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
Expand Down Expand Up @@ -248,7 +248,7 @@ impl StreamProvider for FileStreamProvider {
#[derive(Debug)]
pub struct StreamConfig {
source: Arc<dyn StreamProvider>,
order: Vec<Vec<Expr>>,
order: Vec<Vec<SortExpr>>,
constraints: Constraints,
}

Expand All @@ -263,7 +263,7 @@ impl StreamConfig {
}

/// Specify a sort order for the stream
pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
self.order = order;
self
}
Expand Down
32 changes: 14 additions & 18 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
WindowFrame, WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, SortExpr,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
Expand Down Expand Up @@ -1641,31 +1641,27 @@ pub fn create_aggregate_expr_and_maybe_filter(

/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
e: &Expr,
e: &SortExpr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
if let Expr::Sort(expr::Sort {
let SortExpr {
expr,
asc,
nulls_first,
}) = e
{
Ok(PhysicalSortExpr {
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
} else {
internal_err!("Expects a sort expression")
}
} = e;
Ok(PhysicalSortExpr {
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
}

/// Create vector of physical sort expression from a vector of logical expression
pub fn create_physical_sort_exprs(
exprs: &[Expr],
exprs: &[SortExpr],
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<LexOrdering> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::TableReference;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr};

Expand Down Expand Up @@ -360,7 +360,7 @@ pub fn register_unbounded_file_with_ordering(
schema: SchemaRef,
file_path: &Path,
table_name: &str,
file_sort_order: Vec<Vec<Expr>>,
file_sort_order: Vec<Vec<SortExpr>>,
) -> Result<()> {
let source = FileStreamProvider::new_file(schema, file_path.into());
let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);
Expand Down
Loading

0 comments on commit 85adb6c

Please sign in to comment.