Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Optimize hashing using ahash and multiversion (-30%) #428

Merged
merged 5 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ full = [
"io_avro",
"regex",
"merge_sort",
"ahash",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",
Expand All @@ -116,7 +115,7 @@ io_avro = ["avro-rs", "streaming-iterator", "serde_json"]
io_json_integration = ["io_json", "serde_derive", "hex"]
io_print = ["comfy-table"]
# the compute kernels. Disabling this significantly reduces compile time.
compute = ["strength_reduce", "multiversion", "lexical-core"]
compute = ["strength_reduce", "multiversion", "lexical-core", "ahash"]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures"]
benchmarks = ["rand"]
Expand Down Expand Up @@ -211,3 +210,7 @@ harness = false
[[bench]]
name = "write_csv"
harness = false

[[bench]]
name = "hash_kernel"
harness = false
2 changes: 1 addition & 1 deletion benches/filter_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow2::array::*;
use arrow2::compute::filter::{build_filter, filter, filter_record_batch, Filter};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::record_batch::RecordBatch;
use arrow2::util::bench_util::*;
use arrow2::util::

use criterion::{criterion_group, criterion_main, Criterion};

Expand Down
34 changes: 34 additions & 0 deletions benches/hash_kernel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
extern crate arrow2;

use arrow2::compute::hash::hash;
use arrow2::datatypes::DataType;
use arrow2::util::bench_util::*;

use criterion::{criterion_group, criterion_main, Criterion};

/// Registers the `hash` kernel benchmarks with criterion.
///
/// Four inputs are built with `size = 2^log2_size` (i32, i64, string and
/// boolean arrays) and each one is benchmarked under the name
/// `"<type> 2^<log2_size>"`.
fn add_benchmark(c: &mut Criterion) {
    let log2_size = 10;
    let size = 2usize.pow(log2_size);

    // 32-bit integers.
    let int32_input = create_primitive_array::<i32>(size, DataType::Int32, 0.0);
    let int32_name = format!("i32 2^{}", log2_size);
    c.bench_function(&int32_name, |bencher| bencher.iter(|| hash(&int32_input)));

    // 64-bit integers.
    let int64_input = create_primitive_array::<i64>(size, DataType::Int64, 0.0);
    let int64_name = format!("i64 2^{}", log2_size);
    c.bench_function(&int64_name, |bencher| bencher.iter(|| hash(&int64_input)));

    // Strings with `i32` offsets.
    let utf8_input = create_string_array::<i32>(size, 5, 0.0, 0);
    let utf8_name = format!("str 2^{}", log2_size);
    c.bench_function(&utf8_name, |bencher| bencher.iter(|| hash(&utf8_input)));

    // Booleans.
    let bool_input = create_boolean_array(size, 0.5, 0.0);
    let bool_name = format!("bool 2^{}", log2_size);
    c.bench_function(&bool_name, |bencher| bencher.iter(|| hash(&bool_input)));
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);

71 changes: 55 additions & 16 deletions src/array/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,51 +48,90 @@ where
}

fn compare_primitives<T: NativeType + Ord>(left: &dyn Array, right: &dyn Array) -> DynComparator {
let left = left.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap().clone();
let right = right.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap().clone();
let left = left
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.unwrap()
.clone();
let right = right
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.unwrap()
.clone();
Box::new(move |i, j| total_cmp(&left.value(i), &right.value(j)))
}

fn compare_boolean(left: &dyn Array, right: &dyn Array) -> DynComparator {
let left = left.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
let right = right.as_any().downcast_ref::<BooleanArray>().unwrap().clone();
let left = left
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap()
.clone();
let right = right
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap()
.clone();
Box::new(move |i, j| left.value(i).cmp(&right.value(j)))
}

fn compare_f32(left: &dyn Array, right: &dyn Array) -> DynComparator {
let left = left.as_any().downcast_ref::<PrimitiveArray<f32>>().unwrap().clone();
let left = left
.as_any()
.downcast_ref::<PrimitiveArray<f32>>()
.unwrap()
.clone();
let right = right
.as_any()
.downcast_ref::<PrimitiveArray<f32>>()
.unwrap().clone();
.unwrap()
.clone();
Box::new(move |i, j| total_cmp_f32(&left.value(i), &right.value(j)))
}

fn compare_f64(left: &dyn Array, right: &dyn Array) -> DynComparator {
let left = left.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap().clone();
let left = left
.as_any()
.downcast_ref::<PrimitiveArray<f64>>()
.unwrap()
.clone();
let right = right
.as_any()
.downcast_ref::<PrimitiveArray<f64>>()
.unwrap().clone();
.unwrap()
.clone();
Box::new(move |i, j| total_cmp_f64(&left.value(i), &right.value(j)))
}

