Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve median performance. #6837

Merged
merged 4 commits into from
Jul 5, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions datafusion/physical-expr/src/aggregate/median.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl AggregateExpr for Median {
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
Ok(Box::new(MedianAccumulator {
data_type: self.data_type.clone(),
arrays: vec![],
all_values: vec![],
}))
}
Expand Down Expand Up @@ -108,29 +109,31 @@ impl PartialEq<dyn Any> for Median {
/// The median accumulator accumulates the raw input values
/// as `ScalarValue`s
///
/// The intermediate state is represented as a List of those scalars
/// The intermediate state is represented as a List of scalar values updated by
/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
/// in the final evaluation step so that we avoid expensive conversions and
/// allocations during `update_batch`.
struct MedianAccumulator {
data_type: DataType,
arrays: Vec<ArrayRef>,
all_values: Vec<ScalarValue>,
}

impl Accumulator for MedianAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
let state =
ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
let all_values = to_scalar_values(&self.arrays)?;
let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can change ScalarValue::List to use ArrayRef instead of Vec<ScalarValue> internally.
This would avoid quite some expensive conversions from/to ScalarValue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that would help a lot, maybe we can add a ScalarValue::Array variant to ScalarValue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @Dandandan was suggesting

impl ScalarValue {
  ...
  List(ArrayRef)
  ...
}

In general this would work well with the approach @tustvold is working on upstream in arrow-rs with Datum -- apache/arrow-rs#4393

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's what I was trying to suggest - this would avoid the need to convert / allocate to individual ScalarValues and convert to Array later on again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work well, the reason I was suggesting adding an Array variant instead of changing List was to avoid changing all the code that depends on List(Option<Vec<ScalarValue>>, FieldRef), but yes long term that would probably be best.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this some more, I think the most performant thing to do will be to implement a native GroupsAccumulator (aka #6800 ) for median. With sufficient effort we could make median be very fast -- so I think this is a good improvement for now


Ok(vec![state])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
assert_eq!(values.len(), 1);
let array = &values[0];

// Defer conversions to scalar values to final evaluation.
assert_eq!(array.data_type(), &self.data_type);
self.all_values.reserve(array.len());
for index in 0..array.len() {
self.all_values
.push(ScalarValue::try_from_array(array, index)?);
}
self.arrays.push(array.clone());

Ok(())
}
Expand All @@ -157,7 +160,14 @@ impl Accumulator for MedianAccumulator {
}

fn evaluate(&self) -> Result<ScalarValue> {
if !self.all_values.iter().any(|v| !v.is_null()) {
let batch_values = to_scalar_values(&self.arrays)?;

if !self
.all_values
.iter()
.chain(batch_values.iter())
.any(|v| !v.is_null())
{
return ScalarValue::try_from(&self.data_type);
}

Expand All @@ -166,6 +176,7 @@ impl Accumulator for MedianAccumulator {
let array = ScalarValue::iter_to_array(
self.all_values
.iter()
.chain(batch_values.iter())
// ignore null values
.filter(|v| !v.is_null())
.cloned(),
Expand Down Expand Up @@ -214,13 +225,30 @@ impl Accumulator for MedianAccumulator {
}

fn size(&self) -> usize {
std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values)
let arrays_size: usize = self.arrays.iter().map(|a| a.len()).sum();

std::mem::size_of_val(self)
+ ScalarValue::size_of_vec(&self.all_values)
+ arrays_size
- std::mem::size_of_val(&self.all_values)
+ self.data_type.size()
- std::mem::size_of_val(&self.data_type)
}
}

fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
let num_values: usize = arrays.iter().map(|a| a.len()).sum();
let mut all_values = Vec::with_capacity(num_values);

for array in arrays {
for index in 0..array.len() {
all_values.push(ScalarValue::try_from_array(&array, index)?);
}
}

Ok(all_values)
}

/// Given a returns `array[indicies[indicie_index]]` as a `ScalarValue`
fn scalar_at_index(
array: &dyn Array,
Expand Down