From 5883e43db6c16d3ac3616302606849abbfbc86eb Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 10 Nov 2022 11:41:01 -0800 Subject: [PATCH] Use f64::total_cmp instead of OrderedFloat (#4133) * Replace OrderedFloat with f64 * clippy * Adding hasher * fixed comments * fixed comments * fmt * comments fixed * removed ordered_flost from toml * changed cargo.lock --- datafusion-cli/Cargo.lock | 14 +- datafusion/common/Cargo.toml | 1 - datafusion/common/src/scalar.rs | 79 +++--- datafusion/core/Cargo.toml | 1 - datafusion/physical-expr/Cargo.toml | 1 - .../src/aggregate/approx_percentile_cont.rs | 10 +- .../approx_percentile_cont_with_weight.rs | 4 +- .../src/aggregate/count_distinct.rs | 7 +- .../physical-expr/src/aggregate/tdigest.rs | 236 +++++++----------- 9 files changed, 136 insertions(+), 217 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 8d1e53ec270a..774f57ed9fdc 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -595,7 +595,6 @@ dependencies = [ "log", "num_cpus", "object_store", - "ordered-float 3.3.0", "parking_lot", "parquet", "paste", @@ -635,7 +634,6 @@ dependencies = [ "arrow", "chrono", "object_store", - "ordered-float 3.3.0", "parquet", "sqlparser", ] @@ -685,7 +683,6 @@ dependencies = [ "lazy_static", "md-5", "num-traits", - "ordered-float 3.3.0", "paste", "rand", "regex", @@ -1616,15 +1613,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "ordered-float" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f74e330193f90ec45e2b257fa3ef6df087784157ac1ad2c1e71c62837b03aa7" -dependencies = [ - "num-traits", -] - [[package]] name = "os_str_bytes" version = "6.3.1" @@ -2260,7 +2248,7 @@ checksum = "09678c4cdbb4eed72e18b7c2af1329c69825ed16fcbac62d083fc3e2b0590ff0" dependencies = [ "byteorder", "integer-encoding", - "ordered-float 1.1.1", + "ordered-float", ] [[package]] diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index d62aa1dd1a08..cbbaf4337cd9 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -44,7 +44,6 @@ arrow = { version = "26.0.0", default-features = false } chrono = { version = "0.4", default-features = false } cranelift-module = { version = "0.89.0", optional = true } object_store = { version = "0.5.0", default-features = false, optional = true } -ordered-float = "3.0" parquet = { version = "26.0.0", default-features = false, optional = true } pyo3 = { version = "0.17.1", optional = true } sqlparser = "0.26" diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index facc9ed427f0..76c7cacccc7e 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -24,6 +24,9 @@ use std::ops::{Add, Sub}; use std::str::FromStr; use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc}; +use crate::cast::as_struct_array; +use crate::delta::shift_months; +use crate::error::{DataFusionError, Result}; use arrow::{ array::*, compute::kernels::cast::{cast, cast_with_options, CastOptions}, @@ -37,11 +40,6 @@ use arrow::{ }, }; use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime}; -use ordered_float::OrderedFloat; - -use crate::cast::as_struct_array; -use crate::delta::shift_months; -use crate::error::{DataFusionError, Result}; /// Represents a dynamically typed, nullable single value. /// This is the single-valued counter-part of arrow's `Array`. @@ -116,8 +114,7 @@ pub enum ScalarValue { Dictionary(Box, Box), } -// manual implementation of `PartialEq` that uses OrderedFloat to -// get defined behavior for floating point +// manual implementation of `PartialEq` impl PartialEq for ScalarValue { fn eq(&self, other: &Self) -> bool { use ScalarValue::*; @@ -131,17 +128,15 @@ impl PartialEq for ScalarValue { (Decimal128(_, _, _), _) => false, (Boolean(v1), Boolean(v2)) => v1.eq(v2), (Boolean(_), _) => false, - (Float32(v1), Float32(v2)) => { - let v1 = v1.map(OrderedFloat); - let v2 = v2.map(OrderedFloat); - v1.eq(&v2) - } + (Float32(v1), Float32(v2)) => match (v1, v2) { + (Some(f1), Some(f2)) => f1.to_bits() == f2.to_bits(), + _ => v1.eq(v2), + }, (Float32(_), _) => false, - (Float64(v1), Float64(v2)) => { - let v1 = v1.map(OrderedFloat); - let v2 = v2.map(OrderedFloat); - v1.eq(&v2) - } + (Float64(v1), Float64(v2)) => match (v1, v2) { + (Some(f1), Some(f2)) => f1.to_bits() == f2.to_bits(), + _ => v1.eq(v2), + }, (Float64(_), _) => false, (Int8(v1), Int8(v2)) => v1.eq(v2), (Int8(_), _) => false, @@ -201,8 +196,7 @@ impl PartialEq for ScalarValue { } } -// manual implementation of `PartialOrd` that uses OrderedFloat to -// get defined behavior for floating point +// manual implementation of `PartialOrd` impl PartialOrd for ScalarValue { fn partial_cmp(&self, other: &Self) -> Option { use ScalarValue::*; @@ -221,17 +215,15 @@ impl PartialOrd for ScalarValue { (Decimal128(_, _, _), _) => None, (Boolean(v1), Boolean(v2)) => v1.partial_cmp(v2), (Boolean(_), _) => None, - (Float32(v1), Float32(v2)) => { - let v1 = v1.map(OrderedFloat); - let v2 = v2.map(OrderedFloat); - v1.partial_cmp(&v2) - } + (Float32(v1), Float32(v2)) => match (v1, v2) { + (Some(f1), Some(f2)) => Some(f1.total_cmp(f2)), + _ => v1.partial_cmp(v2), + }, (Float32(_), _) => None, - (Float64(v1), Float64(v2)) => { - let v1 = v1.map(OrderedFloat); - let v2 = v2.map(OrderedFloat); - v1.partial_cmp(&v2) - } + (Float64(v1), Float64(v2)) => match (v1, v2) { + (Some(f1), Some(f2)) => Some(f1.total_cmp(f2)), + _ => v1.partial_cmp(v2), + }, (Float64(_), _) => None, (Int8(v1), Int8(v2)) => v1.partial_cmp(v2), (Int8(_), _) => None, @@ -625,8 +617,23 @@ where intermediate.add(Duration::milliseconds(ms as i64)) } -// manual implementation of `Hash` that uses OrderedFloat to -// get defined behavior for floating point +//Float wrapper over f32/f64. Just because we cannot build std::hash::Hash for floats directly we have to do it through type wrapper +struct Fl(T); + +macro_rules! hash_float_value { + ($(($t:ty, $i:ty)),+) => { + $(impl std::hash::Hash for Fl<$t> { + #[inline] + fn hash(&self, state: &mut H) { + state.write(&<$i>::from_ne_bytes(self.0.to_ne_bytes()).to_ne_bytes()) + } + })+ + }; +} + +hash_float_value!((f64, u64), (f32, u32)); + +// manual implementation of `Hash` impl std::hash::Hash for ScalarValue { fn hash(&self, state: &mut H) { use ScalarValue::*; @@ -637,14 +644,8 @@ impl std::hash::Hash for ScalarValue { s.hash(state) } Boolean(v) => v.hash(state), - Float32(v) => { - let v = v.map(OrderedFloat); - v.hash(state) - } - Float64(v) => { - let v = v.map(OrderedFloat); - v.hash(state) - } + Float32(v) => v.map(Fl).hash(state), + Float64(v) => v.map(Fl).hash(state), Int8(v) => v.hash(state), Int16(v) => v.hash(state), Int32(v) => v.hash(state), diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 9ca47864da73..039d0ebe6866 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -80,7 +80,6 @@ log = "^0.4" num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" object_store = "0.5.0" -ordered-float = "3.0" parking_lot = "0.12" parquet = { version = "26.0.0", features = ["arrow", "async"] } paste = "^1.0" diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index df4f8c3c8315..91b5baaa5c0e 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -55,7 +55,6 @@ itertools = { version = "0.10", features = ["use_std"] } lazy_static = { version = "^1.4.0" } md-5 = { version = "^0.10.0", optional = true } num-traits = { version = "0.2", default-features = false } -ordered-float = "3.0" paste = "^1.0" rand = "0.8" regex = { version = "^1.4.3", optional = true } diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index 6dfa24746305..e9f9f07212a6 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::aggregate::tdigest::TryIntoOrderedF64; +use crate::aggregate::tdigest::TryIntoF64; use crate::aggregate::tdigest::{TDigest, DEFAULT_MAX_SIZE}; use crate::expressions::{format_state_name, Literal}; use crate::{AggregateExpr, PhysicalExpr}; @@ -30,7 +30,6 @@ use datafusion_common::DataFusionError; use datafusion_common::Result; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_expr::{Accumulator, AggregateState}; -use ordered_float::OrderedFloat; use std::{any::Any, iter, sync::Arc}; /// APPROX_PERCENTILE_CONT aggregate expression @@ -267,9 +266,7 @@ impl ApproxPercentileAccumulator { self.digest = TDigest::merge_digests(digests); } - pub(crate) fn convert_to_ordered_float( - values: &ArrayRef, - ) -> Result>> { + pub(crate) fn convert_to_float(values: &ArrayRef) -> Result> { match values.data_type() { DataType::Float64 => { let array = downcast_value!(values, Float64Array); @@ -371,8 +368,7 @@ impl Accumulator for ApproxPercentileAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; let sorted_values = &arrow::compute::sort(values, None)?; - let sorted_values = - ApproxPercentileAccumulator::convert_to_ordered_float(sorted_values)?; + let sorted_values = ApproxPercentileAccumulator::convert_to_float(sorted_values)?; self.digest = self.digest.merge_sorted_f64(&sorted_values); Ok(()) } diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs index 40a44c3a55e1..85426015e3a0 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs @@ -126,8 +126,8 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator { weights.len(), "invalid number of values in means and weights" ); - let means_f64 = ApproxPercentileAccumulator::convert_to_ordered_float(means)?; - let weights_f64 = ApproxPercentileAccumulator::convert_to_ordered_float(weights)?; + let means_f64 = ApproxPercentileAccumulator::convert_to_float(means)?; + let weights_f64 = ApproxPercentileAccumulator::convert_to_float(weights)?; let mut digests: Vec = vec![]; for (mean, weight) in means_f64.iter().zip(weights_f64.iter()) { digests.push(TDigest::new_with_centroid( diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index fe1503bdb16e..588b739097d5 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -410,7 +410,6 @@ mod tests { macro_rules! test_count_distinct_update_batch_floating_point { ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{ - use ordered_float::OrderedFloat; let values: Vec> = vec![ Some(<$PRIM_TYPE>::INFINITY), Some(<$PRIM_TYPE>::NAN), @@ -437,10 +436,10 @@ mod tests { let mut state_vec = state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); + + dbg!(&state_vec); state_vec.sort_by(|a, b| match (a, b) { - (Some(lhs), Some(rhs)) => { - OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs)) - } + (Some(lhs), Some(rhs)) => lhs.total_cmp(rhs), _ => a.partial_cmp(b).unwrap(), }); diff --git a/datafusion/physical-expr/src/aggregate/tdigest.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs index 6314a2af6ca3..e4112d848014 100644 --- a/datafusion/physical-expr/src/aggregate/tdigest.rs +++ b/datafusion/physical-expr/src/aggregate/tdigest.rs @@ -28,20 +28,18 @@ //! [Facebook's Folly TDigest]: https://github.com/facebook/folly/blob/main/folly/stats/TDigest.h use arrow::datatypes::DataType; -use datafusion_common::DataFusionError; use datafusion_common::Result; use datafusion_common::ScalarValue; -use ordered_float::OrderedFloat; use std::cmp::Ordering; pub const DEFAULT_MAX_SIZE: usize = 100; -// Cast a non-null [`ScalarValue::Float64`] to an [`OrderedFloat`], or +// Cast a non-null [`ScalarValue::Float64`] to an [`f64`], or // panic. macro_rules! cast_scalar_f64 { ($value:expr ) => { match &$value { - ScalarValue::Float64(Some(v)) => OrderedFloat::from(*v), + ScalarValue::Float64(Some(v)) => *v, v => panic!("invalid type {:?}", v), } }; @@ -50,22 +48,22 @@ macro_rules! cast_scalar_f64 { /// This trait is implemented for each type a [`TDigest`] can operate on, /// allowing it to support both numerical rust types (obtained from /// `PrimitiveArray` instances), and [`ScalarValue`] instances. -pub(crate) trait TryIntoOrderedF64 { - /// A fallible conversion of a possibly null `self` into a [`OrderedFloat`]. +pub(crate) trait TryIntoF64 { + /// A fallible conversion of a possibly null `self` into a [`f64`]. /// /// If `self` is null, this method must return `Ok(None)`. /// /// If `self` cannot be coerced to the desired type, this method must return /// an `Err` variant. - fn try_as_f64(&self) -> Result>>; + fn try_as_f64(&self) -> Result>; } -/// Generate an infallible conversion from `type` to an [`OrderedFloat`]. +/// Generate an infallible conversion from `type` to an [`f64`]. macro_rules! impl_try_ordered_f64 { ($type:ty) => { - impl TryIntoOrderedF64 for $type { - fn try_as_f64(&self) -> Result>> { - Ok(Some(OrderedFloat::from(*self as f64))) + impl TryIntoF64 for $type { + fn try_as_f64(&self) -> Result> { + Ok(Some(*self as f64)) } } }; @@ -82,33 +80,11 @@ impl_try_ordered_f64!(u32); impl_try_ordered_f64!(u16); impl_try_ordered_f64!(u8); -impl TryIntoOrderedF64 for ScalarValue { - fn try_as_f64(&self) -> Result>> { - match self { - ScalarValue::Float32(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::Float64(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::Int8(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::Int16(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::Int32(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::Int64(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::UInt8(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::UInt16(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::UInt32(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - ScalarValue::UInt64(v) => Ok(v.map(|v| OrderedFloat::from(v as f64))), - - got => Err(DataFusionError::NotImplemented(format!( - "Support for 'TryIntoOrderedF64' for data type {} is not implemented", - got - ))), - } - } -} - /// Centroid implementation to the cluster mentioned in the paper. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Clone)] pub(crate) struct Centroid { - mean: OrderedFloat, - weight: OrderedFloat, + mean: f64, + weight: f64, } impl PartialOrd for Centroid { @@ -117,64 +93,56 @@ impl PartialOrd for Centroid { } } +impl Eq for Centroid {} + impl Ord for Centroid { fn cmp(&self, other: &Centroid) -> Ordering { - self.mean.cmp(&other.mean) + self.mean.total_cmp(&other.mean) } } impl Centroid { - pub(crate) fn new( - mean: impl Into>, - weight: impl Into>, - ) -> Self { - Centroid { - mean: mean.into(), - weight: weight.into(), - } + pub(crate) fn new(mean: f64, weight: f64) -> Self { + Centroid { mean, weight } } #[inline] - pub(crate) fn mean(&self) -> OrderedFloat { + pub(crate) fn mean(&self) -> f64 { self.mean } #[inline] - pub(crate) fn weight(&self) -> OrderedFloat { + pub(crate) fn weight(&self) -> f64 { self.weight } - pub(crate) fn add( - &mut self, - sum: impl Into>, - weight: impl Into>, - ) -> f64 { - let new_sum = sum.into() + self.weight * self.mean; - let new_weight = self.weight + weight.into(); + pub(crate) fn add(&mut self, sum: f64, weight: f64) -> f64 { + let new_sum = sum + self.weight * self.mean; + let new_weight = self.weight + weight; self.weight = new_weight; self.mean = new_sum / new_weight; - new_sum.into_inner() + new_sum } } impl Default for Centroid { fn default() -> Self { Centroid { - mean: OrderedFloat::from(0.0), - weight: OrderedFloat::from(1.0), + mean: 0_f64, + weight: 1_f64, } } } /// T-Digest to be operated on. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Clone)] pub(crate) struct TDigest { centroids: Vec, max_size: usize, - sum: OrderedFloat, - count: OrderedFloat, - max: OrderedFloat, - min: OrderedFloat, + sum: f64, + count: f64, + max: f64, + min: f64, } impl TDigest { @@ -182,10 +150,10 @@ impl TDigest { TDigest { centroids: Vec::new(), max_size, - sum: OrderedFloat::from(0.0), - count: OrderedFloat::from(0.0), - max: OrderedFloat::from(std::f64::NAN), - min: OrderedFloat::from(std::f64::NAN), + sum: 0_f64, + count: 0_f64, + max: std::f64::NAN, + min: std::f64::NAN, } } @@ -194,7 +162,7 @@ impl TDigest { centroids: vec![centroid.clone()], max_size, sum: centroid.mean * centroid.weight, - count: OrderedFloat::from(1.0), + count: 1_f64, max: centroid.mean, min: centroid.mean, } @@ -202,17 +170,17 @@ impl TDigest { #[inline] pub(crate) fn count(&self) -> f64 { - self.count.into_inner() + self.count } #[inline] pub(crate) fn max(&self) -> f64 { - self.max.into_inner() + self.max } #[inline] pub(crate) fn min(&self) -> f64 { - self.min.into_inner() + self.min } #[inline] @@ -226,16 +194,16 @@ impl Default for TDigest { TDigest { centroids: Vec::new(), max_size: 100, - sum: OrderedFloat::from(0.0), - count: OrderedFloat::from(0.0), - max: OrderedFloat::from(std::f64::NAN), - min: OrderedFloat::from(std::f64::NAN), + sum: 0_f64, + count: 0_f64, + max: std::f64::NAN, + min: std::f64::NAN, } } } impl TDigest { - fn k_to_q(k: f64, d: f64) -> OrderedFloat { + fn k_to_q(k: f64, d: f64) -> f64 { let k_div_d = k / d; if k_div_d >= 0.5 { let base = 1.0 - k_div_d; @@ -243,14 +211,9 @@ impl TDigest { } else { 2.0 * k_div_d * k_div_d } - .into() } - fn clamp( - v: OrderedFloat, - lo: OrderedFloat, - hi: OrderedFloat, - ) -> OrderedFloat { + fn clamp(v: f64, lo: f64, hi: f64) -> f64 { if v > hi { hi } else if v < lo { @@ -261,19 +224,14 @@ impl TDigest { } #[cfg(test)] - pub(crate) fn merge_unsorted_f64( - &self, - unsorted_values: Vec>, - ) -> TDigest { + pub(crate) fn merge_unsorted_f64(&self, unsorted_values: Vec) -> TDigest { let mut values = unsorted_values; - values.sort(); + values.sort_by(|a, b| a.total_cmp(b)); self.merge_sorted_f64(&values) } - pub(crate) fn merge_sorted_f64( - &self, - sorted_values: &[OrderedFloat], - ) -> TDigest { + pub(crate) fn merge_sorted_f64(&self, sorted_values: &[f64]) -> TDigest { + dbg!(&sorted_values); #[cfg(debug_assertions)] debug_assert!(is_sorted(sorted_values), "unsorted input to TDigest"); @@ -282,14 +240,14 @@ impl TDigest { } let mut result = TDigest::new(self.max_size()); - result.count = OrderedFloat::from(self.count() + (sorted_values.len() as f64)); + result.count = self.count() + (sorted_values.len() as f64); let maybe_min = *sorted_values.first().unwrap(); let maybe_max = *sorted_values.last().unwrap(); if self.count() > 0.0 { - result.min = std::cmp::min(self.min, maybe_min); - result.max = std::cmp::max(self.max, maybe_max); + result.min = self.min.min(maybe_min); + result.max = self.max.max(maybe_max); } else { result.min = maybe_min; result.max = maybe_max; @@ -318,8 +276,8 @@ impl TDigest { let mut weight_so_far = curr.weight(); - let mut sums_to_merge = OrderedFloat::from(0.0); - let mut weights_to_merge = OrderedFloat::from(0.0); + let mut sums_to_merge = 0_f64; + let mut weights_to_merge = 0_f64; while iter_centroids.peek().is_some() || iter_sorted_values.peek().is_some() { let next: Centroid = if let Some(c) = iter_centroids.peek() { @@ -341,11 +299,9 @@ impl TDigest { sums_to_merge += next_sum; weights_to_merge += next.weight(); } else { - result.sum = OrderedFloat::from( - result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge), - ); - sums_to_merge = 0.0.into(); - weights_to_merge = 0.0.into(); + result.sum += curr.add(sums_to_merge, weights_to_merge); + sums_to_merge = 0_f64; + weights_to_merge = 0_f64; compressed.push(curr.clone()); q_limit_times_count = @@ -355,9 +311,7 @@ impl TDigest { } } - result.sum = OrderedFloat::from( - result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge), - ); + result.sum += curr.add(sums_to_merge, weights_to_merge); compressed.push(curr); compressed.shrink_to_fit(); compressed.sort(); @@ -423,8 +377,8 @@ impl TDigest { let mut starts: Vec = Vec::with_capacity(digests.len()); let mut count: f64 = 0.0; - let mut min = OrderedFloat::from(std::f64::INFINITY); - let mut max = OrderedFloat::from(std::f64::NEG_INFINITY); + let mut min = std::f64::INFINITY; + let mut max = std::f64::NEG_INFINITY; let mut start: usize = 0; for digest in digests.iter() { @@ -432,8 +386,8 @@ impl TDigest { let curr_count: f64 = digest.count(); if curr_count > 0.0 { - min = std::cmp::min(min, digest.min); - max = std::cmp::max(max, digest.max); + min = min.min(digest.min); + max = max.max(digest.max); count += curr_count; for centroid in &digest.centroids { centroids.push(centroid.clone()); @@ -472,8 +426,8 @@ impl TDigest { let mut iter_centroids = centroids.iter_mut(); let mut curr = iter_centroids.next().unwrap(); let mut weight_so_far = curr.weight(); - let mut sums_to_merge = OrderedFloat::from(0.0); - let mut weights_to_merge = OrderedFloat::from(0.0); + let mut sums_to_merge = 0_f64; + let mut weights_to_merge = 0_f64; for centroid in iter_centroids { weight_so_far += centroid.weight(); @@ -482,11 +436,9 @@ impl TDigest { sums_to_merge += centroid.mean() * centroid.weight(); weights_to_merge += centroid.weight(); } else { - result.sum = OrderedFloat::from( - result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge), - ); - sums_to_merge = OrderedFloat::from(0.0); - weights_to_merge = OrderedFloat::from(0.0); + result.sum += curr.add(sums_to_merge, weights_to_merge); + sums_to_merge = 0_f64; + weights_to_merge = 0_f64; compressed.push(curr.clone()); q_limit_times_count = Self::k_to_q(k_limit, max_size as f64) * (count as f64); @@ -495,14 +447,12 @@ impl TDigest { } } - result.sum = OrderedFloat::from( - result.sum.into_inner() + curr.add(sums_to_merge, weights_to_merge), - ); + result.sum += curr.add(sums_to_merge, weights_to_merge); compressed.push(curr.clone()); compressed.shrink_to_fit(); compressed.sort(); - result.count = OrderedFloat::from(count as f64); + result.count = count as f64; result.min = min; result.max = max; result.centroids = compressed; @@ -516,7 +466,7 @@ impl TDigest { } let count_ = self.count; - let rank = OrderedFloat::from(q) * count_; + let rank = q * count_; let mut pos: usize; let mut t; @@ -542,7 +492,7 @@ impl TDigest { } pos = self.centroids.len() - 1; - t = OrderedFloat::from(0.0); + t = 0_f64; for (k, centroid) in self.centroids.iter().enumerate() { if rank < t + centroid.weight() { @@ -554,7 +504,7 @@ impl TDigest { } } - let mut delta = OrderedFloat::from(0.0); + let mut delta = 0_f64; let mut min = self.min; let mut max = self.max; @@ -575,7 +525,7 @@ impl TDigest { let value = self.centroids[pos].mean() + ((rank - t) / self.centroids[pos].weight() - 0.5) * delta; - Self::clamp(value, min, max).into_inner() + Self::clamp(value, min, max) } /// This method decomposes the [`TDigest`] and its [`Centroid`] instances @@ -618,16 +568,16 @@ impl TDigest { let centroids: Vec<_> = self .centroids .iter() - .flat_map(|c| [c.mean().into_inner(), c.weight().into_inner()]) + .flat_map(|c| [c.mean(), c.weight()]) .map(|v| ScalarValue::Float64(Some(v))) .collect(); vec![ ScalarValue::UInt64(Some(self.max_size as u64)), - ScalarValue::Float64(Some(self.sum.into_inner())), - ScalarValue::Float64(Some(self.count.into_inner())), - ScalarValue::Float64(Some(self.max.into_inner())), - ScalarValue::Float64(Some(self.min.into_inner())), + ScalarValue::Float64(Some(self.sum)), + ScalarValue::Float64(Some(self.count)), + ScalarValue::Float64(Some(self.max)), + ScalarValue::Float64(Some(self.min)), ScalarValue::new_list(Some(centroids), DataType::Float64), ] } @@ -658,7 +608,8 @@ impl TDigest { let max = cast_scalar_f64!(&state[3]); let min = cast_scalar_f64!(&state[4]); - assert!(max >= min); + + assert!(max.total_cmp(&min).is_ge()); Self { max_size, @@ -672,8 +623,8 @@ impl TDigest { } #[cfg(debug_assertions)] -fn is_sorted(values: &[OrderedFloat]) -> bool { - values.windows(2).all(|w| w[0] <= w[1]) +fn is_sorted(values: &[f64]) -> bool { + values.windows(2).all(|w| w[0].total_cmp(&w[1]).is_le()) } #[cfg(test)] @@ -716,9 +667,7 @@ mod tests { #[test] fn test_int64_uniform() { - let values = (1i64..=1000) - .map(|v| OrderedFloat::from(v as f64)) - .collect(); + let values = (1i64..=1000).map(|v| v as f64).collect(); let t = TDigest::new(100); let t = t.merge_unsorted_f64(values); @@ -737,7 +686,7 @@ mod tests { let mut t = TDigest::new(10); for v in vals { - t = t.merge_unsorted_f64(vec![OrderedFloat::from(v as f64)]); + t = t.merge_unsorted_f64(vec![v as f64]); } assert_error_bounds!(t, quantile = 0.5, want = 1.0); @@ -748,10 +697,7 @@ mod tests { #[test] fn test_merge_unsorted_against_uniform_distro() { let t = TDigest::new(100); - let values: Vec<_> = (1..=1_000_000) - .map(f64::from) - .map(|v| OrderedFloat::from(v as f64)) - .collect(); + let values: Vec<_> = (1..=1_000_000).map(f64::from).collect(); let t = t.merge_unsorted_f64(values); @@ -766,13 +712,8 @@ mod tests { #[test] fn test_merge_unsorted_against_skewed_distro() { let t = TDigest::new(100); - let mut values: Vec<_> = (1..=600_000) - .map(f64::from) - .map(|v| OrderedFloat::from(v as f64)) - .collect(); - for _ in 0..400_000 { - values.push(OrderedFloat::from(1_000_000_f64)); - } + let mut values: Vec<_> = (1..=600_000).map(f64::from).collect(); + values.resize(1_000_000, 1_000_000_f64); let t = t.merge_unsorted_f64(values); @@ -788,10 +729,7 @@ mod tests { for _ in 1..=100 { let t = TDigest::new(100); - let values: Vec<_> = (1..=1_000) - .map(f64::from) - .map(|v| OrderedFloat::from(v as f64)) - .collect(); + let values: Vec<_> = (1..=1_000).map(f64::from).collect(); let t = t.merge_unsorted_f64(values); digests.push(t) }