Skip to content

Commit

Permalink
fixup! WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Aug 26, 2024
1 parent 4ad3416 commit eef0084
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 36 deletions.
4 changes: 2 additions & 2 deletions datafusion-examples/examples/file_stream_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod non_windows {
use datafusion::datasource::TableProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::{SortExpr};

// Number of lines written to FIFO
const TEST_BATCH_SIZE: usize = 5;
Expand All @@ -49,7 +49,7 @@ mod non_windows {
fn fifo_table(
schema: SchemaRef,
path: impl Into<PathBuf>,
sort: Vec<Vec<Expr>>,
sort: Vec<Vec<SortExpr>>,
) -> Arc<dyn TableProvider> {
let source = FileStreamProvider::new_file(schema, path.into())
.with_batch_size(TEST_BATCH_SIZE)
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ use datafusion::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::Projection;
use datafusion_expr::{Projection, SortExpr};
use datafusion_expr::tree_node::replace_sort_expression;
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;

Expand Down Expand Up @@ -392,7 +393,7 @@ struct TopKPlanNode {
input: LogicalPlan,
/// The sort expression (this example only supports a single sort
/// expr)
expr: Expr,
expr: SortExpr,
}

impl Debug for TopKPlanNode {
Expand All @@ -418,7 +419,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
}

fn expressions(&self) -> Vec<Expr> {
vec![self.expr.clone()]
vec![self.expr.expr.as_ref().clone()]
}

/// For example: `TopK: k=10`
Expand All @@ -436,7 +437,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
Ok(Self {
k: self.k,
input: inputs.swap_remove(0),
expr: exprs.swap_remove(0),
expr: replace_sort_expression(self.expr.clone(), exprs.swap_remove(0)),
})
}
}
Expand Down
13 changes: 8 additions & 5 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,14 @@ pub fn replace_sort_expressions(sorts: Vec<Sort>, new_expr: Vec<Expr>) -> Vec<So

let mut new_sorts = Vec::with_capacity(sorts.len());
for (i, expr) in new_expr.into_iter().enumerate() {
new_sorts.push(Sort {
expr: Box::new(expr),
asc: sorts[i].asc,
nulls_first: sorts[i].nulls_first,
});
new_sorts.push(replace_sort_expression(sorts[i].clone(), expr));
}
new_sorts
}

pub fn replace_sort_expression(sort: Sort, new_expr: Expr) -> Sort {
Sort {
expr: Box::new(new_expr),
..sort
}
}
2 changes: 1 addition & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 16 additions & 15 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
use crate::protobuf::{CustomTableScanNode, LogicalExprNodeCollection};
use crate::protobuf::{CustomTableScanNode, SortExprNodeCollection};
use crate::{
convert_required, into_required,
protobuf::{
Expand Down Expand Up @@ -65,6 +65,7 @@ use datafusion_expr::{AggregateUDF, Unnest};
use self::to_proto::{serialize_expr, serialize_exprs};
use prost::bytes::BufMut;
use prost::Message;
use crate::logical_plan::to_proto::serialize_sorts;

pub mod file_formats;
pub mod from_proto;
Expand Down Expand Up @@ -344,7 +345,7 @@ impl AsLogicalPlan for LogicalPlanNode {
let mut all_sort_orders = vec![];
for order in &scan.file_sort_order {
all_sort_orders.push(from_proto::parse_sorts(
&order.logical_expr_nodes,
&order.sort_expr_nodes,
ctx,
extension_codec,
)?)
Expand Down Expand Up @@ -472,7 +473,7 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Sort(sort) => {
let input: LogicalPlan =
into_logical_plan!(sort.input, ctx, extension_codec)?;
let sort_expr: Vec<Expr> =
let sort_expr: Vec<SortExpr> =
from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
LogicalPlanBuilder::from(input).sort(sort_expr)?.build()
}
Expand Down Expand Up @@ -533,7 +534,7 @@ impl AsLogicalPlan for LogicalPlanNode {
let mut order_exprs = vec![];
for expr in &create_extern_table.order_exprs {
order_exprs.push(from_proto::parse_sorts(
&expr.logical_expr_nodes,
&expr.sort_expr_nodes,
ctx,
extension_codec,
)?);
Expand Down Expand Up @@ -768,7 +769,7 @@ impl AsLogicalPlan for LogicalPlanNode {
)?;
let sort_expr = match distinct_on.sort_expr.len() {
0 => None,
_ => Some(from_proto::parse_exprs(
_ => Some(from_proto::parse_sorts(
&distinct_on.sort_expr,
ctx,
extension_codec,
Expand Down Expand Up @@ -977,10 +978,10 @@ impl AsLogicalPlan for LogicalPlanNode {

let options = listing_table.options();

let mut exprs_vec: Vec<LogicalExprNodeCollection> = vec![];
let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
for order in &options.file_sort_order {
let expr_vec = LogicalExprNodeCollection {
logical_expr_nodes: serialize_exprs(order, extension_codec)?,
let expr_vec = SortExprNodeCollection {
sort_expr_nodes: serialize_sorts(order, extension_codec)?,
};
exprs_vec.push(expr_vec);
}
Expand Down Expand Up @@ -1110,7 +1111,7 @@ impl AsLogicalPlan for LogicalPlanNode {
)?;
let sort_expr = match sort_expr {
None => vec![],
Some(sort_expr) => serialize_exprs(sort_expr, extension_codec)?,
Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
};
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
Expand Down Expand Up @@ -1254,13 +1255,13 @@ impl AsLogicalPlan for LogicalPlanNode {
input.as_ref(),
extension_codec,
)?;
let selection_expr: Vec<protobuf::LogicalExprNode> =
serialize_exprs(expr, extension_codec)?;
let sort_expr: Vec<protobuf::SortExprNode> =
serialize_sorts(expr, extension_codec)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
protobuf::SortNode {
input: Some(Box::new(input)),
expr: selection_expr,
expr: sort_expr,
fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
},
))),
Expand Down Expand Up @@ -1330,10 +1331,10 @@ impl AsLogicalPlan for LogicalPlanNode {
column_defaults,
},
)) => {
let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
for order in order_exprs {
let temp = LogicalExprNodeCollection {
logical_expr_nodes: serialize_exprs(order, extension_codec)?,
let temp = SortExprNodeCollection {
sort_expr_nodes: serialize_sorts(order, extension_codec)?,
};
converted_order_exprs.push(temp);
}
Expand Down
10 changes: 1 addition & 9 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use datafusion_common::{
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, Between, BinaryExpr, Case, Cast, GroupingSet, InList, Like, ScalarFunction,
Sort, Unnest, WildcardOptions,
Unnest, WildcardOptions,
};
use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
use datafusion_expr::{
Expand Down Expand Up @@ -1939,14 +1939,6 @@ fn roundtrip_try_cast() {
roundtrip_expr_test(test_expr, ctx);
}

#[test]
fn roundtrip_sort_expr() {
let test_expr = Expr::Sort(Sort::new(Box::new(lit(1.0_f32)), true, true));

let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
}

#[test]
fn roundtrip_negative() {
let test_expr = Expr::Negative(Box::new(lit(1.0_f32)));
Expand Down

0 comments on commit eef0084

Please sign in to comment.