From 193ed9f180c69e2de9e4a6c2eda4156fe44efbf4 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 19 Sep 2021 23:59:26 +0200 Subject: [PATCH 1/5] Optimize hashing --- Cargo.toml | 9 +++-- benches/filter_kernels.rs | 2 +- src/array/ord.rs | 71 ++++++++++++++++++++++++++++++--------- src/compute/hash.rs | 56 ++++++++++-------------------- 4 files changed, 79 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 218ed07b33b..51f14940b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ packed_simd = { version = "0.3", optional = true, package = "packed_simd_2" } futures = { version = "0.3", optional = true } # for faster hashing -ahash = { version = "0.7", optional = true } +ahash = "0.7" parquet2 = { version = "0.5", optional = true, default_features = false, features = ["stream"] } @@ -78,7 +78,7 @@ features = ["full"] rustdoc-args = ["--cfg", "docsrs"] [features] -default = [] +default = ["compute"] full = [ "io_csv", "io_json", @@ -91,7 +91,6 @@ full = [ "io_avro", "regex", "merge_sort", - "ahash", "compute", # parses timezones used in timestamp conversions "chrono-tz", @@ -211,3 +210,7 @@ harness = false [[bench]] name = "write_csv" harness = false + +[[bench]] +name = "hash_kernel" +harness = false diff --git a/benches/filter_kernels.rs b/benches/filter_kernels.rs index 13acb942300..4ed1205f721 100644 --- a/benches/filter_kernels.rs +++ b/benches/filter_kernels.rs @@ -22,7 +22,7 @@ use arrow2::array::*; use arrow2::compute::filter::{build_filter, filter, filter_record_batch, Filter}; use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::record_batch::RecordBatch; -use arrow2::util::bench_util::*; +use arrow2::util:: use criterion::{criterion_group, criterion_main, Criterion}; diff --git a/src/array/ord.rs b/src/array/ord.rs index 17dd0bc4653..319af374ab1 100644 --- a/src/array/ord.rs +++ b/src/array/ord.rs @@ -48,51 +48,90 @@ where } fn compare_primitives(left: &dyn Array, right: &dyn Array) -> DynComparator { - let left = left.as_any().downcast_ref::>().unwrap().clone(); - let right = right.as_any().downcast_ref::>().unwrap().clone(); + let left = left + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let right = right + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); Box::new(move |i, j| total_cmp(&left.value(i), &right.value(j))) } fn compare_boolean(left: &dyn Array, right: &dyn Array) -> DynComparator { - let left = left.as_any().downcast_ref::().unwrap().clone(); - let right = right.as_any().downcast_ref::().unwrap().clone(); + let left = left + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + let right = right + .as_any() + .downcast_ref::() + .unwrap() + .clone(); Box::new(move |i, j| left.value(i).cmp(&right.value(j))) } fn compare_f32(left: &dyn Array, right: &dyn Array) -> DynComparator { - let left = left.as_any().downcast_ref::>().unwrap().clone(); + let left = left + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); let right = right .as_any() .downcast_ref::>() - .unwrap().clone(); + .unwrap() + .clone(); Box::new(move |i, j| total_cmp_f32(&left.value(i), &right.value(j))) } fn compare_f64(left: &dyn Array, right: &dyn Array) -> DynComparator { - let left = left.as_any().downcast_ref::>().unwrap().clone(); + let left = left + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); let right = right .as_any() .downcast_ref::>() - .unwrap().clone(); + .unwrap() + .clone(); Box::new(move |i, j| total_cmp_f64(&left.value(i), &right.value(j))) } fn compare_string(left: &dyn Array, right: &dyn Array) -> 
DynComparator { - let left = left.as_any().downcast_ref::>().unwrap().clone(); - let right = right.as_any().downcast_ref::>().unwrap().clone(); + let left = left + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let right = right + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); Box::new(move |i, j| left.value(i).cmp(right.value(j))) } fn compare_binary(left: &dyn Array, right: &dyn Array) -> DynComparator { - let left = left.as_any().downcast_ref::>().unwrap().clone(); - let right = right.as_any().downcast_ref::>().unwrap().clone(); + let left = left + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); + let right = right + .as_any() + .downcast_ref::>() + .unwrap() + .clone(); Box::new(move |i, j| left.value(i).cmp(right.value(j))) } -fn compare_dict( - left: &DictionaryArray, - right: &DictionaryArray, -) -> Result +fn compare_dict(left: &DictionaryArray, right: &DictionaryArray) -> Result where K: DictionaryKey, { diff --git a/src/compute/hash.rs b/src/compute/hash.rs index 50133c7870f..af538be7de4 100644 --- a/src/compute/hash.rs +++ b/src/compute/hash.rs @@ -1,20 +1,9 @@ -use std::hash::{Hash, Hasher}; +use ahash::{CallHasher, RandomState}; +use std::hash::Hash; -#[cfg(feature = "ahash")] -use ahash::AHasher as DefaultHasher; -#[cfg(not(feature = "ahash"))] -use std::collections::hash_map::DefaultHasher; - -#[cfg(feature = "ahash")] -macro_rules! new_hasher { - () => { - DefaultHasher::new_with_keys(0, 0) - }; -} -#[cfg(not(feature = "ahash"))] -macro_rules! new_hasher { +macro_rules! new_state { () => { - DefaultHasher::new() + RandomState::with_seeds(0, 0, 0, 0) }; } @@ -30,46 +19,35 @@ use super::arity::unary; /// Element-wise hash of a [`PrimitiveArray`]. Validity is preserved. pub fn hash_primitive(array: &PrimitiveArray) -> PrimitiveArray { - unary( - array, - |x| { - let mut hasher = new_hasher!(); - x.hash(&mut hasher); - hasher.finish() - }, - DataType::UInt64, - ) + let state = new_state!(); + + unary(array, |x| T::get_hash(&x, &state), DataType::UInt64) } /// Element-wise hash of a [`BooleanArray`]. Validity is preserved. pub fn hash_boolean(array: &BooleanArray) -> PrimitiveArray { - let iter = array.values_iter().map(|x| { - let mut hasher = new_hasher!(); - x.hash(&mut hasher); - hasher.finish() - }); + let state = new_state!(); + + let iter = array.values_iter().map(|x| u8::get_hash(&x, &state)); let values = Buffer::from_trusted_len_iter(iter); PrimitiveArray::::from_data(DataType::UInt64, values, array.validity().clone()) } /// Element-wise hash of a [`Utf8Array`]. Validity is preserved. pub fn hash_utf8(array: &Utf8Array) -> PrimitiveArray { - let iter = array.values_iter().map(|x| { - let mut hasher = new_hasher!(); - x.hash(&mut hasher); - hasher.finish() - }); + let state = new_state!(); + + let iter = array + .values_iter() + .map(|x| <[u8]>::get_hash(&x.as_bytes(), &state)); let values = Buffer::from_trusted_len_iter(iter); PrimitiveArray::::from_data(DataType::UInt64, values, array.validity().clone()) } /// Element-wise hash of a [`BinaryArray`]. Validity is preserved. 
pub fn hash_binary(array: &BinaryArray) -> PrimitiveArray { - let iter = array.values_iter().map(|x| { - let mut hasher = new_hasher!(); - x.hash(&mut hasher); - hasher.finish() - }); + let state = new_state!(); + let iter = array.values_iter().map(|x| <[u8]>::get_hash(&x, &state)); let values = Buffer::from_trusted_len_iter(iter); PrimitiveArray::::from_data(DataType::UInt64, values, array.validity().clone()) } From 3bf1c0b9149182c2d44697d65e7e54976d43391f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 20 Sep 2021 00:08:51 +0200 Subject: [PATCH 2/5] Optimize hashing --- Cargo.toml | 2 +- benches/hash_kernel.rs | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 benches/hash_kernel.rs diff --git a/Cargo.toml b/Cargo.toml index 51f14940b97..b66920d1070 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ features = ["full"] rustdoc-args = ["--cfg", "docsrs"] [features] -default = ["compute"] +default = [] full = [ "io_csv", "io_json", diff --git a/benches/hash_kernel.rs b/benches/hash_kernel.rs new file mode 100644 index 00000000000..4a49100deeb --- /dev/null +++ b/benches/hash_kernel.rs @@ -0,0 +1,34 @@ +extern crate arrow2; + +use arrow2::compute::hash::hash; +use arrow2::datatypes::DataType; +use arrow2::util::bench_util::*; + +use criterion::{criterion_group, criterion_main, Criterion}; + +fn add_benchmark(c: &mut Criterion) { + let log2_size = 10; + let size = 2usize.pow(log2_size); + + let arr_a = create_primitive_array::(size, DataType::Int32, 0.0); + + c.bench_function(&format!("i32 2^{}", log2_size), |b| b.iter(|| hash(&arr_a))); + + let arr_a = create_primitive_array::(size, DataType::Int64, 0.0); + + c.bench_function(&format!("i64 2^{}", log2_size), |b| b.iter(|| hash(&arr_a))); + + let arr_a = create_string_array::(size, 5, 0.0, 0); + + c.bench_function(&format!("str 2^{}", log2_size), |b| b.iter(|| hash(&arr_a))); + + let arr_a = create_boolean_array(size, 0.5, 0.0); + + c.bench_function(&format!("bool 2^{}", log2_size), |b| { + b.iter(|| hash(&arr_a)) + }); +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); + \ No newline at end of file From 905da20901b2cc71fa505e9d3768940b87855700 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 20 Sep 2021 00:12:55 +0200 Subject: [PATCH 3/5] Move ahash to compute --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b66920d1070..3685148639e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ packed_simd = { version = "0.3", optional = true, package = "packed_simd_2" } futures = { version = "0.3", optional = true } # for faster hashing -ahash = "0.7" +ahash = { version="0.7", optional = true } parquet2 = { version = "0.5", optional = true, default_features = false, features = ["stream"] } @@ -115,7 +115,7 @@ io_avro = ["avro-rs", "streaming-iterator", "serde_json"] io_json_integration = ["io_json", "serde_derive", "hex"] io_print = ["comfy-table"] # the compute kernels. Disabling this significantly reduces compile time. -compute = ["strength_reduce", "multiversion", "lexical-core"] +compute = ["strength_reduce", "multiversion", "lexical-core", "ahash"] # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format. 
io_parquet = ["parquet2", "io_ipc", "base64", "futures"] benchmarks = ["rand"] From 2616d1aedc6fad9f3e28634936dde8784dc246c5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 20 Sep 2021 00:13:09 +0200 Subject: [PATCH 4/5] Formatting --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3685148639e..0e72860149b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ packed_simd = { version = "0.3", optional = true, package = "packed_simd_2" } futures = { version = "0.3", optional = true } # for faster hashing -ahash = { version="0.7", optional = true } +ahash = { version = "0.7", optional = true } parquet2 = { version = "0.5", optional = true, default_features = false, features = ["stream"] } From b8db919089915ca37e2f1327bfa3bcde1aa7d885 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 20 Sep 2021 00:59:20 +0200 Subject: [PATCH 5/5] Multi-versioning (30% speed up) --- src/compute/hash.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/compute/hash.rs b/src/compute/hash.rs index af538be7de4..c7f763d79e2 100644 --- a/src/compute/hash.rs +++ b/src/compute/hash.rs @@ -1,4 +1,5 @@ use ahash::{CallHasher, RandomState}; +use multiversion::multiversion; use std::hash::Hash; macro_rules! new_state { @@ -18,6 +19,8 @@ use crate::{ use super::arity::unary; /// Element-wise hash of a [`PrimitiveArray`]. Validity is preserved. +#[multiversion] +#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")] pub fn hash_primitive(array: &PrimitiveArray) -> PrimitiveArray { let state = new_state!(); @@ -25,6 +28,8 @@ pub fn hash_primitive(array: &PrimitiveArray) -> Primit } /// Element-wise hash of a [`BooleanArray`]. Validity is preserved. +#[multiversion] +#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")] pub fn hash_boolean(array: &BooleanArray) -> PrimitiveArray { let state = new_state!(); @@ -33,6 +38,8 @@ pub fn hash_boolean(array: &BooleanArray) -> PrimitiveArray { PrimitiveArray::::from_data(DataType::UInt64, values, array.validity().clone()) } +#[multiversion] +#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")] /// Element-wise hash of a [`Utf8Array`]. Validity is preserved. pub fn hash_utf8(array: &Utf8Array) -> PrimitiveArray { let state = new_state!();
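
Note on the hash kernel change in patch 1: instead of constructing a fresh Hasher per element and calling hash()/finish(), the kernels now build one ahash RandomState with fixed seeds and hash each element through CallHasher::get_hash, which lets ahash pick a specialized path per type and avoids per-element hasher setup. Below is a minimal standalone sketch of that pattern, assuming ahash 0.7 as pinned in the patch; hash_values is a hypothetical helper for illustration, not part of the diff.

use ahash::{CallHasher, RandomState};

// Hypothetical helper illustrating the pattern used by hash_primitive & co.
fn hash_values(values: &[i32]) -> Vec<u64> {
    // Fixed seeds keep hashes deterministic across calls, mirroring new_state!() in the patch.
    let state = RandomState::with_seeds(0, 0, 0, 0);
    // One shared state, no per-element Hasher construction.
    values.iter().map(|v| i32::get_hash(v, &state)).collect()
}

fn main() {
    let hashes = hash_values(&[1, 2, 3]);
    assert_eq!(hashes.len(), 3);
    println!("{:?}", hashes);
}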
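
Note on patch 5: the #[multiversion]/#[clone(target = "...")] attributes compile extra clones of each hash kernel for the listed x86_64 feature set and select a supported clone at runtime; the commit message credits this with roughly a 30% speed-up. A sketch of the same attribute usage on a stand-in function (sum_u64 is illustrative only), assuming the multiversion crate version already pulled in by the `compute` feature:

use multiversion::multiversion;

#[multiversion]
#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")]
fn sum_u64(values: &[u64]) -> u64 {
    // Identical body for every clone; the macro adds runtime CPU-feature
    // detection and dispatches to a clone the host actually supports.
    values.iter().sum()
}

fn main() {
    println!("{}", sum_u64(&[1, 2, 3]));
}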