From af8786ebea1d62f2f4f28f3eb390cc73fd3a9e16 Mon Sep 17 00:00:00 2001
From: Kun Liu
Date: Sat, 22 Jan 2022 20:37:30 +0800
Subject: [PATCH] support hash decimal array and group by (#1640)

---
 datafusion/src/physical_plan/hash_utils.rs | 62 ++++++++++++++++++++--
 1 file changed, 58 insertions(+), 4 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs
index ce995c4a2e58..5f7a610db075 100644
--- a/datafusion/src/physical_plan/hash_utils.rs
+++ b/datafusion/src/physical_plan/hash_utils.rs
@@ -20,10 +20,11 @@
 use crate::error::{DataFusionError, Result};
 use ahash::{CallHasher, RandomState};
 use arrow::array::{
-    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DictionaryArray,
-    Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
-    LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-    TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+    Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DecimalArray,
+    DictionaryArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, LargeStringArray, StringArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array,
 };
 use arrow::datatypes::{
     ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type,
@@ -38,6 +39,40 @@ fn combine_hashes(l: u64, r: u64) -> u64 {
     hash.wrapping_mul(37).wrapping_add(r)
 }
 
+fn hash_decimal128<'a>(
+    array: &ArrayRef,
+    random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+    mul_col: bool,
+) {
+    let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
+    if array.null_count() == 0 {
+        if mul_col {
+            for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+                *hash =
+                    combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
+            }
+        } else {
+            for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+                *hash = i128::get_hash(&array.value(i), random_state);
+            }
+        }
+    } else if mul_col {
+        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+            if !array.is_null(i) {
+                *hash =
+                    combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
+            }
+        }
+    } else {
+        for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+            if !array.is_null(i) {
+                *hash = i128::get_hash(&array.value(i), random_state);
+            }
+        }
+    }
+}
+
 macro_rules! hash_array {
     ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
         let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
@@ -249,6 +284,9 @@ pub fn create_hashes<'a>(
 
     for col in arrays {
         match col.data_type() {
+            DataType::Decimal(_, _) => {
+                hash_decimal128(col, random_state, hashes_buffer, multi_col);
+            }
             DataType::UInt8 => {
                 hash_array_primitive!(
                     UInt8Array,
@@ -516,11 +554,27 @@ pub fn create_hashes<'a>(
 #[cfg(test)]
 mod tests {
     use crate::from_slice::FromSlice;
+    use arrow::array::DecimalBuilder;
     use arrow::{array::DictionaryArray, datatypes::Int8Type};
     use std::sync::Arc;
 
     use super::*;
 
+    #[test]
+    fn create_hashes_for_decimal_array() -> Result<()> {
+        let mut builder = DecimalBuilder::new(4, 20, 3);
+        let array: Vec<i128> = vec![1, 2, 3, 4];
+        for value in &array {
+            builder.append_value(*value)?;
+        }
+        let array_ref = Arc::new(builder.finish());
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let hashes_buff = &mut vec![0; array.len()];
+        let hashes = create_hashes(&[array_ref], &random_state, hashes_buff)?;
+        assert_eq!(hashes.len(), 4);
+        Ok(())
+    }
+
     #[test]
     fn create_hashes_for_float_arrays() -> Result<()> {
         let f32_arr = Arc::new(Float32Array::from_slice(&[0.12, 0.5, 1f32, 444.7]));