Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve median performance. #6837

Merged
merged 4 commits into from
Jul 5, 2023
Merged

Improve median performance. #6837

merged 4 commits into from
Jul 5, 2023

Conversation

vincev
Copy link
Contributor

@vincev vincev commented Jul 4, 2023

Which issue does this PR close?

Related to discussion in #4973.

Rationale for this change

This PR improves the median aggregator by reducing the number of allocations.

For this example I am using one year of data from the nyctaxi dataset, running a group by payment_type and computing the total_amount median, with the current version it takes ~22secs:

$ time ./target/release/datafusion-median
+--------------+--------------+----------+
| payment_type | total_amount | n        |
+--------------+--------------+----------+
| 5            | 5.275        | 6        |
| 3            | 7.7          | 194323   |
| 4            | -6.8         | 244364   |
| 0            | 23.0         | 1368303  |
| 2            | 13.3         | 7763339  |
| 1            | 16.56        | 30085763 |
+--------------+--------------+----------+

real    0m22.861s
user    1m54.033s
sys     0m2.872s

with the changes introduced in this PR we get the same result in ~2secs:

$ time ./target/release/datafusion-median
+--------------+--------------+----------+
| payment_type | total_amount | n        |
+--------------+--------------+----------+
| 5            | 5.275        | 6        |
| 3            | 7.7          | 194323   |
| 4            | -6.8         | 244364   |
| 0            | 23.0         | 1368303  |
| 2            | 13.3         | 7763339  |
| 1            | 16.56        | 30085763 |
+--------------+--------------+----------+

real    0m2.514s
user    0m4.725s
sys     0m1.765s

What changes are included in this PR?

Reduce number of allocations.

Are these changes tested?

Run tests locally they all passed.

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Jul 4, 2023
let state =
ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
let all_values = to_scalar_values(&self.batches)?;
let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can change ScalarValue::List to use ArrayRef instead of Vec<ScalarValue> internally.
This would avoid quite some expensive conversions from/to ScalarValue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that would help a lot, maybe we can add a ScalarValue::Array variant to ScalarValue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @Dandandan was suggesting

impl ScalarValue {
  ...
  List(ArrayRef)
  ...
}

In general this would work well with the approach @tustvold is working on upstream in arrow-rs with Datum -- apache/arrow-rs#4393

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's what I was trying to suggest - this would avoid the need to convert / allocate to individual ScalarValues and convert to Array later on again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work well, the reason I was suggesting adding an Array variant instead of changing List was to avoid changing all the code that depends on List(Option<Vec<ScalarValue>>, FieldRef), but yes long term that would probably be best.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this some more, I think the most performant thing to do will be to implement a native GroupsAccumulator (aka #6800 ) for median. With sufficient effort we could make median be very fast -- so I think this is a good improvement for now

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jackwener jackwener self-requested a review July 4, 2023 09:22
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @vincev -- this looks great -- I will try and review this tomorrow

let state =
ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
let all_values = to_scalar_values(&self.batches)?;
let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @Dandandan was suggesting

impl ScalarValue {
  ...
  List(ArrayRef)
  ...
}

In general this would work well with the approach @tustvold is working on upstream in arrow-rs with Datum -- apache/arrow-rs#4393

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the code carefully -- thank you @vincev

I think the code could be merged in as is, but I also left some comments which I think would help.

datafusion/physical-expr/src/aggregate/median.rs Outdated Show resolved Hide resolved
let state =
ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
let all_values = to_scalar_values(&self.batches)?;
let state = ScalarValue::new_list(Some(all_values), self.data_type.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this some more, I think the most performant thing to do will be to implement a native GroupsAccumulator (aka #6800 ) for median. With sufficient effort we could make median be very fast -- so I think this is a good improvement for now

datafusion/physical-expr/src/aggregate/median.rs Outdated Show resolved Hide resolved
all_values: Vec<ScalarValue>,
}

fn to_scalar_values(arrays: &[ArrayRef]) -> Result<Vec<ScalarValue>> {
let num_values: usize = arrays.iter().map(|a| a.len()).sum();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for computing the capacity up front

datafusion/physical-expr/src/aggregate/median.rs Outdated Show resolved Hide resolved
@alamb
Copy link
Contributor

alamb commented Jul 5, 2023

Thanks @vincev

@alamb alamb merged commit c9a6fb8 into apache:main Jul 5, 2023
@vincev
Copy link
Contributor Author

vincev commented Jul 5, 2023

Thank you @alamb, @Dandandan for your feedback and review.

@vincev vincev deleted the median branch July 5, 2023 19:13
2010YOUY01 pushed a commit to 2010YOUY01/arrow-datafusion that referenced this pull request Jul 5, 2023
* Improve median performance.

* Fix formatting.

* Review feedback

* Renamed arrays size.
alamb pushed a commit to alamb/datafusion that referenced this pull request Jul 6, 2023
* Improve median performance.

* Fix formatting.

* Review feedback

* Renamed arrays size.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants