Skip to content

Commit

Permalink
Merge pull request #1701 from zhang2014/refactor/aggregator_final
Browse files Browse the repository at this point in the history
Refactor Aggregator for improve performance and prepare refactor transform_aggregator_final
  • Loading branch information
BohuTANG authored Sep 2, 2021
2 parents 236528e + 0198752 commit c469d93
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 103 deletions.
2 changes: 2 additions & 0 deletions common/functions/src/aggregates/aggregate_function_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct StateAddr {
addr: usize,
}

pub type StateAddrs = Vec<StateAddr>;

impl StateAddr {
#[inline]
pub fn new(addr: usize) -> StateAddr {
Expand Down
1 change: 1 addition & 0 deletions common/functions/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub use aggregate_function::AggregateFunctionRef;
pub use aggregate_function_factory::AggregateFunctionFactory;
pub use aggregate_function_state::get_layout_offsets;
pub use aggregate_function_state::StateAddr;
pub use aggregate_function_state::StateAddrs;
pub use aggregate_min_max::AggregateMinMaxFunction;
pub use aggregate_stddev_pop::AggregateStddevPopFunction;
pub use aggregate_sum::AggregateSumFunction;
Expand Down
187 changes: 96 additions & 91 deletions query/src/pipelines/transforms/group_by/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Layout;

use common_datablocks::DataBlock;
use common_datablocks::HashMethod;
use common_datavalues::arrays::BinaryArrayBuilder;
Expand All @@ -23,11 +21,10 @@ use common_datavalues::prelude::Series;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataSchemaRefExt;
use common_exception::Result;
use common_functions::aggregates::get_layout_offsets;
use common_functions::aggregates::StateAddr;
use common_functions::aggregates::StateAddrs;
use common_infallible::Mutex;
use common_io::prelude::BytesMut;
use common_planners::Expression;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;
Expand All @@ -42,120 +39,105 @@ use crate::pipelines::transforms::group_by::PolymorphicKeysHelper;
pub struct Aggregator<Method: HashMethod> {
method: Method,
params: AggregatorParamsRef,
layout: Layout,
offsets_aggregate_states: Vec<usize>,
}

impl<Method: HashMethod + PolymorphicKeysHelper<Method>> Aggregator<Method> {
pub fn create(
method: Method,
aggr_exprs: &[Expression],
schema: DataSchemaRef,
) -> Result<Aggregator<Method>> {
let aggregator_params = AggregatorParams::try_create(schema, aggr_exprs)?;
// let aggregator_area = AggregatorArea::try_create(&aggregator_params)?;

let aggregate_functions = &aggregator_params.aggregate_functions;
let (states_layout, states_offsets) = unsafe { get_layout_offsets(aggregate_functions) };

Ok(Aggregator {
method,
params: aggregator_params,
layout: states_layout,
offsets_aggregate_states: states_offsets,
})
pub fn create(method: Method, params: AggregatorParamsRef) -> Aggregator<Method> {
Aggregator { method, params }
}

// If we set it to inline(performance degradation).
// Because it will make other internal functions to no inline
#[inline(never)]
pub async fn aggregate(
&self,
group_cols: Vec<String>,
mut stream: SendableDataBlockStream,
) -> Result<Mutex<Method::State>> {
// This may be confusing
// It will help us improve performance ~10% when we declare local references for them.
let hash_method = &self.method;

let groups_locker = Mutex::new(hash_method.aggregate_state());

let aggregator_params = self.params.as_ref();

let aggr_len = aggregator_params.aggregate_functions.len();
let aggregate_functions = &aggregator_params.aggregate_functions;
let aggregate_functions_arguments = &aggregator_params.aggregate_functions_arguments_name;

let layout = self.layout;
let offsets_aggregate_states = &self.offsets_aggregate_states;
let aggregate_state = Mutex::new(hash_method.aggregate_state());

while let Some(block) = stream.next().await {
let block = block?;
let mut groups = aggregate_state.lock();

// 1.1 and 1.2.
let group_columns = Self::group_columns(&group_cols, &block)?;
let mut aggregate_arguments_columns = Vec::with_capacity(aggr_len);
let group_keys = hash_method.build_keys(&group_columns, block.num_rows())?;

{
for function_arguments in aggregate_functions_arguments {
let mut function_arguments_column =
Vec::with_capacity(function_arguments.len());
// TODO: This can be moved outside the while
// In fact, the rust compiler will help us do this(optimize the while match to match while),
// but we need to ensure that the match is simple enough(otherwise there will be performance degradation).
let places: StateAddrs = match aggregator_params.aggregate_functions.is_empty() {
true => self.lookup_key(group_keys, &mut groups),
false => self.lookup_state(group_keys, &mut groups),
};

for argument_name in function_arguments {
let argument_column = block.try_column_by_name(argument_name)?;
function_arguments_column.push(argument_column.to_array()?);
}
Self::execute(aggregator_params, &block, &places)?;
}
Ok(aggregate_state)
}

aggregate_arguments_columns.push(function_arguments_column);
}
}
#[inline(always)]
#[allow(clippy::ptr_arg)] // &[StateAddr] slower than &StateAddrs ~20%
fn execute(params: &AggregatorParams, block: &DataBlock, places: &StateAddrs) -> Result<()> {
let aggregate_functions = &params.aggregate_functions;
let offsets_aggregate_states = &params.offsets_aggregate_states;
let aggregate_arguments_columns = Self::aggregate_arguments(block, params)?;

// This can benificial for the case of dereferencing
// This will help improve the performance ~hundreds of megabits per second
let aggr_arg_columns_slice = &aggregate_arguments_columns;

for index in 0..aggregate_functions.len() {
let rows = block.num_rows();
let function = &aggregate_functions[index];
let state_offset = offsets_aggregate_states[index];
let function_arguments = &aggr_arg_columns_slice[index];
function.accumulate_keys(places, state_offset, function_arguments, rows)?;
}

let mut places = Vec::with_capacity(block.num_rows());
let group_keys = hash_method.build_keys(&group_columns, block.num_rows())?;
let mut groups = groups_locker.lock();
{
let mut inserted = true;
for group_key in group_keys.iter() {
let entity = groups.entity(group_key, &mut inserted);

match inserted {
true => {
if aggr_len == 0 {
entity.set_state_value(0);
} else {
let place: StateAddr = groups.alloc_layout(layout).into();
for idx in 0..aggr_len {
let aggr_state = offsets_aggregate_states[idx];
let aggr_state_place = place.next(aggr_state);
aggregate_functions[idx].init_state(aggr_state_place);
}
places.push(place);
entity.set_state_value(place.addr());
}
}
false => {
let place: StateAddr = (*entity.get_state_value()).into();
places.push(place);
}
}
}
}
Ok(())
}

#[inline(always)]
fn lookup_key(&self, keys: Vec<Method::HashKey>, state: &mut Method::State) -> StateAddrs {
let mut inserted = true;
for key in keys.iter() {
state.entity(key, &mut inserted);
}

{
// this can benificial for the case of dereferencing
let aggr_arg_columns_slice = &aggregate_arguments_columns;

for ((idx, func), args) in aggregate_functions
.iter()
.enumerate()
.zip(aggr_arg_columns_slice.iter())
{
func.accumulate_keys(
&places,
offsets_aggregate_states[idx],
args,
block.num_rows(),
)?;
vec![0_usize.into(); keys.len()]
}

/// Allocate aggregation function state for each key(the same key can always get the same state)
#[inline(always)]
fn lookup_state(&self, keys: Vec<Method::HashKey>, state: &mut Method::State) -> StateAddrs {
let mut places = Vec::with_capacity(keys.len());

let mut inserted = true;
let params = self.params.as_ref();

for key in keys.iter() {
let entity = state.entity(key, &mut inserted);

match inserted {
true => {
let place = state.alloc_layout(params);
places.push(place);
entity.set_state_value(place.addr());
}
false => {
let place: StateAddr = (*entity.get_state_value()).into();
places.push(place);
}
}
}
Ok(groups_locker)
places
}

#[inline(always)]
Expand All @@ -166,6 +148,29 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method>> Aggregator<Method> {
.collect::<Result<Vec<&DataColumn>>>()
}

#[inline(always)]
fn aggregate_arguments(
block: &DataBlock,
params: &AggregatorParams,
) -> Result<Vec<Vec<Series>>> {
let aggregate_functions_arguments = &params.aggregate_functions_arguments_name;
let mut aggregate_arguments_columns =
Vec::with_capacity(aggregate_functions_arguments.len());
for function_arguments in aggregate_functions_arguments {
let mut function_arguments_column = Vec::with_capacity(function_arguments.len());

for argument_name in function_arguments {
let argument_column = block.try_column_by_name(argument_name)?;
function_arguments_column.push(argument_column.to_array()?);
}

aggregate_arguments_columns.push(function_arguments_column);
}

Ok(aggregate_arguments_columns)
}

#[inline(never)]
pub fn aggregate_finalized(
&self,
groups: &Method::State,
Expand All @@ -182,7 +187,7 @@ impl<Method: HashMethod + PolymorphicKeysHelper<Method>> Aggregator<Method> {
let aggregator_params = self.params.as_ref();
let funcs = &aggregator_params.aggregate_functions;
let aggr_len = funcs.len();
let offsets_aggregate_states = &self.offsets_aggregate_states;
let offsets_aggregate_states = &aggregator_params.offsets_aggregate_states;

// Builders.
let mut state_builders: Vec<BinaryArrayBuilder> = (0..aggr_len)
Expand Down
10 changes: 10 additions & 0 deletions query/src/pipelines/transforms/group_by/aggregator_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::alloc::Layout;
use std::sync::Arc;

use common_datavalues::DataSchemaRef;
use common_exception::Result;
use common_functions::aggregates::get_layout_offsets;
use common_functions::aggregates::AggregateFunctionRef;
use common_planners::Expression;

pub struct AggregatorParams {
pub aggregate_functions: Vec<AggregateFunctionRef>,
pub aggregate_functions_column_name: Vec<String>,
pub aggregate_functions_arguments_name: Vec<Vec<String>>,

// about function state memory layout
pub layout: Layout,
pub offsets_aggregate_states: Vec<usize>,
}

pub type AggregatorParamsRef = Arc<AggregatorParams>;
Expand All @@ -39,10 +45,14 @@ impl AggregatorParams {
aggregate_functions_arguments_name.push(expr.to_aggregate_function_names()?);
}

let (states_layout, states_offsets) = unsafe { get_layout_offsets(&aggregate_functions) };

Ok(Arc::new(AggregatorParams {
aggregate_functions,
aggregate_functions_column_name,
aggregate_functions_arguments_name,
layout: states_layout,
offsets_aggregate_states: states_offsets,
}))
}
}
Loading

0 comments on commit c469d93

Please sign in to comment.