From e599767ffeb9d1c9da2542ec77a802b39ed2f097 Mon Sep 17 00:00:00 2001 From: platoneko Date: Mon, 28 Feb 2022 11:41:20 +0800 Subject: [PATCH 1/4] simd selection --- .../datavalues/src/columns/primitive/mod.rs | 35 ++++++++++----- common/datavalues/src/lib.rs | 1 + .../datavalues/tests/it/columns/primitive.rs | 45 +++++++++++++++++++ 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/common/datavalues/src/columns/primitive/mod.rs b/common/datavalues/src/columns/primitive/mod.rs index c987aa874b7ae..4d63c4ec05797 100644 --- a/common/datavalues/src/columns/primitive/mod.rs +++ b/common/datavalues/src/columns/primitive/mod.rs @@ -189,17 +189,32 @@ impl Column for PrimitiveColumn { if length == self.len() { return Arc::new(self.clone()); } - let iter = self - .values() - .iter() - .zip(filter.values().iter()) - .filter(|(_, f)| *f) - .map(|(v, _)| *v); - let values: Vec = iter.collect(); - let col = PrimitiveColumn { - values: values.into(), - }; + let mut res = Vec::::with_capacity(length); + let mut offset = 0; + let values = self.values(); + + const MASK_BITS: usize = 64; + for mut mask in filter.values().chunks::() { + if mask == u64::MAX { + res.extend(&values[offset..offset + MASK_BITS]); + } else { + while mask != 0 { + let n = std::intrinsics::cttz(mask) as usize; + res.push(values[offset + n]); + mask = mask & (mask - 1); + } + } + offset += MASK_BITS; + } + + for i in offset..self.len() { + if filter.get_data(i) { + res.push(values[i]); + } + } + + let col = PrimitiveColumn { values: res.into() }; Arc::new(col) } diff --git a/common/datavalues/src/lib.rs b/common/datavalues/src/lib.rs index 7727cdb204cf8..b8b34fe8dbb7a 100644 --- a/common/datavalues/src/lib.rs +++ b/common/datavalues/src/lib.rs @@ -18,6 +18,7 @@ #![feature(generic_associated_types)] #![feature(trusted_len)] +#![feature(core_intrinsics)] #[macro_use] mod macros; diff --git a/common/datavalues/tests/it/columns/primitive.rs b/common/datavalues/tests/it/columns/primitive.rs index bd076e208965f..dc76fd7524a55 100644 --- a/common/datavalues/tests/it/columns/primitive.rs +++ b/common/datavalues/tests/it/columns/primitive.rs @@ -54,3 +54,48 @@ fn test_const_column() { let c = ConstColumn::new(Series::from_data(vec![PI]), 24).arc(); println!("{:?}", c); } + +#[test] +fn test_filter_column() { + const N: usize = 1000; + let it = (0..N).map(|i| i as i32); + let data_column: PrimitiveColumn = Int32Column::from_iterator(it); + + struct Test { + filter: BooleanColumn, + expect: Vec, + } + + let tests: Vec = vec![ + Test { + filter: BooleanColumn::from_iterator((0..N).map(|_| true)), + expect: (0..N).map(|i| i as i32).collect(), + }, + Test { + filter: BooleanColumn::from_iterator((0..N).map(|_| false)), + expect: vec![], + }, + Test { + filter: BooleanColumn::from_iterator((0..N).map(|i| i % 10 == 0)), + expect: (0..N).map(|i| i as i32).filter(|i| i % 10 == 0).collect(), + }, + Test { + filter: BooleanColumn::from_iterator((0..N).map(|i| i < 100 || i > 800)), + expect: (0..N) + .map(|i| i as i32) + .filter(|&i| i < 100 || i > 800) + .collect(), + }, + ]; + + for test in tests { + let res = data_column.filter(&test.filter); + assert_eq!( + res.as_any() + .downcast_ref::>() + .unwrap() + .values(), + test.expect + ); + } +} From 9071bc3b0553a788734a2164e14376e763025f36 Mon Sep 17 00:00:00 2001 From: platoneko Date: Mon, 28 Feb 2022 12:57:00 +0800 Subject: [PATCH 2/4] make clippy happy --- common/datavalues/src/columns/primitive/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/common/datavalues/src/columns/primitive/mod.rs b/common/datavalues/src/columns/primitive/mod.rs index 4d63c4ec05797..6bbff101fcff3 100644 --- a/common/datavalues/src/columns/primitive/mod.rs +++ b/common/datavalues/src/columns/primitive/mod.rs @@ -208,10 +208,13 @@ impl Column for PrimitiveColumn { offset += MASK_BITS; } - for i in offset..self.len() { - if filter.get_data(i) { - res.push(values[i]); - } + for (&v, _) in values + .iter() + .zip(filter.values().iter()) + .skip(offset) + .filter(|(_, f)| *f) + { + res.push(v); } let col = PrimitiveColumn { values: res.into() }; From 9b4ccf137ceabb814dbd1111f5079b34ec89c382 Mon Sep 17 00:00:00 2001 From: platoneko Date: Mon, 28 Feb 2022 23:38:40 +0800 Subject: [PATCH 3/4] directly copy via ptr --- .../datavalues/src/columns/primitive/mod.rs | 64 ++++++++++++------- common/datavalues/src/lib.rs | 1 - .../datavalues/tests/it/columns/primitive.rs | 4 +- 3 files changed, 42 insertions(+), 27 deletions(-) diff --git a/common/datavalues/src/columns/primitive/mod.rs b/common/datavalues/src/columns/primitive/mod.rs index 6bbff101fcff3..bcde80d951d20 100644 --- a/common/datavalues/src/columns/primitive/mod.rs +++ b/common/datavalues/src/columns/primitive/mod.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use common_arrow::arrow::array::Array; use common_arrow::arrow::array::PrimitiveArray; +use common_arrow::arrow::bitmap::utils::BitChunkIterExact; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::buffer::Buffer; use common_arrow::arrow::compute::arity::unary; @@ -190,34 +191,49 @@ impl Column for PrimitiveColumn { return Arc::new(self.clone()); } - let mut res = Vec::::with_capacity(length); - let mut offset = 0; - let values = self.values(); - - const MASK_BITS: usize = 64; - for mut mask in filter.values().chunks::() { - if mask == u64::MAX { - res.extend(&values[offset..offset + MASK_BITS]); - } else { - while mask != 0 { - let n = std::intrinsics::cttz(mask) as usize; - res.push(values[offset + n]); - mask = mask & (mask - 1); + let mut new = Vec::::with_capacity(length); + let mut dst = new.as_mut_ptr(); + + const CHUNK_SIZE: usize = 64; + let mut chunks = self.values().chunks_exact(CHUNK_SIZE); + let mut mask_chunks = filter.values().chunks::(); + + chunks + .by_ref() + .zip(mask_chunks.by_ref()) + .for_each(|(chunk, mut mask)| { + if mask == u64::MAX { + unsafe { + std::ptr::copy(chunk.as_ptr(), dst, CHUNK_SIZE); + dst = dst.add(CHUNK_SIZE); + } + } else { + while mask != 0 { + let n = mask.trailing_zeros() as usize; + unsafe { + dst.write(chunk[n]); + dst = dst.add(1); + } + mask = mask & (mask - 1); + } } - } - offset += MASK_BITS; - } + }); - for (&v, _) in values + chunks + .remainder() .iter() - .zip(filter.values().iter()) - .skip(offset) - .filter(|(_, f)| *f) - { - res.push(v); - } + .zip(mask_chunks.remainder_iter()) + .for_each(|(value, is_selected)| { + if is_selected { + unsafe { + dst.write(*value); + dst = dst.add(1); + } + } + }); - let col = PrimitiveColumn { values: res.into() }; + unsafe { new.set_len(length) }; + let col = PrimitiveColumn { values: new.into() }; Arc::new(col) } diff --git a/common/datavalues/src/lib.rs b/common/datavalues/src/lib.rs index b8b34fe8dbb7a..7727cdb204cf8 100644 --- a/common/datavalues/src/lib.rs +++ b/common/datavalues/src/lib.rs @@ -18,7 +18,6 @@ #![feature(generic_associated_types)] #![feature(trusted_len)] -#![feature(core_intrinsics)] #[macro_use] mod macros; diff --git a/common/datavalues/tests/it/columns/primitive.rs b/common/datavalues/tests/it/columns/primitive.rs index dc76fd7524a55..49165cce9f3cb 100644 --- a/common/datavalues/tests/it/columns/primitive.rs +++ b/common/datavalues/tests/it/columns/primitive.rs @@ -80,10 +80,10 @@ fn test_filter_column() { expect: (0..N).map(|i| i as i32).filter(|i| i % 10 == 0).collect(), }, Test { - filter: BooleanColumn::from_iterator((0..N).map(|i| i < 100 || i > 800)), + filter: BooleanColumn::from_iterator((0..N).map(|i| !(100..=800).contains(&i))), expect: (0..N) .map(|i| i as i32) - .filter(|&i| i < 100 || i > 800) + .filter(|&i| !(100..=800).contains(&i)) .collect(), }, ]; From 43d5d5e01ed3f7fff11e5fc0f6e82c315832eeda Mon Sep 17 00:00:00 2001 From: platoneko Date: Mon, 7 Mar 2022 01:58:04 +0800 Subject: [PATCH 4/4] use extra loop to consume offset --- .../datavalues/src/columns/primitive/mod.rs | 37 ++++++++++++++++--- .../datavalues/tests/it/columns/primitive.rs | 19 +++++++++- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/common/datavalues/src/columns/primitive/mod.rs b/common/datavalues/src/columns/primitive/mod.rs index bcde80d951d20..136398c3c6544 100644 --- a/common/datavalues/src/columns/primitive/mod.rs +++ b/common/datavalues/src/columns/primitive/mod.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use common_arrow::arrow::array::Array; use common_arrow::arrow::array::PrimitiveArray; use common_arrow::arrow::bitmap::utils::BitChunkIterExact; +use common_arrow::arrow::bitmap::utils::BitChunksExact; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::buffer::Buffer; use common_arrow::arrow::compute::arity::unary; @@ -186,17 +187,41 @@ impl Column for PrimitiveColumn { } fn filter(&self, filter: &BooleanColumn) -> ColumnRef { - let length = filter.values().len() - filter.values().null_count(); - if length == self.len() { + assert_eq!(self.len(), filter.values().len()); + + let selected = filter.values().len() - filter.values().null_count(); + if selected == self.len() { return Arc::new(self.clone()); } - let mut new = Vec::::with_capacity(length); + let mut new = Vec::::with_capacity(selected); let mut dst = new.as_mut_ptr(); + let (mut slice, offset, mut length) = filter.values().as_slice(); + let mut values = self.values(); + if offset > 0 { + // Consume the offset + let n = 8 - offset; + values + .iter() + .zip(filter.values().iter()) + .take(n) + .for_each(|(value, is_selected)| { + if is_selected { + unsafe { + dst.write(*value); + dst = dst.add(1); + } + } + }); + slice = &slice[1..]; + length -= n; + values = &values[n..]; + } + const CHUNK_SIZE: usize = 64; - let mut chunks = self.values().chunks_exact(CHUNK_SIZE); - let mut mask_chunks = filter.values().chunks::(); + let mut chunks = values.chunks_exact(CHUNK_SIZE); + let mut mask_chunks = BitChunksExact::::new(slice, length); chunks .by_ref() @@ -232,7 +257,7 @@ impl Column for PrimitiveColumn { } }); - unsafe { new.set_len(length) }; + unsafe { new.set_len(selected) }; let col = PrimitiveColumn { values: new.into() }; Arc::new(col) diff --git a/common/datavalues/tests/it/columns/primitive.rs b/common/datavalues/tests/it/columns/primitive.rs index 49165cce9f3cb..98d4399131fb3 100644 --- a/common/datavalues/tests/it/columns/primitive.rs +++ b/common/datavalues/tests/it/columns/primitive.rs @@ -66,7 +66,7 @@ fn test_filter_column() { expect: Vec, } - let tests: Vec = vec![ + let mut tests: Vec = vec![ Test { filter: BooleanColumn::from_iterator((0..N).map(|_| true)), expect: (0..N).map(|i| i as i32).collect(), @@ -88,6 +88,23 @@ fn test_filter_column() { }, ]; + let offset = 10; + let filter = BooleanColumn::from_iterator( + (0..N + offset).map(|i| !(100 + offset..=800 + offset).contains(&i)), + ) + .slice(offset, N) + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + tests.push(Test { + filter, + expect: (0..N) + .map(|i| i as i32) + .filter(|&i| !(100..=800).contains(&i)) + .collect(), + }); + for test in tests { let res = data_column.filter(&test.filter); assert_eq!(