From d3fe7974773ee638322a8e93b530f6f8545cc3a6 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Mon, 13 Feb 2023 22:48:35 +0800 Subject: [PATCH 1/2] [feat] Support `unary_dyn_mut` in arrow-arth. --- arrow-arith/src/arity.rs | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/arrow-arith/src/arity.rs b/arrow-arith/src/arity.rs index 3e7a81862927..2a9a9494d892 100644 --- a/arrow-arith/src/arity.rs +++ b/arrow-arith/src/arity.rs @@ -152,6 +152,37 @@ where } } +/// Applies an infallible unary function to an [`ArrayRef`] with primitive values. +/// If the buffer of ArrayRef is not shared with other arrays, then func will +/// mutate the buffer directly without allocating new buffer. +pub fn unary_dyn_mut(array: ArrayRef, op: F) -> Result +where + T: ArrowPrimitiveType, + F: Fn(T::Native) -> T::Native, +{ + let array_ref = array.as_ref(); + downcast_dictionary_array! { + array_ref => unary_dict::<_, F, T>(array_ref, op).map_err(|_| array), + t => { + //Todo support unary_dict_mut + if PrimitiveArray::::is_compatible(t) { + let primitive_array = array.as_any().downcast_ref::>().unwrap().clone(); + // Need drop the strong ref which clone before in this function. + std::mem::drop(array); + match unary_mut::( + primitive_array, + op, + ) { + Ok(arr) => Ok(Arc::new(arr)), + Err(arr) => Err(Arc::new(arr)), + } + } else { + Err(array) + } + } + } +} + /// Applies a fallible unary function to an array with primitive values. pub fn try_unary_dyn(array: &dyn Array, op: F) -> Result where @@ -576,6 +607,27 @@ mod tests { ); } + #[test] + fn test_unary_f64_mut() { + // 1. only have one strong ref, use copy on write. + let input = + Float64Array::from(vec![Some(5.1f64), None, Some(6.8), None, Some(7.2)]); + let result = + unary_dyn_mut::<_, Float64Type>(make_array(input.into_data()), |n| n + 1.0) + .unwrap(); + assert_eq!( + result.as_any().downcast_ref::().unwrap(), + &Float64Array::from(vec![Some(6.1f64), None, Some(7.8), None, Some(8.2)]) + ); + + // 2. More than one strong ref + let input = + Float64Array::from(vec![Some(5.1f64), None, Some(6.8), None, Some(7.2)]); + let slice = input.slice(1, 4); + let result = unary_dyn_mut::<_, Float64Type>(slice, |n| n + 1.0); + assert!(result.is_err()) + } + #[test] fn test_unary_dict_and_unary_dyn() { let mut builder = PrimitiveDictionaryBuilder::::new(); From cff197d8b48ec9ed57a6216fb293bda29ccc10b3 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 14 Feb 2023 23:32:42 +0800 Subject: [PATCH 2/2] add bench and expose add --- arrow-arith/src/arithmetic.rs | 14 ++++++++ arrow-arith/src/arity.rs | 17 +++++++++ arrow/Cargo.toml | 6 ++++ arrow/benches/cow.rs | 68 +++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+) create mode 100644 arrow/benches/cow.rs diff --git a/arrow-arith/src/arithmetic.rs b/arrow-arith/src/arithmetic.rs index 0db32d575761..8283f5a9eb12 100644 --- a/arrow-arith/src/arithmetic.rs +++ b/arrow-arith/src/arithmetic.rs @@ -939,6 +939,20 @@ where unary_dyn::<_, T>(array, |value| value.add_wrapping(scalar)) } +pub fn add_scalar_dyn_mut( + array: ArrayRef, + scalar: T::Native, +) -> Result +where + T: ArrowNumericType, + T::Native: ArrowNativeTypeOp, +{ + match unary_dyn_mut::<_, T>(array, |value| value.add_wrapping(scalar)) { + Ok(array) => Ok(array), + Err(array) => add_scalar_dyn::(&array, scalar), + } +} + /// Add every value in an array by a scalar. If any value in the array is null then the /// result is also null. The given array must be a `PrimitiveArray` of the type same as /// the scalar, or a `DictionaryArray` of the value type same as the scalar. diff --git a/arrow-arith/src/arity.rs b/arrow-arith/src/arity.rs index 2a9a9494d892..4efeb6f318d9 100644 --- a/arrow-arith/src/arity.rs +++ b/arrow-arith/src/arity.rs @@ -583,6 +583,7 @@ where #[cfg(test)] mod tests { use super::*; + use crate::arithmetic::{add_scalar_dyn, add_scalar_dyn_mut}; use arrow_array::builder::*; use arrow_array::cast::*; use arrow_array::types::*; @@ -627,6 +628,22 @@ mod tests { let result = unary_dyn_mut::<_, Float64Type>(slice, |n| n + 1.0); assert!(result.is_err()) } + #[test] + fn profile() { + let mut vec = vec![]; + let batch_size = 16384; + for i in 0..1024 { + let array = PrimitiveArray::::from_iter_values( + (0..batch_size).map(|x| x as f32), + ); + vec.push(make_array(array.into_data())); + } + + for i in 0..vec.len() { + add_scalar_dyn::(vec.pop().unwrap().as_ref(), 1 as f32) + .expect("panic message"); + } + } #[test] fn test_unary_dict_and_unary_dyn() { diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index ef89e5a81232..5aaa53e92653 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -296,3 +296,9 @@ required-features = ["pyarrow"] [[test]] name = "array_cast" required-features = ["chrono-tz"] + +[[bench]] +name = "cow" +harness = false +required-features = ["test_utils"] + diff --git a/arrow/benches/cow.rs b/arrow/benches/cow.rs new file mode 100644 index 000000000000..931534e6a4c5 --- /dev/null +++ b/arrow/benches/cow.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; +use arrow::util::bench_util::create_primitive_array; +use arrow_arith::arithmetic::{add_scalar_dyn, add_scalar_dyn_mut}; +use arrow_array::types::Float32Type; +use arrow_array::{make_array, Array, ArrayRef, PrimitiveArray}; +use arrow_schema::ArrowError; +use criterion::Criterion; + +extern crate arrow; + +const BATCH_SIZE: i32 = 163840; +const BATCH_NUM: i32 = 1024 * 10; +fn add_cow() { + let mut vec = vec![]; + for i in 0..BATCH_NUM { + let array = PrimitiveArray::::from_iter_values( + (0..BATCH_SIZE).map(|x| x as f32), + ); + vec.push(make_array(array.into_data())); + } + + for i in 0..vec.len() { + add_scalar_dyn_mut::(vec.pop().unwrap(), 1 as f32) + .expect("panic message"); + } +} + +fn add_copy() { + let mut vec = vec![]; + for i in 0..BATCH_NUM { + let array = PrimitiveArray::::from_iter_values( + (0..BATCH_SIZE).map(|x| x as f32), + ); + vec.push(make_array(array.into_data())); + } + + for i in 0..vec.len() { + add_scalar_dyn::(vec.pop().unwrap().as_ref(), 1 as f32) + .expect("panic message"); + } +} + +fn cow_benchmark(c: &mut Criterion) { + c.bench_function(&format!("add"), |b| { + b.iter(|| criterion::black_box(add_cow())) + }); +} + +criterion_group!(benches, cow_benchmark); +criterion_main!(benches);