From 4d80b0f3e6610c2f254e5d6e5b82a114a0ec9035 Mon Sep 17 00:00:00 2001 From: Lordworms Date: Wed, 4 Sep 2024 21:25:25 -0700 Subject: [PATCH 1/4] adding max_by and min_by --- .../functions-aggregate/src/first_last.rs | 2 +- datafusion/functions-aggregate/src/lib.rs | 3 + .../functions-aggregate/src/max_min_by.rs | 205 ++++++++++++++++++ .../sqllogictest/test_files/aggregate.slt | 67 ++++++ .../user-guide/sql/aggregate_functions.md | 32 +++ 5 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 datafusion/functions-aggregate/src/max_min_by.rs diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 30f5d5b07561..3ba2cdc02eb1 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -213,7 +213,7 @@ impl FirstValueAccumulator { } // Updates state with the values in the given row. - fn update_with_new_row(&mut self, row: &[ScalarValue]) { + pub fn update_with_new_row(&mut self, row: &[ScalarValue]) { self.first = row[0].clone(); self.orderings = row[1..].to_vec(); self.is_set = true; diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index 60e2602eb6ed..14acad87d29e 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -79,6 +79,7 @@ pub mod bit_and_or_xor; pub mod bool_and_or; pub mod grouping; pub mod kurtosis_pop; +pub mod max_min_by; pub mod nth_value; pub mod string_agg; @@ -172,6 +173,8 @@ pub fn all_default_aggregate_functions() -> Vec> { grouping::grouping_udaf(), nth_value::nth_value_udaf(), kurtosis_pop::kurtosis_pop_udaf(), + max_min_by::max_by_udaf(), + max_min_by::min_by_udaf(), ] } diff --git a/datafusion/functions-aggregate/src/max_min_by.rs b/datafusion/functions-aggregate/src/max_min_by.rs new file mode 100644 index 000000000000..a78606bbaa33 --- /dev/null +++ b/datafusion/functions-aggregate/src/max_min_by.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::first_last::last_value_udaf; +use arrow_schema::DataType; +use datafusion_common::{exec_err, Result}; +use datafusion_expr::expr::{AggregateFunction, Sort}; +use datafusion_expr::simplify::SimplifyInfo; +use datafusion_expr::{Accumulator, AggregateUDFImpl, Expr, Signature, Volatility}; +use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; +use std::any::Any; +use std::fmt::Debug; +use std::ops::Deref; + +make_udaf_expr_and_func!( + MaxByFunction, + max_by, + x y, + "Returns the value of the first column corresponding to the maximum value in the second column.", + max_by_udaf +); + +pub struct MaxByFunction { + signature: Signature, +} + +impl Debug for MaxByFunction { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("MaxBy") + .field("name", &self.name()) + .field("signature", &self.signature) + .field("accumulator", &"") + .finish() + } +} +impl Default for MaxByFunction { + fn default() -> Self { + Self::new() + } +} + +impl MaxByFunction { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + } + } +} + +fn get_min_max_by_result_type(input_types: &[DataType]) -> Result> { + match &input_types[0] { + DataType::Dictionary(_, dict_value_type) => { + // TODO add checker, if the value type is complex data type + Ok(vec![dict_value_type.deref().clone()]) + } + _ => Ok(input_types.to_vec()), + } +} + +impl AggregateUDFImpl for MaxByFunction { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "max_by" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(arg_types[0].to_owned()) + } + + fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { + exec_err!("should not reach here") + } + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + get_min_max_by_result_type(arg_types) + } + + fn simplify( + &self, + ) -> Option { + let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction, + _: &dyn SimplifyInfo| { + let mut order_by = aggr_func.order_by.unwrap_or_else(|| vec![]); + let (second_arg, first_arg) = + (aggr_func.args.remove(1), aggr_func.args.remove(0)); + + order_by.push(Sort::new(second_arg, true, false)); + + Ok(Expr::AggregateFunction(AggregateFunction::new_udf( + last_value_udaf(), + vec![first_arg], + aggr_func.distinct, + aggr_func.filter, + Some(order_by), + aggr_func.null_treatment, + ))) + }; + Some(Box::new(simplify)) + } +} + +make_udaf_expr_and_func!( + MinByFunction, + min_by, + x y, + "Returns the value of the first column corresponding to the minimum value in the second column.", + min_by_udaf +); + +pub struct MinByFunction { + signature: Signature, +} + +impl Debug for MinByFunction { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("MinBy") + .field("name", &self.name()) + .field("signature", &self.signature) + .field("accumulator", &"") + .finish() + } +} + +impl Default for MinByFunction { + fn default() -> Self { + Self::new() + } +} + +impl MinByFunction { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + } + } +} + +impl AggregateUDFImpl for MinByFunction { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "min_by" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(arg_types[0].to_owned()) + } + + fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { + exec_err!("should not reach here") + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + get_min_max_by_result_type(arg_types) + } + + fn simplify( + &self, + ) -> Option { + let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction, + _: &dyn SimplifyInfo| { + let mut order_by = aggr_func.order_by.unwrap_or_else(|| vec![]); + let (second_arg, first_arg) = + (aggr_func.args.remove(1), aggr_func.args.remove(0)); + + order_by.push(Sort::new(second_arg, false, false)); // false for ascending sort + + Ok(Expr::AggregateFunction(AggregateFunction::new_udf( + last_value_udaf(), + vec![first_arg], + aggr_func.distinct, + aggr_func.filter, + Some(order_by), + aggr_func.null_treatment, + ))) + }; + Some(Box::new(simplify)) + } +} diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index c52445c561ee..4888b6a290f2 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5924,3 +5924,70 @@ SELECT kurtosis_pop(c1) FROM t1; statement ok DROP TABLE t1; + +# test max_by function +query I +SELECT max_by(num_column, value) AS max_num +FROM VALUES + (10, 1), + (20, 2), + (30, 3) AS tab(num_column, value); +---- +30 + +query R +SELECT max_by(float_column, value) AS max_float +FROM VALUES + (10.5, 5), + (20.75, 10), + (15.25, 15) AS tab(float_column, value); +---- +15.25 + +query T +SELECT max_by(date_column, value) AS max_date +FROM VALUES + ('2024-01-01', 1), + ('2024-02-01', 2), + ('2024-03-01', 3) AS tab(date_column, value); +---- +2024-03-01 + + +# Test max_by function with same maximum values (order sensitivity) +query T +SELECT max_by(value_column, metric) AS max_value +FROM VALUES + ('A', 2), + ('B', 2), + ('C', 1) AS tab(value_column, metric); +---- +A + +query T +SELECT max_by(value_column, metric) AS max_value +FROM VALUES + ('B', 2), -- Reversed order + ('A', 2), + ('C', 1) AS tab(value_column, metric); +---- +B + + +query T +SELECT min_by(value_column, metric) AS min_value +FROM VALUES + ('A', 1), + ('B', 0), -- same min metric value as 'C' + ('C', 0) AS tab(value_column, metric); +---- +B + +query T +SELECT min_by(value_column, metric) AS min_value +FROM VALUES + ('C', 0), -- Reversed order + ('B', 0), + ('A', 1) AS tab(value_column, metric); +---- +C diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md index 1c214084b3fa..485597a27e63 100644 --- a/docs/source/user-guide/sql/aggregate_functions.md +++ b/docs/source/user-guide/sql/aggregate_functions.md @@ -253,6 +253,8 @@ last_value(expression [ORDER BY expression]) - [regr_syy](#regr_syy) - [regr_sxy](#regr_sxy) - [kurtosis_pop](#kurtosis_pop) +- [max_by](#max_by) +- [min_by](#min_by) ### `corr` @@ -541,6 +543,36 @@ kurtois_pop(expression) - **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators. +### `max_by` + +Returns the value of the first expression corresponding to the maximum value in the second expression. If there are multiple values in the first expression with the same maximum value in the second expression, the result will be the first occurrence of such a value based on the input order. + +``` +max_by(expression1, expression2) +``` +#### Arguments + +- **expression1**: First expression to return the value from. +Can be a constant, column, or function, and any combination of arithmetic operators. + +- **expression2** Second expression used to determine the maximum value. +Can be a constant, column, or function, and any combination of arithmetic operators. + +### `min_by` + +Returns the value of the first expression corresponding to the minimum value in the second expression. If there are multiple values in the first expression with the same minimum value in the second expression, the result will be the first occurrence of such a value based on the input order. + +``` +min_by(expression1, expression2) +``` +#### Arguments + +- **expression1**: First expression to return the value from. +Can be a constant, column, or function, and any combination of arithmetic operators. + +- **expression2** Second expression used to determine the maximum value. +Can be a constant, column, or function, and any combination of arithmetic operators. + ## Approximate - [approx_distinct](#approx_distinct) From 326b8dc5f44c2156de009d50ccda1714f32b1d76 Mon Sep 17 00:00:00 2001 From: Lordworms Date: Wed, 4 Sep 2024 21:27:00 -0700 Subject: [PATCH 2/4] format --- docs/source/user-guide/sql/aggregate_functions.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md index 485597a27e63..f3bb71e0f6a9 100644 --- a/docs/source/user-guide/sql/aggregate_functions.md +++ b/docs/source/user-guide/sql/aggregate_functions.md @@ -550,13 +550,14 @@ Returns the value of the first expression corresponding to the maximum value in ``` max_by(expression1, expression2) ``` + #### Arguments - **expression1**: First expression to return the value from. -Can be a constant, column, or function, and any combination of arithmetic operators. + Can be a constant, column, or function, and any combination of arithmetic operators. - **expression2** Second expression used to determine the maximum value. -Can be a constant, column, or function, and any combination of arithmetic operators. + Can be a constant, column, or function, and any combination of arithmetic operators. ### `min_by` @@ -565,13 +566,14 @@ Returns the value of the first expression corresponding to the minimum value in ``` min_by(expression1, expression2) ``` + #### Arguments - **expression1**: First expression to return the value from. -Can be a constant, column, or function, and any combination of arithmetic operators. + Can be a constant, column, or function, and any combination of arithmetic operators. - **expression2** Second expression used to determine the maximum value. -Can be a constant, column, or function, and any combination of arithmetic operators. + Can be a constant, column, or function, and any combination of arithmetic operators. ## Approximate From 629a4d907315b2334968bc222e511a9d897d1dd6 Mon Sep 17 00:00:00 2001 From: Lordworms Date: Wed, 4 Sep 2024 21:45:47 -0700 Subject: [PATCH 3/4] fix clippy --- datafusion/functions-aggregate/src/max_min_by.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/max_min_by.rs b/datafusion/functions-aggregate/src/max_min_by.rs index a78606bbaa33..3a684e824e96 100644 --- a/datafusion/functions-aggregate/src/max_min_by.rs +++ b/datafusion/functions-aggregate/src/max_min_by.rs @@ -100,7 +100,7 @@ impl AggregateUDFImpl for MaxByFunction { ) -> Option { let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction, _: &dyn SimplifyInfo| { - let mut order_by = aggr_func.order_by.unwrap_or_else(|| vec![]); + let mut order_by = aggr_func.order_by.unwrap_or_else(Vec::new); let (second_arg, first_arg) = (aggr_func.args.remove(1), aggr_func.args.remove(0)); @@ -185,7 +185,7 @@ impl AggregateUDFImpl for MinByFunction { ) -> Option { let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction, _: &dyn SimplifyInfo| { - let mut order_by = aggr_func.order_by.unwrap_or_else(|| vec![]); + let mut order_by = aggr_func.order_by.unwrap_or_else(Vec::new); let (second_arg, first_arg) = (aggr_func.args.remove(1), aggr_func.args.remove(0)); From c67e983d748ed1f1a0fcba492549a2b0db3bf9a1 Mon Sep 17 00:00:00 2001 From: Lordworms Date: Wed, 4 Sep 2024 22:09:52 -0700 Subject: [PATCH 4/4] fix clippy --- datafusion/functions-aggregate/src/max_min_by.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/max_min_by.rs b/datafusion/functions-aggregate/src/max_min_by.rs index 3a684e824e96..d994720c4804 100644 --- a/datafusion/functions-aggregate/src/max_min_by.rs +++ b/datafusion/functions-aggregate/src/max_min_by.rs @@ -100,7 +100,7 @@ impl AggregateUDFImpl for MaxByFunction { ) -> Option { let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction, _: &dyn SimplifyInfo| { - let mut order_by = aggr_func.order_by.unwrap_or_else(Vec::new); + let mut order_by = aggr_func.order_by.unwrap_or_default(); let (second_arg, first_arg) = (aggr_func.args.remove(1), aggr_func.args.remove(0)); @@ -185,7 +185,7 @@ impl AggregateUDFImpl for MinByFunction { ) -> Option { let simplify = |mut aggr_func: datafusion_expr::expr::AggregateFunction, _: &dyn SimplifyInfo| { - let mut order_by = aggr_func.order_by.unwrap_or_else(Vec::new); + let mut order_by = aggr_func.order_by.unwrap_or_default(); let (second_arg, first_arg) = (aggr_func.args.remove(1), aggr_func.args.remove(0));