Skip to content

Commit

Permalink
feat(frontend): support order by clause of agg call (#3720)
Browse files Browse the repository at this point in the history
* support binding agg order by clause

* support planning agg call with order by clause

* print order by clause of agg call

* correct the generation of BatchProject

* publicly expose AggOrderBy::sort_exprs

* remove a useless log

* fix debug format of PlanAggCall

* revert changes to PlanAggCall::filter

* add plan tests for order by clause

* return error for some invalid syntax

* use Option<OrderType> instead of Option<bool>

* move ORDER BY clause in debug output

* fix plan tests

* use Direction instead of OrderType

* remove unused derive

* change default value of AggOrderByExpr::direction to Direction::Asc

* add a todo comment

* change nulls_first field to bool without Option

* fix plan tests

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
stdrc and mergify[bot] authored Jul 13, 2022
1 parent ff4c0d7 commit eb1d49e
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 23 deletions.
43 changes: 39 additions & 4 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use risingwave_sqlparser::ast::{Function, FunctionArg, FunctionArgExpr};

use crate::binder::bind_context::Clause;
use crate::binder::Binder;
use crate::expr::{AggCall, Expr, ExprImpl, ExprType, FunctionCall, Literal};
use crate::expr::{
AggCall, AggOrderBy, AggOrderByExpr, Expr, ExprImpl, ExprType, FunctionCall, Literal,
};
use crate::optimizer::property::Direction;
use crate::utils::Condition;

impl Binder {
Expand Down Expand Up @@ -77,12 +80,44 @@ impl Binder {
}
None => Condition::true_cond(),
};
// TODO(yuchao): handle DISTINCT and ORDER BY appear at the same time
if f.distinct && !f.order_by.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(
"DISTINCT and ORDER BY are not supported to appear at the same time now"
.to_string(),
)
.into());
}
let order_by = AggOrderBy::new(
f.order_by
.into_iter()
.map(|e| -> Result<AggOrderByExpr> {
Ok({
let mut expr = AggOrderByExpr {
expr: self.bind_expr(e.expr)?,
direction: match e.asc {
None | Some(true) => Direction::Asc,
Some(false) => Direction::Desc,
},
nulls_first: Default::default(),
};
expr.nulls_first =
e.nulls_first.unwrap_or_else(|| match expr.direction {
Direction::Asc => false,
Direction::Desc => true,
Direction::Any => unreachable!(),
});
expr
})
})
.try_collect()?,
);
return Ok(ExprImpl::AggCall(Box::new(AggCall::new(
kind, inputs, f.distinct, filter,
kind, inputs, f.distinct, order_by, filter,
)?)));
} else if f.filter.is_some() {
} else if f.distinct || !f.order_by.is_empty() || f.filter.is_some() {
return Err(ErrorCode::InvalidInputSyntax(format!(
"filter clause is only allowed in aggregation functions, but `{}` is not an aggregation function", function_name
"DISTINCT, ORDER BY or FILTER is only allowed in aggregation functions, but `{}` is not an aggregation function", function_name
)
)
.into());
Expand Down
54 changes: 51 additions & 3 deletions src/frontend/src/expr/agg_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,55 @@ use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::DataType;
use risingwave_expr::expr::AggKind;

use super::{Expr, ExprImpl};
use super::{Expr, ExprImpl, ExprRewriter};
use crate::optimizer::property::Direction;
use crate::utils::Condition;

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct AggOrderByExpr {
pub expr: ExprImpl,
pub direction: Direction,
pub nulls_first: bool,
}

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct AggOrderBy {
pub sort_exprs: Vec<AggOrderByExpr>,
}

impl AggOrderBy {
pub fn any() -> Self {
Self {
sort_exprs: Vec::new(),
}
}

pub fn new(sort_exprs: Vec<AggOrderByExpr>) -> Self {
Self { sort_exprs }
}

pub fn rewrite_expr(self, rewriter: &mut (impl ExprRewriter + ?Sized)) -> Self {
Self {
sort_exprs: self
.sort_exprs
.into_iter()
.map(|e| AggOrderByExpr {
expr: rewriter.rewrite_expr(e.expr),
direction: e.direction,
nulls_first: e.nulls_first,
})
.collect(),
}
}
}

#[derive(Clone, Eq, PartialEq, Hash)]
pub struct AggCall {
agg_kind: AggKind,
return_type: DataType,
inputs: Vec<ExprImpl>,
distinct: bool,
order_by: AggOrderBy,
filter: Condition,
}

Expand Down Expand Up @@ -111,6 +151,7 @@ impl AggCall {
agg_kind: AggKind,
inputs: Vec<ExprImpl>,
distinct: bool,
order_by: AggOrderBy,
filter: Condition,
) -> Result<Self> {
let data_types = inputs.iter().map(ExprImpl::return_type).collect_vec();
Expand All @@ -120,12 +161,19 @@ impl AggCall {
return_type,
inputs,
distinct,
order_by,
filter,
})
}

pub fn decompose(self) -> (AggKind, Vec<ExprImpl>, bool, Condition) {
(self.agg_kind, self.inputs, self.distinct, self.filter)
pub fn decompose(self) -> (AggKind, Vec<ExprImpl>, bool, AggOrderBy, Condition) {
(
self.agg_kind,
self.inputs,
self.distinct,
self.order_by,
self.filter,
)
}

pub fn agg_kind(&self) -> AggKind {
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/expr/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ pub trait ExprRewriter {
FunctionCall::new_unchecked(func_type, inputs, ret).into()
}
fn rewrite_agg_call(&mut self, agg_call: AggCall) -> ExprImpl {
let (func_type, inputs, distinct, filter) = agg_call.decompose();
let (func_type, inputs, distinct, order_by, filter) = agg_call.decompose();
let inputs = inputs
.into_iter()
.map(|expr| self.rewrite_expr(expr))
.collect();
let order_by = order_by.rewrite_expr(self);
let filter = filter.rewrite_expr(self);
AggCall::new(func_type, inputs, distinct, filter)
AggCall::new(func_type, inputs, distinct, order_by, filter)
.unwrap()
.into()
}
Expand Down
14 changes: 10 additions & 4 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod expr_visitor;
mod type_inference;
mod utils;

pub use agg_call::AggCall;
pub use agg_call::{AggCall, AggOrderBy, AggOrderByExpr};
pub use correlated_input_ref::CorrelatedInputRef;
pub use function_call::FunctionCall;
pub use input_ref::{as_alias_display, input_ref_to_column_indices, InputRef, InputRefDisplay};
Expand Down Expand Up @@ -91,9 +91,15 @@ impl ExprImpl {
/// A `count(*)` aggregate function.
#[inline(always)]
pub fn count_star() -> Self {
AggCall::new(AggKind::Count, vec![], false, Condition::true_cond())
.unwrap()
.into()
AggCall::new(
AggKind::Count,
vec![],
false,
AggOrderBy::any(),
Condition::true_cond(),
)
.unwrap()
.into()
}

/// Collect all `InputRef`s' indexes in the expression.
Expand Down
Loading

0 comments on commit eb1d49e

Please sign in to comment.