fn compare_string<O: Offset>(left: &dyn Array, right: &dyn Array) -> DynComparator {
let left = left.as_any().downcast_ref::<Utf8Array<O>>().unwrap().clone();
let right = right.as_any().downcast_ref::<Utf8Array<O>>().unwrap().clone();
let left = left
.as_any()
.downcast_ref::<Utf8Array<O>>()
.unwrap()
.clone();
let right = right
.as_any()
.downcast_ref::<Utf8Array<O>>()
.unwrap()
.clone();
Box::new(move |i, j| left.value(i).cmp(right.value(j)))
}

fn compare_binary<O: Offset>(left: &dyn Array, right: &dyn Array) -> DynComparator {
let left = left.as_any().downcast_ref::<BinaryArray<O>>().unwrap().clone();
let right = right.as_any().downcast_ref::<BinaryArray<O>>().unwrap().clone();
let left = left
.as_any()
.downcast_ref::<BinaryArray<O>>()
.unwrap()
.clone();
let right = right
.as_any()
.downcast_ref::<BinaryArray<O>>()
.unwrap()
.clone();
Box::new(move |i, j| left.value(i).cmp(right.value(j)))
}

fn compare_dict<K>(
left: &DictionaryArray<K>,
right: &DictionaryArray<K>,
) -> Result<DynComparator>
fn compare_dict<K>(left: &DictionaryArray<K>, right: &DictionaryArray<K>) -> Result<DynComparator>
where
K: DictionaryKey,
{
Expand Down
63 changes: 24 additions & 39 deletions src/compute/hash.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
use std::hash::{Hash, Hasher};
use ahash::{CallHasher, RandomState};
use multiversion::multiversion;
use std::hash::Hash;

#[cfg(feature = "ahash")]
use ahash::AHasher as DefaultHasher;
#[cfg(not(feature = "ahash"))]
use std::collections::hash_map::DefaultHasher;

#[cfg(feature = "ahash")]
macro_rules! new_hasher {
() => {
DefaultHasher::new_with_keys(0, 0)
};
}
#[cfg(not(feature = "ahash"))]
macro_rules! new_hasher {
macro_rules! new_state {
() => {
DefaultHasher::new()
RandomState::with_seeds(0, 0, 0, 0)
};
}

Expand All @@ -29,47 +19,42 @@ use crate::{
use super::arity::unary;

/// Element-wise hash of a [`PrimitiveArray`]. Validity is preserved.
#[multiversion]
#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")]
pub fn hash_primitive<T: NativeType + Hash>(array: &PrimitiveArray<T>) -> PrimitiveArray<u64> {
unary(
array,
|x| {
let mut hasher = new_hasher!();
x.hash(&mut hasher);
hasher.finish()
},
DataType::UInt64,
)
let state = new_state!();
Copy link
Collaborator

@sundy-li sundy-li Sep 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state is initialized once per array; will that cause a different hash result for another array?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it uses the same seeds each time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, but `get_hash` builds a new hasher internally, which may introduce an extra allocation on every call; I did not find a way to improve that.

#[inline]
    fn get_hash<H: Hash + ?Sized, B: BuildHasher>(value: &H, build_hasher: &B) -> u64 {
        let mut hasher = build_hasher.build_hasher();
        value.hash(&mut hasher);
        hasher.finish()
    }


unary(array, |x| T::get_hash(&x, &state), DataType::UInt64)
}

/// Element-wise hash of a [`BooleanArray`]. Validity is preserved.
#[multiversion]
#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")]
pub fn hash_boolean(array: &BooleanArray) -> PrimitiveArray<u64> {
let iter = array.values_iter().map(|x| {
let mut hasher = new_hasher!();
x.hash(&mut hasher);
hasher.finish()
});
let state = new_state!();

let iter = array.values_iter().map(|x| u8::get_hash(&x, &state));
let values = Buffer::from_trusted_len_iter(iter);
PrimitiveArray::<u64>::from_data(DataType::UInt64, values, array.validity().clone())
}

#[multiversion]
#[clone(target = "x86_64+aes+sse3+ssse3+avx+avx2")]
/// Element-wise hash of a [`Utf8Array`]. Validity is preserved.
pub fn hash_utf8<O: Offset>(array: &Utf8Array<O>) -> PrimitiveArray<u64> {
let iter = array.values_iter().map(|x| {
let mut hasher = new_hasher!();
x.hash(&mut hasher);
hasher.finish()
});
let state = new_state!();

let iter = array
.values_iter()
.map(|x| <[u8]>::get_hash(&x.as_bytes(), &state));
let values = Buffer::from_trusted_len_iter(iter);
PrimitiveArray::<u64>::from_data(DataType::UInt64, values, array.validity().clone())
}

/// Element-wise hash of a [`BinaryArray`]. Validity is preserved.
pub fn hash_binary<O: Offset>(array: &BinaryArray<O>) -> PrimitiveArray<u64> {
let iter = array.values_iter().map(|x| {
let mut hasher = new_hasher!();
x.hash(&mut hasher);
hasher.finish()
});
let state = new_state!();
let iter = array.values_iter().map(|x| <[u8]>::get_hash(&x, &state));
let values = Buffer::from_trusted_len_iter(iter);
PrimitiveArray::<u64>::from_data(DataType::UInt64, values, array.validity().clone())
}
Expand Down