From 2eb38bd5e0aeac69dd7b032386729379a160883e Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Wed, 22 May 2024 11:06:35 +0800 Subject: [PATCH] Minor: Move group accumulator for aggregate function to physical-expr-common, and add ahash physical-expr-common (#10574) * ahash workspace Signed-off-by: jayzhan211 * move other utils Signed-off-by: jayzhan211 * move NullState Signed-off-by: jayzhan211 * move PrimitiveGroupsAccumulator Signed-off-by: jayzhan211 * move boolop Signed-off-by: jayzhan211 * move deciamlavg Signed-off-by: jayzhan211 * add comment Signed-off-by: jayzhan211 * fix doc Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- Cargo.toml | 3 + datafusion-cli/Cargo.lock | 1 + datafusion/common/Cargo.toml | 4 +- datafusion/core/Cargo.toml | 2 +- datafusion/expr/Cargo.toml | 4 +- datafusion/physical-expr-common/Cargo.toml | 1 + .../groups_accumulator/accumulate.rs | 8 +- .../aggregate/groups_accumulator/bool_op.rs | 5 +- .../src/aggregate/groups_accumulator/mod.rs | 22 ++ .../aggregate/groups_accumulator/prim_op.rs | 6 +- .../physical-expr-common/src/aggregate/mod.rs | 1 + .../src/aggregate/utils.rs | 162 ++++++++++++++- datafusion/physical-expr/Cargo.toml | 4 +- .../src/aggregate/groups_accumulator/mod.rs | 17 +- datafusion/physical-expr/src/aggregate/mod.rs | 7 +- .../physical-expr/src/aggregate/utils.rs | 191 ------------------ datafusion/physical-plan/Cargo.toml | 4 +- 17 files changed, 222 insertions(+), 220 deletions(-) rename datafusion/{physical-expr => physical-expr-common}/src/aggregate/groups_accumulator/accumulate.rs (99%) rename datafusion/{physical-expr => physical-expr-common}/src/aggregate/groups_accumulator/bool_op.rs (97%) create mode 100644 datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs rename datafusion/{physical-expr => physical-expr-common}/src/aggregate/groups_accumulator/prim_op.rs (96%) delete mode 100644 datafusion/physical-expr/src/aggregate/utils.rs diff --git a/Cargo.toml b/Cargo.toml index 04c5f4755fe4..e0edf30efaec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,9 @@ version = "38.0.0" # for the inherited dependency but cannot do the reverse (override from true to false). # # See for more detaiils: https://github.com/rust-lang/cargo/issues/11329 +ahash = { version = "0.8", default-features = false, features = [ + "runtime-rng", +] } arrow = { version = "51.0.0", features = ["prettyprint"] } arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "51.0.0", default-features = false } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 62ae154a4225..db87f85e346b 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1367,6 +1367,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr", + "rand", ] [[package]] diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 22944133f89b..7085732b562e 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -41,9 +41,7 @@ backtrace = [] pyarrow = ["pyo3", "arrow/pyarrow", "parquet"] [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } apache-avro = { version = "0.16", default-features = false, features = [ "bzip", "snappy", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index cef45e41a33c..a3e299752189 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -77,7 +77,7 @@ unicode_expressions = [ ] [dependencies] -ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } +ahash = { workspace = true } apache-avro = { version = "0.16", optional = true } arrow = { workspace = true } arrow-array = { workspace = true } diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 2759572581ea..df91d9313746 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -38,9 +38,7 @@ path = "src/lib.rs" [features] [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } chrono = { workspace = true } diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index d1202c83d526..637b8775112e 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -39,3 +39,4 @@ path = "src/lib.rs" arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +rand = { workspace = true } diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs similarity index 99% rename from datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs rename to datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs index 9850b002e40e..f109079f6a26 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs @@ -19,9 +19,9 @@ //! //! [`GroupsAccumulator`]: datafusion_expr::GroupsAccumulator +use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; +use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; -use arrow_array::{Array, BooleanArray, PrimitiveArray}; -use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use datafusion_expr::EmitTo; /// Track the accumulator null state per row: if any values for that @@ -462,9 +462,9 @@ fn initialize_builder( mod test { use super::*; - use arrow_array::UInt32Array; - use hashbrown::HashSet; + use arrow::array::UInt32Array; use rand::{rngs::ThreadRng, Rng}; + use std::collections::HashSet; #[test] fn accumulate() { diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs similarity index 97% rename from datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs rename to datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs index f40c661a7a2f..8498d69dd333 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs @@ -17,9 +17,8 @@ use std::sync::Arc; -use arrow::array::AsArray; -use arrow_array::{ArrayRef, BooleanArray}; -use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; +use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; +use arrow::buffer::BooleanBuffer; use datafusion_common::Result; use datafusion_expr::{EmitTo, GroupsAccumulator}; diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs new file mode 100644 index 000000000000..5b0182c5db8a --- /dev/null +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for implementing GroupsAccumulator + +pub mod accumulate; +pub mod bool_op; +pub mod prim_op; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs similarity index 96% rename from datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs rename to datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index 994f5447d7c0..debb36852b22 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -17,9 +17,9 @@ use std::sync::Arc; -use arrow::{array::AsArray, datatypes::ArrowPrimitiveType}; -use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray}; -use arrow_schema::DataType; +use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}; +use arrow::datatypes::ArrowPrimitiveType; +use arrow::datatypes::DataType; use datafusion_common::Result; use datafusion_expr::{EmitTo, GroupsAccumulator}; diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index da24f335b2f8..4ef0d58046f8 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod groups_accumulator; pub mod stats; pub mod utils; diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs b/datafusion/physical-expr-common/src/aggregate/utils.rs index 9821ba626b18..c59c29a139d8 100644 --- a/datafusion/physical-expr-common/src/aggregate/utils.rs +++ b/datafusion/physical-expr-common/src/aggregate/utils.rs @@ -17,10 +17,18 @@ use std::{any::Any, sync::Arc}; +use arrow::datatypes::ArrowNativeType; use arrow::{ + array::{ArrayRef, ArrowNativeTypeOp, AsArray}, compute::SortOptions, - datatypes::{DataType, Field}, + datatypes::{ + DataType, Decimal128Type, DecimalType, Field, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, + ToByteSlice, + }, }; +use datafusion_common::{exec_err, DataFusionError, Result}; +use datafusion_expr::Accumulator; use crate::sort_expr::PhysicalSortExpr; @@ -43,6 +51,60 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { } } +/// Convert scalar values from an accumulator into arrays. +pub fn get_accum_scalar_values_as_arrays( + accum: &mut dyn Accumulator, +) -> Result> { + accum + .state()? + .iter() + .map(|s| s.to_array_of_size(1)) + .collect() +} + +/// Adjust array type metadata if needed +/// +/// Since `Decimal128Arrays` created from `Vec` have +/// default precision and scale, this function adjusts the output to +/// match `data_type`, if necessary +pub fn adjust_output_array(data_type: &DataType, array: ArrayRef) -> Result { + let array = match data_type { + DataType::Decimal128(p, s) => Arc::new( + array + .as_primitive::() + .clone() + .with_precision_and_scale(*p, *s)?, + ) as ArrayRef, + DataType::Timestamp(TimeUnit::Nanosecond, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + DataType::Timestamp(TimeUnit::Microsecond, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + DataType::Timestamp(TimeUnit::Millisecond, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + DataType::Timestamp(TimeUnit::Second, tz) => Arc::new( + array + .as_primitive::() + .clone() + .with_timezone_opt(tz.clone()), + ), + // no adjustment needed for other arrays + _ => array, + }; + Ok(array) +} + /// Construct corresponding fields for lexicographical ordering requirement expression pub fn ordering_fields( ordering_req: &[PhysicalSortExpr], @@ -67,3 +129,101 @@ pub fn ordering_fields( pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec { ordering_req.iter().map(|item| item.options).collect() } + +/// A wrapper around a type to provide hash for floats +#[derive(Copy, Clone, Debug)] +pub struct Hashable(pub T); + +impl std::hash::Hash for Hashable { + fn hash(&self, state: &mut H) { + self.0.to_byte_slice().hash(state) + } +} + +impl PartialEq for Hashable { + fn eq(&self, other: &Self) -> bool { + self.0.is_eq(other.0) + } +} + +impl Eq for Hashable {} + +/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow +/// +/// This is needed because different precisions for Decimal128/Decimal256 can +/// store different ranges of values and thus sum/count may not fit in +/// the target type. +/// +/// For example, the precision is 3, the max of value is `999` and the min +/// value is `-999` +pub struct DecimalAverager { + /// scale factor for sum values (10^sum_scale) + sum_mul: T::Native, + /// scale factor for target (10^target_scale) + target_mul: T::Native, + /// the output precision + target_precision: u8, +} + +impl DecimalAverager { + /// Create a new `DecimalAverager`: + /// + /// * sum_scale: the scale of `sum` values passed to [`Self::avg`] + /// * target_precision: the output precision + /// * target_scale: the output scale + /// + /// Errors if the resulting data can not be stored + pub fn try_new( + sum_scale: i8, + target_precision: u8, + target_scale: i8, + ) -> Result { + let sum_mul = T::Native::from_usize(10_usize) + .map(|b| b.pow_wrapping(sum_scale as u32)) + .ok_or(DataFusionError::Internal( + "Failed to compute sum_mul in DecimalAverager".to_string(), + ))?; + + let target_mul = T::Native::from_usize(10_usize) + .map(|b| b.pow_wrapping(target_scale as u32)) + .ok_or(DataFusionError::Internal( + "Failed to compute target_mul in DecimalAverager".to_string(), + ))?; + + if target_mul >= sum_mul { + Ok(Self { + sum_mul, + target_mul, + target_precision, + }) + } else { + // can't convert the lit decimal to the returned data type + exec_err!("Arithmetic Overflow in AvgAccumulator") + } + } + + /// Returns the `sum`/`count` as a i128/i256 Decimal128/Decimal256 with + /// target_scale and target_precision and reporting overflow. + /// + /// * sum: The total sum value stored as Decimal128 with sum_scale + /// (passed to `Self::try_new`) + /// * count: total count, stored as a i128/i256 (*NOT* a Decimal128/Decimal256 value) + #[inline(always)] + pub fn avg(&self, sum: T::Native, count: T::Native) -> Result { + if let Ok(value) = sum.mul_checked(self.target_mul.div_wrapping(self.sum_mul)) { + let new_value = value.div_wrapping(count); + + let validate = + T::validate_decimal_precision(new_value, self.target_precision); + + if validate.is_ok() { + Ok(new_value) + } else { + exec_err!("Arithmetic Overflow in AvgAccumulator") + } + } else { + // can't convert the lit decimal to the returned data type + exec_err!("Arithmetic Overflow in AvgAccumulator") + } + } +} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 7104b5d6605e..798654206e63 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -44,9 +44,7 @@ encoding_expressions = ["base64", "hex"] regex_expressions = ["regex"] [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index de090badd349..65227b727be7 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -15,10 +15,19 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod accumulate; mod adapter; -pub use accumulate::NullState; pub use adapter::GroupsAccumulatorAdapter; -pub(crate) mod bool_op; -pub(crate) mod prim_op; +// Backward compatibility +pub(crate) mod accumulate { + pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::{accumulate_indices, NullState}; +} + +pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; + +pub(crate) mod bool_op { + pub use datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; +} +pub(crate) mod prim_op { + pub use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +} diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index eff008e8f825..93ecf0655e51 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -54,7 +54,12 @@ pub(crate) mod variance; pub mod build_in; pub mod moving_min_max; -pub mod utils; +pub mod utils { + pub use datafusion_physical_expr_common::aggregate::utils::{ + adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, + get_sort_options, ordering_fields, DecimalAverager, Hashable, + }; +} /// Checks whether the given aggregate expression is order-sensitive. /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs deleted file mode 100644 index 6d97ad3da6de..000000000000 --- a/datafusion/physical-expr/src/aggregate/utils.rs +++ /dev/null @@ -1,191 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utilities used in aggregates - -use std::sync::Arc; - -// For backwards compatibility -pub use datafusion_physical_expr_common::aggregate::utils::{ - down_cast_any_ref, get_sort_options, ordering_fields, -}; - -use arrow::array::{ArrayRef, ArrowNativeTypeOp}; -use arrow_array::cast::AsArray; -use arrow_array::types::{ - Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, -}; -use arrow_buffer::{ArrowNativeType, ToByteSlice}; -use arrow_schema::DataType; -use datafusion_common::{exec_err, DataFusionError, Result}; -use datafusion_expr::Accumulator; - -/// Convert scalar values from an accumulator into arrays. -pub fn get_accum_scalar_values_as_arrays( - accum: &mut dyn Accumulator, -) -> Result> { - accum - .state()? - .iter() - .map(|s| s.to_array_of_size(1)) - .collect() -} - -/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow -/// -/// This is needed because different precisions for Decimal128/Decimal256 can -/// store different ranges of values and thus sum/count may not fit in -/// the target type. -/// -/// For example, the precision is 3, the max of value is `999` and the min -/// value is `-999` -pub(crate) struct DecimalAverager { - /// scale factor for sum values (10^sum_scale) - sum_mul: T::Native, - /// scale factor for target (10^target_scale) - target_mul: T::Native, - /// the output precision - target_precision: u8, -} - -impl DecimalAverager { - /// Create a new `DecimalAverager`: - /// - /// * sum_scale: the scale of `sum` values passed to [`Self::avg`] - /// * target_precision: the output precision - /// * target_scale: the output scale - /// - /// Errors if the resulting data can not be stored - pub fn try_new( - sum_scale: i8, - target_precision: u8, - target_scale: i8, - ) -> Result { - let sum_mul = T::Native::from_usize(10_usize) - .map(|b| b.pow_wrapping(sum_scale as u32)) - .ok_or(DataFusionError::Internal( - "Failed to compute sum_mul in DecimalAverager".to_string(), - ))?; - - let target_mul = T::Native::from_usize(10_usize) - .map(|b| b.pow_wrapping(target_scale as u32)) - .ok_or(DataFusionError::Internal( - "Failed to compute target_mul in DecimalAverager".to_string(), - ))?; - - if target_mul >= sum_mul { - Ok(Self { - sum_mul, - target_mul, - target_precision, - }) - } else { - // can't convert the lit decimal to the returned data type - exec_err!("Arithmetic Overflow in AvgAccumulator") - } - } - - /// Returns the `sum`/`count` as a i128/i256 Decimal128/Decimal256 with - /// target_scale and target_precision and reporting overflow. - /// - /// * sum: The total sum value stored as Decimal128 with sum_scale - /// (passed to `Self::try_new`) - /// * count: total count, stored as a i128/i256 (*NOT* a Decimal128/Decimal256 value) - #[inline(always)] - pub fn avg(&self, sum: T::Native, count: T::Native) -> Result { - if let Ok(value) = sum.mul_checked(self.target_mul.div_wrapping(self.sum_mul)) { - let new_value = value.div_wrapping(count); - - let validate = - T::validate_decimal_precision(new_value, self.target_precision); - - if validate.is_ok() { - Ok(new_value) - } else { - exec_err!("Arithmetic Overflow in AvgAccumulator") - } - } else { - // can't convert the lit decimal to the returned data type - exec_err!("Arithmetic Overflow in AvgAccumulator") - } - } -} - -/// Adjust array type metadata if needed -/// -/// Since `Decimal128Arrays` created from `Vec` have -/// default precision and scale, this function adjusts the output to -/// match `data_type`, if necessary -pub fn adjust_output_array( - data_type: &DataType, - array: ArrayRef, -) -> Result { - let array = match data_type { - DataType::Decimal128(p, s) => Arc::new( - array - .as_primitive::() - .clone() - .with_precision_and_scale(*p, *s)?, - ) as ArrayRef, - DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - DataType::Timestamp(arrow_schema::TimeUnit::Second, tz) => Arc::new( - array - .as_primitive::() - .clone() - .with_timezone_opt(tz.clone()), - ), - // no adjustment needed for other arrays - _ => array, - }; - Ok(array) -} - -/// A wrapper around a type to provide hash for floats -#[derive(Copy, Clone, Debug)] -pub(crate) struct Hashable(pub T); - -impl std::hash::Hash for Hashable { - fn hash(&self, state: &mut H) { - self.0.to_byte_slice().hash(state) - } -} - -impl PartialEq for Hashable { - fn eq(&self, other: &Self) -> bool { - self.0.is_eq(other.0) - } -} - -impl Eq for Hashable {} diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 3e9c9ee0e6c0..dac2a24d359d 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -36,9 +36,7 @@ name = "datafusion_physical_plan" path = "src/lib.rs" [dependencies] -ahash = { version = "0.8", default-features = false, features = [ - "runtime-rng", -] } +ahash = { workspace = true } arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true }