From 80ca94ed33e1f9f27b188a2c3b7f327138bb67e8 Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Fri, 22 Mar 2024 13:25:08 +0800 Subject: [PATCH 1/4] optimization: concat function fix: concat_ws chore: add license header add arrow feature update concat --- datafusion/physical-expr/Cargo.toml | 6 +- datafusion/physical-expr/benches/concat.rs | 47 ++ datafusion/physical-expr/src/functions.rs | 6 +- .../physical-expr/src/string_expressions.rs | 782 ++++++++++++++++-- 4 files changed, 763 insertions(+), 78 deletions(-) create mode 100644 datafusion/physical-expr/benches/concat.rs diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 24b831e7c575..9b2adb03b066 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -48,7 +48,7 @@ unicode_expressions = ["unicode-segmentation"] ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } -arrow = { workspace = true } +arrow = { workspace = true, features = ["test_utils"] } arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-ord = { workspace = true } @@ -84,3 +84,7 @@ tokio = { workspace = true, features = ["rt-multi-thread"] } [[bench]] harness = false name = "in_list" + +[[bench]] +harness = false +name = "concat" diff --git a/datafusion/physical-expr/benches/concat.rs b/datafusion/physical-expr/benches/concat.rs new file mode 100644 index 000000000000..cdd54d767f1f --- /dev/null +++ b/datafusion/physical-expr/benches/concat.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::util::bench_util::create_string_array_with_len; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_common::ScalarValue; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr::string_expressions::concat; +use std::sync::Arc; + +fn create_args(size: usize, str_len: usize) -> Vec { + let array = Arc::new(create_string_array_with_len::(size, 0.2, str_len)); + let scalar = ScalarValue::Utf8(Some(", ".to_string())); + vec![ + ColumnarValue::Array(array.clone()), + ColumnarValue::Scalar(scalar), + ColumnarValue::Array(array), + ] +} + +fn criterion_benchmark(c: &mut Criterion) { + for size in [1024, 4096, 8192] { + let args = create_args(size, 32); + let mut group = c.benchmark_group("concat function"); + group.bench_function(BenchmarkId::new("concat", size), |b| { + b.iter(|| criterion::black_box(concat(&args).unwrap())) + }); + group.finish(); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 515511b15fbb..f609a6e9f01c 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -253,9 +253,9 @@ pub fn create_physical_fun( // string functions BuiltinScalarFunction::Coalesce => Arc::new(conditional_expressions::coalesce), BuiltinScalarFunction::Concat => Arc::new(string_expressions::concat), - BuiltinScalarFunction::ConcatWithSeparator => Arc::new(|args| { - make_scalar_function_inner(string_expressions::concat_ws)(args) - }), + BuiltinScalarFunction::ConcatWithSeparator => { + Arc::new(string_expressions::concat_ws) + } BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function_inner(string_expressions::initcap::)(args) diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs index 2185b7c5b4a1..7e39ed8d6cfb 100644 --- a/datafusion/physical-expr/src/string_expressions.rs +++ b/datafusion/physical-expr/src/string_expressions.rs @@ -23,6 +23,7 @@ use std::sync::Arc; +use arrow::array::ArrayDataBuilder; use arrow::{ array::{ Array, ArrayRef, GenericStringArray, Int32Array, Int64Array, OffsetSizeTrait, @@ -30,6 +31,8 @@ use arrow::{ }, datatypes::DataType, }; +use arrow_buffer::{MutableBuffer, NullBuffer}; +use uuid::Uuid; use datafusion_common::Result; use datafusion_common::{ @@ -38,75 +41,268 @@ use datafusion_common::{ }; use datafusion_expr::ColumnarValue; -/// Concatenates the text representations of all the arguments. NULL arguments are ignored. -/// concat('abcde', 2, NULL, 22) = 'abcde222' -pub fn concat(args: &[ColumnarValue]) -> Result { - // do not accept 0 arguments. - if args.is_empty() { +/// applies a unary expression to `args[0]` that is expected to be downcastable to +/// a `GenericStringArray` and returns a `GenericStringArray` (which may have a different offset) +/// # Errors +/// This function errors when: +/// * the number of arguments is not 1 +/// * the first argument is not castable to a `GenericStringArray` +pub(crate) fn unary_string_function<'a, T, O, F, R>( + args: &[&'a dyn Array], + op: F, + name: &str, +) -> Result> +where + R: AsRef, + O: OffsetSizeTrait, + T: OffsetSizeTrait, + F: Fn(&'a str) -> R, +{ + if args.len() != 1 { return exec_err!( - "concat was called with {} arguments. It requires at least 1.", - args.len() + "{:?} args were supplied but {} takes exactly one argument", + args.len(), + name ); } - // first, decide whether to return a scalar or a vector. - let mut return_array = args.iter().filter_map(|x| match x { - ColumnarValue::Array(array) => Some(array.len()), - _ => None, - }); - if let Some(size) = return_array.next() { - let result = (0..size) - .map(|index| { - let mut owned_string: String = "".to_owned(); - for arg in args { - match arg { - ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => { - if let Some(value) = maybe_value { - owned_string.push_str(value); - } - } - ColumnarValue::Array(v) => { - if v.is_valid(index) { - let v = as_string_array(v).unwrap(); - owned_string.push_str(v.value(index)); + let string_array = as_generic_string_array::(args[0])?; + + // first map is the iterator, second is for the `Option<_>` + Ok(string_array.iter().map(|string| string.map(&op)).collect()) +} + +fn handle<'a, F, R>(args: &'a [ColumnarValue], op: F, name: &str) -> Result +where + R: AsRef, + F: Fn(&'a str) -> R, +{ + match &args[0] { + ColumnarValue::Array(a) => match a.data_type() { + DataType::Utf8 => { + Ok(ColumnarValue::Array(Arc::new(unary_string_function::< + i32, + i32, + _, + _, + >( + &[a.as_ref()], op, name + )?))) + } + DataType::LargeUtf8 => { + Ok(ColumnarValue::Array(Arc::new(unary_string_function::< + i64, + i64, + _, + _, + >( + &[a.as_ref()], op, name + )?))) + } + other => exec_err!("Unsupported data type {other:?} for function {name}"), + }, + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Utf8(a) => { + let result = a.as_ref().map(|x| (op)(x).as_ref().to_string()); + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result))) + } + ScalarValue::LargeUtf8(a) => { + let result = a.as_ref().map(|x| (op)(x).as_ref().to_string()); + Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(result))) + } + other => exec_err!("Unsupported data type {other:?} for function {name}"), + }, + } +} + +/// Returns the numeric code of the first character of the argument. +/// ascii('x') = 120 +pub fn ascii(args: &[ArrayRef]) -> Result { + let string_array = as_generic_string_array::(&args[0])?; + + let result = string_array + .iter() + .map(|string| { + string.map(|string: &str| { + let mut chars = string.chars(); + chars.next().map_or(0, |v| v as i32) + }) + }) + .collect::(); + + Ok(Arc::new(result) as ArrayRef) +} + +/// Returns the character with the given code. chr(0) is disallowed because text data types cannot store that character. +/// chr(65) = 'A' +pub fn chr(args: &[ArrayRef]) -> Result { + let integer_array = as_int64_array(&args[0])?; + + // first map is the iterator, second is for the `Option<_>` + let result = integer_array + .iter() + .map(|integer: Option| { + integer + .map(|integer| { + if integer == 0 { + exec_err!("null character not permitted.") + } else { + match core::char::from_u32(integer as u32) { + Some(integer) => Ok(integer.to_string()), + None => { + exec_err!("requested character too large for encoding.") } } - _ => unreachable!(), } + }) + .transpose() + }) + .collect::>()?; + + Ok(Arc::new(result) as ArrayRef) +} + +enum ColumnarValueRef<'a> { + Scalar(&'a [u8]), + NullableArray(&'a StringArray), + NonNullableArray(&'a StringArray), +} + +impl<'a> ColumnarValueRef<'a> { + #[inline] + fn is_valid(&self, i: usize) -> bool { + match &self { + Self::Scalar(_) | Self::NonNullableArray(_) => true, + Self::NullableArray(array) => array.is_valid(i), + } + } + + #[inline] + fn nulls(&self) -> Option { + match &self { + Self::Scalar(_) | Self::NonNullableArray(_) => None, + Self::NullableArray(array) => array.nulls().map(|b| b.clone()), + } + } +} + +struct StringArrayBuilder { + offsets_buffer: MutableBuffer, + value_buffer: MutableBuffer, +} + +impl StringArrayBuilder { + fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self { + let mut offsets_buffer = MutableBuffer::with_capacity( + (item_capacity + 1) * std::mem::size_of::(), + ); + unsafe { offsets_buffer.push_unchecked(0_i32) }; + Self { + offsets_buffer, + value_buffer: MutableBuffer::with_capacity(data_capacity), + } + } + + fn write(&mut self, column: &ColumnarValueRef, i: usize) { + match column { + ColumnarValueRef::Scalar(s) => { + self.value_buffer.extend_from_slice(s); + } + ColumnarValueRef::NullableArray(array) => { + if !CHECK_VALID || array.is_valid(i) { + self.value_buffer + .extend_from_slice(array.value(i).as_bytes()); } - Some(owned_string) - }) - .collect::(); - - Ok(ColumnarValue::Array(Arc::new(result))) - } else { - // short avenue with only scalars - let initial = Some("".to_string()); - let result = args.iter().fold(initial, |mut acc, rhs| { - if let Some(ref mut inner) = acc { - match rhs { - ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => { - inner.push_str(v); - } - ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} - _ => unreachable!(""), + } + ColumnarValueRef::NonNullableArray(array) => { + self.value_buffer + .extend_from_slice(array.value(i).as_bytes()); + } + } + } + + fn append_offset(&mut self) { + let next_offset: i32 = self + .value_buffer + .len() + .try_into() + .expect("byte array offset overflow"); + unsafe { self.offsets_buffer.push_unchecked(next_offset) }; + } + + fn finish(self, null_buffer: Option) -> StringArray { + let array_builder = ArrayDataBuilder::new(DataType::Utf8) + .len(self.offsets_buffer.len() / std::mem::size_of::() - 1) + .add_buffer(self.offsets_buffer.into()) + .add_buffer(self.value_buffer.into()) + .nulls(null_buffer); + let array_data = unsafe { array_builder.build_unchecked() }; + StringArray::from(array_data) + } +} + +/// Concatenates the text representations of all the arguments. NULL arguments are ignored. +/// concat('abcde', 2, NULL, 22) = 'abcde222' +pub fn concat(args: &[ColumnarValue]) -> Result { + let array_len = args + .iter() + .filter_map(|x| match x { + ColumnarValue::Array(array) => Some(array.len()), + _ => None, + }) + .next(); + + // Scalar + if array_len.is_none() { + let mut result = String::new(); + for arg in args { + if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = arg { + result.push_str(v); + } + } + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))); + } + + // Array + let len = array_len.unwrap(); + let mut data_size = 0; + let mut columns = Vec::with_capacity(args.len()); + + for arg in args { + match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => { + if let Some(s) = maybe_value { + data_size += s.len() * len; + columns.push(ColumnarValueRef::Scalar(s.as_bytes())); + } + } + ColumnarValue::Array(array) => { + let string_array = as_string_array(array)?; + data_size += string_array.values().len(); + let column = if array.is_nullable() { + ColumnarValueRef::NullableArray(string_array) + } else { + ColumnarValueRef::NonNullableArray(string_array) }; - }; - acc - }); - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result))) + columns.push(column); + } + _ => unreachable!(), + } } + + let mut builder = StringArrayBuilder::with_capacity(len, data_size); + for i in 0..len { + columns + .iter() + .for_each(|column| builder.write::(column, i)); + builder.append_offset(); + } + Ok(ColumnarValue::Array(Arc::new(builder.finish(None)))) } /// Concatenates all but the first argument, with separators. The first argument is used as the separator string, and should not be NULL. Other NULL arguments are ignored. /// concat_ws(',', 'abcde', 2, NULL, 22) = 'abcde,2,22' -pub fn concat_ws(args: &[ArrayRef]) -> Result { - // downcast all arguments to strings - let args = args - .iter() - .map(|e| as_string_array(e)) - .collect::>>()?; - +pub fn concat_ws(args: &[ColumnarValue]) -> Result { // do not accept 0 or 1 arguments. if args.len() < 2 { return exec_err!( @@ -115,28 +311,126 @@ pub fn concat_ws(args: &[ArrayRef]) -> Result { ); } - // first map is the iterator, second is for the `Option<_>` - let result = args[0] + let array_len = args .iter() - .enumerate() - .map(|(index, x)| { - x.map(|sep: &str| { - let string_vec = args[1..] - .iter() - .flat_map(|arg| { - if !arg.is_null(index) { - Some(arg.value(index)) - } else { - None - } - }) - .collect::>(); - string_vec.join(sep) - }) + .filter_map(|x| match x { + ColumnarValue::Array(array) => Some(array.len()), + _ => None, }) - .collect::(); + .next(); - Ok(Arc::new(result) as ArrayRef) + // Scalar + if array_len.is_none() { + let sep = match &args[0] { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s, + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => { + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))); + } + _ => unreachable!(), + }; + + let mut result = String::new(); + let iter = &mut args[1..].iter(); + + while let Some(arg) = iter.next() { + match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + result.push_str(s); + break; + } + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + _ => unreachable!(), + } + } + + while let Some(arg) = iter.next() { + match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + result.push_str(sep); + result.push_str(s); + } + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + _ => unreachable!(), + } + } + + return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(result)))); + } + + // Array + let len = array_len.unwrap(); + let mut data_size = 0; + + // parse sep + let sep = match &args[0] { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + data_size += s.len() * len * (args.len() - 2); // estimate + ColumnarValueRef::Scalar(s.as_bytes()) + } + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => { + return Ok(ColumnarValue::Array(Arc::new(StringArray::new_null(len)))); + } + ColumnarValue::Array(array) => { + let string_array = as_string_array(array)?; + data_size += string_array.values().len() * (args.len() - 2); // estimate + if array.is_nullable() { + ColumnarValueRef::NullableArray(string_array) + } else { + ColumnarValueRef::NonNullableArray(string_array) + } + } + _ => unreachable!(), + }; + + let mut columns = Vec::with_capacity(args.len() - 1); + for arg in &args[1..] { + match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(maybe_value)) => { + if let Some(s) = maybe_value { + data_size += s.len() * len; + columns.push(ColumnarValueRef::Scalar(s.as_bytes())); + } + } + ColumnarValue::Array(array) => { + let string_array = as_string_array(array)?; + data_size += string_array.values().len(); + let column = if array.is_nullable() { + ColumnarValueRef::NullableArray(string_array) + } else { + ColumnarValueRef::NonNullableArray(string_array) + }; + columns.push(column); + } + _ => unreachable!(), + } + } + + let mut builder = StringArrayBuilder::with_capacity(len, data_size); + for i in 0..len { + if !sep.is_valid(i) { + builder.append_offset(); + continue; + } + + let mut iter = columns.iter(); + while let Some(column) = iter.next() { + if column.is_valid(i) { + builder.write::(column, i); + break; + } + } + + while let Some(column) = iter.next() { + if column.is_valid(i) { + builder.write::(&sep, i); + builder.write::(column, i); + } + } + + builder.append_offset(); + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())))) } /// Converts the first letter of each word to upper case and the rest to lower case. Words are sequences of alphanumeric characters separated by non-alphanumeric characters. @@ -234,3 +528,343 @@ pub fn ends_with(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } + +/// Converts the number to its equivalent hexadecimal representation. +/// to_hex(2147483647) = '7fffffff' +pub fn to_hex(args: &[ArrayRef]) -> Result +where + T::Native: OffsetSizeTrait, +{ + let integer_array = as_primitive_array::(&args[0])?; + + let result = integer_array + .iter() + .map(|integer| { + if let Some(value) = integer { + if let Some(value_usize) = value.to_usize() { + Ok(Some(format!("{value_usize:x}"))) + } else if let Some(value_isize) = value.to_isize() { + Ok(Some(format!("{value_isize:x}"))) + } else { + exec_err!("Unsupported data type {integer:?} for function to_hex") + } + } else { + Ok(None) + } + }) + .collect::>>()?; + + Ok(Arc::new(result) as ArrayRef) +} + +/// Converts the string to all upper case. +/// upper('tom') = 'TOM' +pub fn upper(args: &[ColumnarValue]) -> Result { + handle(args, |string| string.to_uppercase(), "upper") +} + +/// Prints random (v4) uuid values per row +/// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' +pub fn uuid(args: &[ColumnarValue]) -> Result { + let len: usize = match &args[0] { + ColumnarValue::Array(array) => array.len(), + _ => return exec_err!("Expect uuid function to take no param"), + }; + + let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); + let array = GenericStringArray::::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) +} + +/// OVERLAY(string1 PLACING string2 FROM integer FOR integer2) +/// Replaces a substring of string1 with string2 starting at the integer bit +/// pgsql overlay('Txxxxas' placing 'hom' from 2 for 4) → Thomas +/// overlay('Txxxxas' placing 'hom' from 2) -> Thomxas, without for option, str2's len is instead +pub fn overlay(args: &[ArrayRef]) -> Result { + match args.len() { + 3 => { + let string_array = as_generic_string_array::(&args[0])?; + let characters_array = as_generic_string_array::(&args[1])?; + let pos_num = as_int64_array(&args[2])?; + + let result = string_array + .iter() + .zip(characters_array.iter()) + .zip(pos_num.iter()) + .map(|((string, characters), start_pos)| { + match (string, characters, start_pos) { + (Some(string), Some(characters), Some(start_pos)) => { + let string_len = string.chars().count(); + let characters_len = characters.chars().count(); + let replace_len = characters_len as i64; + let mut res = + String::with_capacity(string_len.max(characters_len)); + + //as sql replace index start from 1 while string index start from 0 + if start_pos > 1 && start_pos - 1 < string_len as i64 { + let start = (start_pos - 1) as usize; + res.push_str(&string[..start]); + } + res.push_str(characters); + // if start + replace_len - 1 >= string_length, just to string end + if start_pos + replace_len - 1 < string_len as i64 { + let end = (start_pos + replace_len - 1) as usize; + res.push_str(&string[end..]); + } + Ok(Some(res)) + } + _ => Ok(None), + } + }) + .collect::>>()?; + Ok(Arc::new(result) as ArrayRef) + } + 4 => { + let string_array = as_generic_string_array::(&args[0])?; + let characters_array = as_generic_string_array::(&args[1])?; + let pos_num = as_int64_array(&args[2])?; + let len_num = as_int64_array(&args[3])?; + + let result = string_array + .iter() + .zip(characters_array.iter()) + .zip(pos_num.iter()) + .zip(len_num.iter()) + .map(|(((string, characters), start_pos), len)| { + match (string, characters, start_pos, len) { + (Some(string), Some(characters), Some(start_pos), Some(len)) => { + let string_len = string.chars().count(); + let characters_len = characters.chars().count(); + let replace_len = len.min(string_len as i64); + let mut res = + String::with_capacity(string_len.max(characters_len)); + + //as sql replace index start from 1 while string index start from 0 + if start_pos > 1 && start_pos - 1 < string_len as i64 { + let start = (start_pos - 1) as usize; + res.push_str(&string[..start]); + } + res.push_str(characters); + // if start + replace_len - 1 >= string_length, just to string end + if start_pos + replace_len - 1 < string_len as i64 { + let end = (start_pos + replace_len - 1) as usize; + res.push_str(&string[end..]); + } + Ok(Some(res)) + } + _ => Ok(None), + } + }) + .collect::>>()?; + Ok(Arc::new(result) as ArrayRef) + } + other => { + exec_err!("overlay was called with {other} arguments. It requires 3 or 4.") + } + } +} + +///Returns the Levenshtein distance between the two given strings. +/// LEVENSHTEIN('kitten', 'sitting') = 3 +pub fn levenshtein(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return exec_err!( + "levenshtein function requires two arguments, got {}", + args.len() + ); + } + let str1_array = as_generic_string_array::(&args[0])?; + let str2_array = as_generic_string_array::(&args[1])?; + match args[0].data_type() { + DataType::Utf8 => { + let result = str1_array + .iter() + .zip(str2_array.iter()) + .map(|(string1, string2)| match (string1, string2) { + (Some(string1), Some(string2)) => { + Some(datafusion_strsim::levenshtein(string1, string2) as i32) + } + _ => None, + }) + .collect::(); + Ok(Arc::new(result) as ArrayRef) + } + DataType::LargeUtf8 => { + let result = str1_array + .iter() + .zip(str2_array.iter()) + .map(|(string1, string2)| match (string1, string2) { + (Some(string1), Some(string2)) => { + Some(datafusion_strsim::levenshtein(string1, string2) as i64) + } + _ => None, + }) + .collect::(); + Ok(Arc::new(result) as ArrayRef) + } + other => { + exec_err!( + "levenshtein was called with {other} datatype arguments. It requires Utf8 or LargeUtf8." + ) + } + } +} + +#[cfg(test)] +mod tests { + use arrow::{array::Int32Array, datatypes::Int32Type}; + use arrow_array::Int64Array; + + use datafusion_common::cast::as_int32_array; + + use crate::string_expressions; + + use super::*; + + #[test] + fn concat() -> Result<()> { + let c0 = + ColumnarValue::Array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"]))); + let c1 = ColumnarValue::Scalar(ScalarValue::Utf8(Some(",".to_string()))); + let c2 = ColumnarValue::Array(Arc::new(StringArray::from(vec![ + Some("x"), + None, + Some("z"), + ]))); + let args = &[c0, c1, c2]; + + let result = super::concat(args)?; + let expected = + Arc::new(StringArray::from(vec!["foo,x", "bar,", "baz,z"])) as ArrayRef; + match &result { + ColumnarValue::Array(array) => { + assert_eq!(&expected, array); + } + _ => panic!(), + } + Ok(()) + } + + #[test] + fn concat_ws() -> Result<()> { + // sep is scalar + let c0 = ColumnarValue::Scalar(ScalarValue::Utf8(Some(",".to_string()))); + let c1 = + ColumnarValue::Array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"]))); + let c2 = ColumnarValue::Array(Arc::new(StringArray::from(vec![ + Some("x"), + None, + Some("z"), + ]))); + let args = &[c0, c1, c2]; + + let result = super::concat_ws(args)?; + let expected = + Arc::new(StringArray::from(vec!["foo,x", "bar", "baz,z"])) as ArrayRef; + match &result { + ColumnarValue::Array(array) => { + assert_eq!(&expected, array); + } + _ => panic!(), + } + + // sep is nullable array + let c0 = ColumnarValue::Array(Arc::new(StringArray::from(vec![ + Some(","), + None, + Some("+"), + ]))); + let c1 = + ColumnarValue::Array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"]))); + let c2 = ColumnarValue::Array(Arc::new(StringArray::from(vec![ + Some("x"), + Some("y"), + Some("z"), + ]))); + let args = &[c0, c1, c2]; + + let result = super::concat_ws(args)?; + let expected = + Arc::new(StringArray::from(vec![Some("foo,x"), None, Some("baz+z")])) + as ArrayRef; + match &result { + ColumnarValue::Array(array) => { + assert_eq!(&expected, array); + } + _ => panic!(), + } + + Ok(()) + } + + #[test] + // Test to_hex function for zero + fn to_hex_zero() -> Result<()> { + let array = vec![0].into_iter().collect::(); + let array_ref = Arc::new(array); + let hex_value_arc = string_expressions::to_hex::(&[array_ref])?; + let hex_value = as_string_array(&hex_value_arc)?; + let expected = StringArray::from(vec![Some("0")]); + assert_eq!(&expected, hex_value); + + Ok(()) + } + + #[test] + // Test to_hex function for positive number + fn to_hex_positive_number() -> Result<()> { + let array = vec![100].into_iter().collect::(); + let array_ref = Arc::new(array); + let hex_value_arc = string_expressions::to_hex::(&[array_ref])?; + let hex_value = as_string_array(&hex_value_arc)?; + let expected = StringArray::from(vec![Some("64")]); + assert_eq!(&expected, hex_value); + + Ok(()) + } + + #[test] + // Test to_hex function for negative number + fn to_hex_negative_number() -> Result<()> { + let array = vec![-1].into_iter().collect::(); + let array_ref = Arc::new(array); + let hex_value_arc = string_expressions::to_hex::(&[array_ref])?; + let hex_value = as_string_array(&hex_value_arc)?; + let expected = StringArray::from(vec![Some("ffffffffffffffff")]); + assert_eq!(&expected, hex_value); + + Ok(()) + } + + #[test] + fn to_overlay() -> Result<()> { + let string = + Arc::new(StringArray::from(vec!["123", "abcdefg", "xyz", "Txxxxas"])); + let replace_string = + Arc::new(StringArray::from(vec!["abc", "qwertyasdfg", "ijk", "hom"])); + let start = Arc::new(Int64Array::from(vec![4, 1, 1, 2])); // start + let end = Arc::new(Int64Array::from(vec![5, 7, 2, 4])); // replace len + + let res = overlay::(&[string, replace_string, start, end]).unwrap(); + let result = as_generic_string_array::(&res).unwrap(); + let expected = StringArray::from(vec!["abc", "qwertyasdfg", "ijkz", "Thomas"]); + assert_eq!(&expected, result); + + Ok(()) + } + + #[test] + fn to_levenshtein() -> Result<()> { + let string1_array = + Arc::new(StringArray::from(vec!["123", "abc", "xyz", "kitten"])); + let string2_array = + Arc::new(StringArray::from(vec!["321", "def", "zyx", "sitting"])); + let res = levenshtein::(&[string1_array, string2_array]).unwrap(); + let result = + as_int32_array(&res).expect("failed to initialized function levenshtein"); + let expected = Int32Array::from(vec![2, 3, 2, 3]); + assert_eq!(&expected, result); + + Ok(()) + } +} From 625ee694a0f663539433ea83a861e02f6c397efb Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Sat, 30 Mar 2024 21:37:22 +0800 Subject: [PATCH 2/4] change Cargo.toml --- datafusion/physical-expr/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 9b2adb03b066..f9feb2c9a114 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -48,7 +48,7 @@ unicode_expressions = ["unicode-segmentation"] ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } -arrow = { workspace = true, features = ["test_utils"] } +arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-ord = { workspace = true } @@ -76,6 +76,7 @@ sha2 = { version = "^0.10.1", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } [dev-dependencies] +arrow = { workspace = true, features = ["test_utils"] } criterion = "0.5" rand = { workspace = true } rstest = { workspace = true } From 9d985c150e466ace6a8d0a749b1412c01f5d8222 Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Sat, 30 Mar 2024 22:04:12 +0800 Subject: [PATCH 3/4] pass cargo clippy --- .../physical-expr/src/string_expressions.rs | 391 +----------------- 1 file changed, 5 insertions(+), 386 deletions(-) diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs index 7e39ed8d6cfb..38d1ae58f5b3 100644 --- a/datafusion/physical-expr/src/string_expressions.rs +++ b/datafusion/physical-expr/src/string_expressions.rs @@ -32,7 +32,6 @@ use arrow::{ datatypes::DataType, }; use arrow_buffer::{MutableBuffer, NullBuffer}; -use uuid::Uuid; use datafusion_common::Result; use datafusion_common::{ @@ -41,127 +40,6 @@ use datafusion_common::{ }; use datafusion_expr::ColumnarValue; -/// applies a unary expression to `args[0]` that is expected to be downcastable to -/// a `GenericStringArray` and returns a `GenericStringArray` (which may have a different offset) -/// # Errors -/// This function errors when: -/// * the number of arguments is not 1 -/// * the first argument is not castable to a `GenericStringArray` -pub(crate) fn unary_string_function<'a, T, O, F, R>( - args: &[&'a dyn Array], - op: F, - name: &str, -) -> Result> -where - R: AsRef, - O: OffsetSizeTrait, - T: OffsetSizeTrait, - F: Fn(&'a str) -> R, -{ - if args.len() != 1 { - return exec_err!( - "{:?} args were supplied but {} takes exactly one argument", - args.len(), - name - ); - } - - let string_array = as_generic_string_array::(args[0])?; - - // first map is the iterator, second is for the `Option<_>` - Ok(string_array.iter().map(|string| string.map(&op)).collect()) -} - -fn handle<'a, F, R>(args: &'a [ColumnarValue], op: F, name: &str) -> Result -where - R: AsRef, - F: Fn(&'a str) -> R, -{ - match &args[0] { - ColumnarValue::Array(a) => match a.data_type() { - DataType::Utf8 => { - Ok(ColumnarValue::Array(Arc::new(unary_string_function::< - i32, - i32, - _, - _, - >( - &[a.as_ref()], op, name - )?))) - } - DataType::LargeUtf8 => { - Ok(ColumnarValue::Array(Arc::new(unary_string_function::< - i64, - i64, - _, - _, - >( - &[a.as_ref()], op, name - )?))) - } - other => exec_err!("Unsupported data type {other:?} for function {name}"), - }, - ColumnarValue::Scalar(scalar) => match scalar { - ScalarValue::Utf8(a) => { - let result = a.as_ref().map(|x| (op)(x).as_ref().to_string()); - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result))) - } - ScalarValue::LargeUtf8(a) => { - let result = a.as_ref().map(|x| (op)(x).as_ref().to_string()); - Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(result))) - } - other => exec_err!("Unsupported data type {other:?} for function {name}"), - }, - } -} - -/// Returns the numeric code of the first character of the argument. -/// ascii('x') = 120 -pub fn ascii(args: &[ArrayRef]) -> Result { - let string_array = as_generic_string_array::(&args[0])?; - - let result = string_array - .iter() - .map(|string| { - string.map(|string: &str| { - let mut chars = string.chars(); - chars.next().map_or(0, |v| v as i32) - }) - }) - .collect::(); - - Ok(Arc::new(result) as ArrayRef) -} - -/// Returns the character with the given code. chr(0) is disallowed because text data types cannot store that character. -/// chr(65) = 'A' -pub fn chr(args: &[ArrayRef]) -> Result { - let integer_array = as_int64_array(&args[0])?; - - // first map is the iterator, second is for the `Option<_>` - let result = integer_array - .iter() - .map(|integer: Option| { - integer - .map(|integer| { - if integer == 0 { - exec_err!("null character not permitted.") - } else { - match core::char::from_u32(integer as u32) { - Some(integer) => Ok(integer.to_string()), - None => { - exec_err!("requested character too large for encoding.") - } - } - } - }) - .transpose() - }) - .collect::>()?; - - Ok(Arc::new(result) as ArrayRef) -} - enum ColumnarValueRef<'a> { Scalar(&'a [u8]), NullableArray(&'a StringArray), @@ -181,7 +59,7 @@ impl<'a> ColumnarValueRef<'a> { fn nulls(&self) -> Option { match &self { Self::Scalar(_) | Self::NonNullableArray(_) => None, - Self::NullableArray(array) => array.nulls().map(|b| b.clone()), + Self::NullableArray(array) => array.nulls().cloned(), } } } @@ -332,7 +210,7 @@ pub fn concat_ws(args: &[ColumnarValue]) -> Result { let mut result = String::new(); let iter = &mut args[1..].iter(); - while let Some(arg) = iter.next() { + for arg in iter.by_ref() { match arg { ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { result.push_str(s); @@ -343,7 +221,7 @@ pub fn concat_ws(args: &[ColumnarValue]) -> Result { } } - while let Some(arg) = iter.next() { + for arg in iter.by_ref() { match arg { ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { result.push_str(sep); @@ -413,14 +291,14 @@ pub fn concat_ws(args: &[ColumnarValue]) -> Result { } let mut iter = columns.iter(); - while let Some(column) = iter.next() { + for column in iter.by_ref() { if column.is_valid(i) { builder.write::(column, i); break; } } - while let Some(column) = iter.next() { + for column in iter { if column.is_valid(i) { builder.write::(&sep, i); builder.write::(column, i); @@ -529,196 +407,8 @@ pub fn ends_with(args: &[ArrayRef]) -> Result { Ok(Arc::new(result) as ArrayRef) } -/// Converts the number to its equivalent hexadecimal representation. -/// to_hex(2147483647) = '7fffffff' -pub fn to_hex(args: &[ArrayRef]) -> Result -where - T::Native: OffsetSizeTrait, -{ - let integer_array = as_primitive_array::(&args[0])?; - - let result = integer_array - .iter() - .map(|integer| { - if let Some(value) = integer { - if let Some(value_usize) = value.to_usize() { - Ok(Some(format!("{value_usize:x}"))) - } else if let Some(value_isize) = value.to_isize() { - Ok(Some(format!("{value_isize:x}"))) - } else { - exec_err!("Unsupported data type {integer:?} for function to_hex") - } - } else { - Ok(None) - } - }) - .collect::>>()?; - - Ok(Arc::new(result) as ArrayRef) -} - -/// Converts the string to all upper case. -/// upper('tom') = 'TOM' -pub fn upper(args: &[ColumnarValue]) -> Result { - handle(args, |string| string.to_uppercase(), "upper") -} - -/// Prints random (v4) uuid values per row -/// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' -pub fn uuid(args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - ColumnarValue::Array(array) => array.len(), - _ => return exec_err!("Expect uuid function to take no param"), - }; - - let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); - let array = GenericStringArray::::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) -} - -/// OVERLAY(string1 PLACING string2 FROM integer FOR integer2) -/// Replaces a substring of string1 with string2 starting at the integer bit -/// pgsql overlay('Txxxxas' placing 'hom' from 2 for 4) → Thomas -/// overlay('Txxxxas' placing 'hom' from 2) -> Thomxas, without for option, str2's len is instead -pub fn overlay(args: &[ArrayRef]) -> Result { - match args.len() { - 3 => { - let string_array = as_generic_string_array::(&args[0])?; - let characters_array = as_generic_string_array::(&args[1])?; - let pos_num = as_int64_array(&args[2])?; - - let result = string_array - .iter() - .zip(characters_array.iter()) - .zip(pos_num.iter()) - .map(|((string, characters), start_pos)| { - match (string, characters, start_pos) { - (Some(string), Some(characters), Some(start_pos)) => { - let string_len = string.chars().count(); - let characters_len = characters.chars().count(); - let replace_len = characters_len as i64; - let mut res = - String::with_capacity(string_len.max(characters_len)); - - //as sql replace index start from 1 while string index start from 0 - if start_pos > 1 && start_pos - 1 < string_len as i64 { - let start = (start_pos - 1) as usize; - res.push_str(&string[..start]); - } - res.push_str(characters); - // if start + replace_len - 1 >= string_length, just to string end - if start_pos + replace_len - 1 < string_len as i64 { - let end = (start_pos + replace_len - 1) as usize; - res.push_str(&string[end..]); - } - Ok(Some(res)) - } - _ => Ok(None), - } - }) - .collect::>>()?; - Ok(Arc::new(result) as ArrayRef) - } - 4 => { - let string_array = as_generic_string_array::(&args[0])?; - let characters_array = as_generic_string_array::(&args[1])?; - let pos_num = as_int64_array(&args[2])?; - let len_num = as_int64_array(&args[3])?; - - let result = string_array - .iter() - .zip(characters_array.iter()) - .zip(pos_num.iter()) - .zip(len_num.iter()) - .map(|(((string, characters), start_pos), len)| { - match (string, characters, start_pos, len) { - (Some(string), Some(characters), Some(start_pos), Some(len)) => { - let string_len = string.chars().count(); - let characters_len = characters.chars().count(); - let replace_len = len.min(string_len as i64); - let mut res = - String::with_capacity(string_len.max(characters_len)); - - //as sql replace index start from 1 while string index start from 0 - if start_pos > 1 && start_pos - 1 < string_len as i64 { - let start = (start_pos - 1) as usize; - res.push_str(&string[..start]); - } - res.push_str(characters); - // if start + replace_len - 1 >= string_length, just to string end - if start_pos + replace_len - 1 < string_len as i64 { - let end = (start_pos + replace_len - 1) as usize; - res.push_str(&string[end..]); - } - Ok(Some(res)) - } - _ => Ok(None), - } - }) - .collect::>>()?; - Ok(Arc::new(result) as ArrayRef) - } - other => { - exec_err!("overlay was called with {other} arguments. It requires 3 or 4.") - } - } -} - -///Returns the Levenshtein distance between the two given strings. -/// LEVENSHTEIN('kitten', 'sitting') = 3 -pub fn levenshtein(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return exec_err!( - "levenshtein function requires two arguments, got {}", - args.len() - ); - } - let str1_array = as_generic_string_array::(&args[0])?; - let str2_array = as_generic_string_array::(&args[1])?; - match args[0].data_type() { - DataType::Utf8 => { - let result = str1_array - .iter() - .zip(str2_array.iter()) - .map(|(string1, string2)| match (string1, string2) { - (Some(string1), Some(string2)) => { - Some(datafusion_strsim::levenshtein(string1, string2) as i32) - } - _ => None, - }) - .collect::(); - Ok(Arc::new(result) as ArrayRef) - } - DataType::LargeUtf8 => { - let result = str1_array - .iter() - .zip(str2_array.iter()) - .map(|(string1, string2)| match (string1, string2) { - (Some(string1), Some(string2)) => { - Some(datafusion_strsim::levenshtein(string1, string2) as i64) - } - _ => None, - }) - .collect::(); - Ok(Arc::new(result) as ArrayRef) - } - other => { - exec_err!( - "levenshtein was called with {other} datatype arguments. It requires Utf8 or LargeUtf8." - ) - } - } -} - #[cfg(test)] mod tests { - use arrow::{array::Int32Array, datatypes::Int32Type}; - use arrow_array::Int64Array; - - use datafusion_common::cast::as_int32_array; - - use crate::string_expressions; - use super::*; #[test] @@ -796,75 +486,4 @@ mod tests { Ok(()) } - - #[test] - // Test to_hex function for zero - fn to_hex_zero() -> Result<()> { - let array = vec![0].into_iter().collect::(); - let array_ref = Arc::new(array); - let hex_value_arc = string_expressions::to_hex::(&[array_ref])?; - let hex_value = as_string_array(&hex_value_arc)?; - let expected = StringArray::from(vec![Some("0")]); - assert_eq!(&expected, hex_value); - - Ok(()) - } - - #[test] - // Test to_hex function for positive number - fn to_hex_positive_number() -> Result<()> { - let array = vec![100].into_iter().collect::(); - let array_ref = Arc::new(array); - let hex_value_arc = string_expressions::to_hex::(&[array_ref])?; - let hex_value = as_string_array(&hex_value_arc)?; - let expected = StringArray::from(vec![Some("64")]); - assert_eq!(&expected, hex_value); - - Ok(()) - } - - #[test] - // Test to_hex function for negative number - fn to_hex_negative_number() -> Result<()> { - let array = vec![-1].into_iter().collect::(); - let array_ref = Arc::new(array); - let hex_value_arc = string_expressions::to_hex::(&[array_ref])?; - let hex_value = as_string_array(&hex_value_arc)?; - let expected = StringArray::from(vec![Some("ffffffffffffffff")]); - assert_eq!(&expected, hex_value); - - Ok(()) - } - - #[test] - fn to_overlay() -> Result<()> { - let string = - Arc::new(StringArray::from(vec!["123", "abcdefg", "xyz", "Txxxxas"])); - let replace_string = - Arc::new(StringArray::from(vec!["abc", "qwertyasdfg", "ijk", "hom"])); - let start = Arc::new(Int64Array::from(vec![4, 1, 1, 2])); // start - let end = Arc::new(Int64Array::from(vec![5, 7, 2, 4])); // replace len - - let res = overlay::(&[string, replace_string, start, end]).unwrap(); - let result = as_generic_string_array::(&res).unwrap(); - let expected = StringArray::from(vec!["abc", "qwertyasdfg", "ijkz", "Thomas"]); - assert_eq!(&expected, result); - - Ok(()) - } - - #[test] - fn to_levenshtein() -> Result<()> { - let string1_array = - Arc::new(StringArray::from(vec!["123", "abc", "xyz", "kitten"])); - let string2_array = - Arc::new(StringArray::from(vec!["321", "def", "zyx", "sitting"])); - let res = levenshtein::(&[string1_array, string2_array]).unwrap(); - let result = - as_int32_array(&res).expect("failed to initialized function levenshtein"); - let expected = Int32Array::from(vec![2, 3, 2, 3]); - assert_eq!(&expected, result); - - Ok(()) - } } From 078c2d7d819afb7870ecc6550553d5858428269b Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Thu, 4 Apr 2024 12:23:26 +0800 Subject: [PATCH 4/4] chore: add annotation --- datafusion/physical-expr/src/string_expressions.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs index 38d1ae58f5b3..fd6c8eb6b1d9 100644 --- a/datafusion/physical-expr/src/string_expressions.rs +++ b/datafusion/physical-expr/src/string_expressions.rs @@ -64,6 +64,9 @@ impl<'a> ColumnarValueRef<'a> { } } +/// Optimized version of the StringBuilder in Arrow that: +/// 1. Precalculating the expected length of the result, avoiding reallocations. +/// 2. Avoids creating / incrementally creating a `NullBufferBuilder` struct StringArrayBuilder { offsets_buffer: MutableBuffer, value_buffer: MutableBuffer, @@ -74,6 +77,7 @@ impl StringArrayBuilder { let mut offsets_buffer = MutableBuffer::with_capacity( (item_capacity + 1) * std::mem::size_of::(), ); + // SAFETY: the first offset value is definitely not going to exceed the bounds. unsafe { offsets_buffer.push_unchecked(0_i32) }; Self { offsets_buffer, @@ -114,6 +118,8 @@ impl StringArrayBuilder { .add_buffer(self.offsets_buffer.into()) .add_buffer(self.value_buffer.into()) .nulls(null_buffer); + // SAFETY: all data that was appended was valid UTF8 and the values + // and offsets were created correctly let array_data = unsafe { array_builder.build_unchecked() }; StringArray::from(array_data) }