Skip to content

Commit

Permalink
support hash decimal array and group by (#1640)
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 authored Jan 22, 2022
1 parent 0762bf0 commit af8786e
Showing 1 changed file with 58 additions and 4 deletions.
62 changes: 58 additions & 4 deletions datafusion/src/physical_plan/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
use crate::error::{DataFusionError, Result};
use ahash::{CallHasher, RandomState};
use arrow::array::{
Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DictionaryArray,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DecimalArray,
DictionaryArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, LargeStringArray, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
use arrow::datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type,
Expand All @@ -38,6 +39,40 @@ fn combine_hashes(l: u64, r: u64) -> u64 {
hash.wrapping_mul(37).wrapping_add(r)
}

fn hash_decimal128<'a>(
array: &ArrayRef,
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
mul_col: bool,
) {
let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
if array.null_count() == 0 {
if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash =
combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash = i128::get_hash(&array.value(i), random_state);
}
}
} else if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
*hash =
combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
}
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
*hash = i128::get_hash(&array.value(i), random_state);
}
}
}
}

macro_rules! hash_array {
($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
Expand Down Expand Up @@ -249,6 +284,9 @@ pub fn create_hashes<'a>(

for col in arrays {
match col.data_type() {
DataType::Decimal(_, _) => {
hash_decimal128(col, random_state, hashes_buffer, multi_col);
}
DataType::UInt8 => {
hash_array_primitive!(
UInt8Array,
Expand Down Expand Up @@ -516,11 +554,27 @@ pub fn create_hashes<'a>(
#[cfg(test)]
mod tests {
use crate::from_slice::FromSlice;
use arrow::array::DecimalBuilder;
use arrow::{array::DictionaryArray, datatypes::Int8Type};
use std::sync::Arc;

use super::*;

#[test]
fn create_hashes_for_decimal_array() -> Result<()> {
let mut builder = DecimalBuilder::new(4, 20, 3);
let array: Vec<i128> = vec![1, 2, 3, 4];
for value in &array {
builder.append_value(*value)?;
}
let array_ref = Arc::new(builder.finish());
let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; array.len()];
let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
assert_eq!(hashes.len(), 4);
Ok(())
}

#[test]
fn create_hashes_for_float_arrays() -> Result<()> {
let f32_arr = Arc::new(Float32Array::from_slice(&[0.12, 0.5, 1f32, 444.7]));
Expand Down

0 comments on commit af8786e

Please sign in to comment.