From 96cc3d92d570e4002bf3d494a54b4d436676e5ca Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Tue, 10 May 2022 14:10:50 +0200
Subject: [PATCH] improve agg_list performance of chunked numerical data
 (#3351)

---
 Makefile                                      |   1 -
 polars/polars-arrow/src/bitmap/mutable.rs     |   6 -
 .../src/frame/groupby/aggregations.rs         | 190 +++++++++---------
 polars/polars-core/src/frame/groupby/mod.rs   |  41 ++--
 4 files changed, 121 insertions(+), 117 deletions(-)

diff --git a/Makefile b/Makefile
index 9fa9a47e1f4e..c538ee424715 100644
--- a/Makefile
+++ b/Makefile
@@ -18,7 +18,6 @@ coverage:
         $(MAKE) -C py-polars venv; \
         source py-polars/venv/bin/activate; \
         $(MAKE) -C polars test; \
-        $(MAKE) -C polars integration-tests; \
         $(MAKE) -C py-polars test-with-cov; \
         cargo llvm-cov --no-run --lcov --output-path coverage.lcov; \
         "
diff --git a/polars/polars-arrow/src/bitmap/mutable.rs b/polars/polars-arrow/src/bitmap/mutable.rs
index fea60c8577b4..c6fb7d53d23b 100644
--- a/polars/polars-arrow/src/bitmap/mutable.rs
+++ b/polars/polars-arrow/src/bitmap/mutable.rs
@@ -1,12 +1,6 @@
 use arrow::bitmap::MutableBitmap;
 
 pub trait MutableBitmapExtension {
-    /// Initializes a [`MutableBitmap`] with all values set to valid/ true.
-    fn from_len_set(length: usize) -> MutableBitmap {
-        let values = vec![u8::MAX; length.saturating_add(7) / 8];
-        MutableBitmap::from_vec(values, length)
-    }
-
     fn as_slice_mut(&mut self) -> &mut [u8];
 
     /// # Safety
diff --git a/polars/polars-core/src/frame/groupby/aggregations.rs b/polars/polars-core/src/frame/groupby/aggregations.rs
index 0e832de7097d..7d491ee68a60 100644
--- a/polars/polars-core/src/frame/groupby/aggregations.rs
+++ b/polars/polars-core/src/frame/groupby/aggregations.rs
@@ -1,4 +1,5 @@
 use crate::POOL;
+use arrow::bitmap::MutableBitmap;
 use num::{Bounded, Num, NumCast, ToPrimitive, Zero};
 use rayon::prelude::*;
@@ -757,63 +758,67 @@ where
     ChunkedArray<T>: IntoSeries,
 {
     fn agg_list(&self, groups: &GroupsProxy) -> Series {
+        let ca = self.rechunk();
+
         match groups {
             GroupsProxy::Idx(groups) => {
                 let mut can_fast_explode = true;
-                let arr = match self.cont_slice() {
-                    Ok(values) => {
-                        let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
-                        let mut length_so_far = 0i64;
-                        offsets.push(length_so_far);
-                        let mut list_values = Vec::<T::Native>::with_capacity(self.len());
-                        groups.iter().for_each(|(_, idx)| {
-                            let idx_len = idx.len();
-                            if idx_len == 0 {
-                                can_fast_explode = false;
-                            }
+                let arr = ca.downcast_iter().next().unwrap();
+                let values = arr.values();
 
-                            length_so_far += idx_len as i64;
-                            // Safety:
-                            // group tuples are in bounds
-                            unsafe {
-                                list_values.extend(idx.iter().map(|idx| {
-                                    debug_assert!((*idx as usize) < values.len());
-                                    *values.get_unchecked(*idx as usize)
-                                }));
-                                // Safety:
-                                // we know that offsets has allocated enough slots
-                                offsets.push_unchecked(length_so_far);
-                            }
-                        });
-                        let array = PrimitiveArray::from_data(
-                            T::get_dtype().to_arrow(),
-                            list_values.into(),
-                            None,
-                        );
-                        let data_type =
-                            ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
-                        ListArray::<i64>::from_data(
-                            data_type,
-                            offsets.into(),
-                            Arc::new(array),
-                            None,
-                        )
+                let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
+                let mut length_so_far = 0i64;
+                offsets.push(length_so_far);
+
+                let mut list_values = Vec::<T::Native>::with_capacity(self.len());
+                groups.iter().for_each(|(_, idx)| {
+                    let idx_len = idx.len();
+                    if idx_len == 0 {
+                        can_fast_explode = false;
                     }
-                    _ => {
-                        let mut builder = ListPrimitiveChunkedBuilder::<T>::new(
-                            self.name(),
-                            groups.len(),
-                            self.len(),
-                            self.dtype().clone(),
-                        );
-                        for idx in groups.all().iter() {
-                            let s = unsafe { self.take_unchecked(idx.into()).into_series() };
-                            builder.append_series(&s);
-                        }
-                        return builder.finish().into_series();
+
+                    length_so_far += idx_len as i64;
+                    // Safety:
+                    // group tuples are in bounds
+                    unsafe {
+                        list_values.extend(idx.iter().map(|idx| {
+                            debug_assert!((*idx as usize) < values.len());
+                            *values.get_unchecked(*idx as usize)
+                        }));
+                        // Safety:
+                        // we know that offsets has allocated enough slots
+                        offsets.push_unchecked(length_so_far);
                     }
+                });
+
+                let validity = if arr.null_count() > 0 {
+                    let old_validity = arr.validity().unwrap();
+                    let mut validity = MutableBitmap::from_len_set(list_values.len());
+
+                    let mut count = 0;
+                    groups.iter().for_each(|(_, idx)| unsafe {
+                        for i in idx {
+                            if !old_validity.get_bit_unchecked(*i as usize) {
+                                validity.set_bit_unchecked(count, false)
+                            }
+                            count += 1;
+                        }
+                    });
+                    Some(validity.into())
+                } else {
+                    None
                 };
+
+                let array = PrimitiveArray::from_data(
+                    T::get_dtype().to_arrow(),
+                    list_values.into(),
+                    validity,
+                );
+                let data_type = ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
+                let arr =
+                    ListArray::<i64>::from_data(data_type, offsets.into(), Arc::new(array), None);
+
                 let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]);
                 if can_fast_explode {
                     ca.set_fast_explode()
@@ -822,55 +827,54 @@ where
             }
             GroupsProxy::Slice(groups) => {
                 let mut can_fast_explode = true;
-                let arr = match self.cont_slice() {
-                    Ok(values) => {
-                        let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
-                        let mut length_so_far = 0i64;
-                        offsets.push(length_so_far);
+                let arr = ca.downcast_iter().next().unwrap();
+                let values = arr.values();
 
-                        let mut list_values = Vec::<T::Native>::with_capacity(self.len());
-                        groups.iter().for_each(|&[first, len]| {
-                            if len == 0 {
-                                can_fast_explode = false;
-                            }
+                let mut offsets = Vec::<i64>::with_capacity(groups.len() + 1);
+                let mut length_so_far = 0i64;
+                offsets.push(length_so_far);
 
-                            length_so_far += len as i64;
-                            list_values
-                                .extend_from_slice(&values[first as usize..(first + len) as usize]);
-                            unsafe {
-                                // Safety:
-                                // we know that offsets has allocated enough slots
-                                offsets.push_unchecked(length_so_far);
-                            }
-                        });
-                        let array = PrimitiveArray::from_data(
-                            T::get_dtype().to_arrow(),
-                            list_values.into(),
-                            None,
-                        );
-                        let data_type =
-                            ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
-                        ListArray::<i64>::from_data(
-                            data_type,
-                            offsets.into(),
-                            Arc::new(array),
-                            None,
-                        )
+                let mut list_values = Vec::<T::Native>::with_capacity(self.len());
+                groups.iter().for_each(|&[first, len]| {
+                    if len == 0 {
+                        can_fast_explode = false;
                     }
-                    _ => {
-                        let mut builder = ListPrimitiveChunkedBuilder::<T>::new(
-                            self.name(),
-                            groups.len(),
-                            self.len(),
-                            self.dtype().clone(),
-                        );
-                        for &[first, len] in groups {
-                            let s = self.slice(first as i64, len as usize).into_series();
-                            builder.append_series(&s);
-                        }
-                        return builder.finish().into_series();
+
+                    length_so_far += len as i64;
+                    list_values.extend_from_slice(&values[first as usize..(first + len) as usize]);
+                    unsafe {
+                        // Safety:
+                        // we know that offsets has allocated enough slots
+                        offsets.push_unchecked(length_so_far);
                     }
+                });
+
+                let validity = if arr.null_count() > 0 {
+                    let old_validity = arr.validity().unwrap();
+                    let mut validity = MutableBitmap::from_len_set(list_values.len());
+
+                    let mut count = 0;
+                    groups.iter().for_each(|[first, len]| unsafe {
+                        for i in *first..(*first + *len) {
+                            if !old_validity.get_bit_unchecked(i as usize) {
+                                validity.set_bit_unchecked(count, false)
+                            }
+                            count += 1;
+                        }
+                    });
+                    Some(validity.into())
+                } else {
+                    None
                 };
+
+                let array = PrimitiveArray::from_data(
+                    T::get_dtype().to_arrow(),
+                    list_values.into(),
+                    validity,
+                );
+                let data_type = ListArray::<i64>::default_datatype(T::get_dtype().to_arrow());
+                let arr =
+                    ListArray::<i64>::from_data(data_type, offsets.into(), Arc::new(array), None);
                 let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]);
                 if can_fast_explode {
                     ca.set_fast_explode()
diff --git a/polars/polars-core/src/frame/groupby/mod.rs b/polars/polars-core/src/frame/groupby/mod.rs
index 4526319adf86..c8b617491728 100644
--- a/polars/polars-core/src/frame/groupby/mod.rs
+++ b/polars/polars-core/src/frame/groupby/mod.rs
@@ -306,27 +306,34 @@ impl<'df> GroupBy<'df> {
     }
 
     pub fn keys_sliced(&self, slice: Option<(i64, usize)>) -> Vec<Series> {
+        #[allow(unused_assignments)]
+        // needed to keep the lifetimes valid for this scope
+        let mut groups_owned = None;
+
+        let groups = if let Some((offset, len)) = slice {
+            groups_owned = Some(self.groups.slice(offset, len));
+            groups_owned.as_deref().unwrap()
+        } else {
+            &self.groups
+        };
+
         POOL.install(|| {
             self.selected_keys
                 .par_iter()
                 .map(|s| {
-                    #[allow(unused_assignments)]
-                    // needed to keep the lifetimes valid for this scope
-                    let mut groups_owned = None;
-
-                    let groups = if let Some((offset, len)) = slice {
-                        groups_owned = Some(self.groups.slice(offset, len));
-                        groups_owned.as_deref().unwrap()
-                    } else {
-                        &self.groups
-                    };
-
-                    // Safety
-                    // groupby indexes are in bound.
-                    unsafe {
-                        s.take_iter_unchecked(
-                            &mut groups.idx_ref().iter().map(|(idx, _)| idx as usize),
-                        )
+                    match groups {
+                        GroupsProxy::Idx(groups) => {
+                            let mut iter = groups.iter().map(|(first, _idx)| first as usize);
+                            // Safety:
+                            // groups are always in bounds
+                            unsafe { s.take_iter_unchecked(&mut iter) }
+                        }
+                        GroupsProxy::Slice(groups) => {
+                            let mut iter = groups.iter().map(|&[first, _len]| first as usize);
+                            // Safety:
+                            // groups are always in bounds
+                            unsafe { s.take_iter_unchecked(&mut iter) }
+                        }
                     }
                 })
                 .collect()
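
Note on the agg_list change: both branches above follow the same pattern. The column is rechunked to a single array, the members of every group are gathered into one flat values buffer, an i64 offset is recorded per group, and a validity bitmap is built only when the source column actually contains nulls. Below is a minimal standalone sketch of that offsets/validity construction; build_list_parts, Group, and the Vec<bool> mask are illustrative stand-ins for the polars/arrow types (GroupsIdx, MutableBitmap), not real APIs.

// Illustrative sketch only: plain Vec and Vec<bool> stand in for arrow's
// Buffer and MutableBitmap; `Group` stands in for one entry of GroupsProxy::Idx.
type Group = Vec<usize>;

// Gather grouped values into a flat buffer plus list offsets, and build a
// validity mask only when the source has nulls (mirrors the patch's approach).
fn build_list_parts<T: Copy>(
    values: &[T],
    validity: Option<&[bool]>, // true = valid
    groups: &[Group],
) -> (Vec<T>, Vec<i64>, Option<Vec<bool>>) {
    let mut offsets = Vec::with_capacity(groups.len() + 1);
    let mut length_so_far = 0i64;
    offsets.push(length_so_far);

    let mut list_values = Vec::with_capacity(values.len());
    for idx in groups {
        length_so_far += idx.len() as i64;
        list_values.extend(idx.iter().map(|&i| values[i]));
        offsets.push(length_so_far);
    }

    // Start from "all valid" and clear only the gathered slots that were null.
    let list_validity = validity.map(|old| {
        let mut mask = vec![true; list_values.len()];
        let mut count = 0;
        for idx in groups {
            for &i in idx {
                if !old[i] {
                    mask[count] = false;
                }
                count += 1;
            }
        }
        mask
    });

    (list_values, offsets, list_validity)
}

fn main() {
    let values = vec![1i32, 2, 3, 4, 5];
    let validity = vec![true, false, true, true, true];
    let groups = vec![vec![0, 2], vec![1, 3, 4]];

    let (flat, offsets, mask) =
        build_list_parts(values.as_slice(), Some(validity.as_slice()), &groups);
    assert_eq!(flat, vec![1, 3, 2, 4, 5]);
    assert_eq!(offsets, vec![0, 2, 5]);
    assert_eq!(mask, Some(vec![true, true, false, true, true]));
}

Skipping the mask entirely for all-valid input keeps the common no-null case a plain gather into a contiguous buffer, which is where the speedup for chunked numerical data comes from.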