ARROW-10330: [Rust][DataFusion] Implement NULLIF() SQL function #8688

Closed
147 changes: 145 additions & 2 deletions rust/arrow/src/compute/kernels/boolean.rs
@@ -22,14 +22,15 @@
//! `RUSTFLAGS="-C target-feature=+avx2"` for example. See the documentation
//! [here](https://doc.rust-lang.org/stable/core/arch/) for more information.

use std::ops::Not;
use std::sync::Arc;

use crate::array::{Array, ArrayData, BooleanArray};
use crate::array::{Array, ArrayData, BooleanArray, PrimitiveArray};
use crate::buffer::{
    buffer_bin_and, buffer_bin_or, buffer_unary_not, Buffer, MutableBuffer,
};
use crate::compute::util::combine_option_bitmap;
use crate::datatypes::DataType;
use crate::datatypes::{ArrowNumericType, DataType};
use crate::error::{ArrowError, Result};
use crate::util::bit_util::ceil;

@@ -223,6 +224,102 @@ pub fn is_not_null(input: &Array) -> Result<BooleanArray> {
    Ok(BooleanArray::from(Arc::new(data)))
}

/// Copies the original array, marking a slot as null wherever the secondary boolean
/// comparison array is valid and true. Typically used to implement NULLIF.
// NOTE: For now this only supports primitive arrays. The code could be made generic, but the
// bitmap operations currently produce a final bitmap aligned to bit 0, so the left array's data
// must be sliced to a new offset, and for non-primitive arrays shifting the data might be too
// complicated. In the future, to avoid shifting the left array's data, we could instead shift
// the final bit buffer to the right, prepending 0s.
pub fn nullif<T>(
    left: &PrimitiveArray<T>,
    right: &BooleanArray,
) -> Result<PrimitiveArray<T>>
where
    T: ArrowNumericType,
{
    if left.len() != right.len() {
        return Err(ArrowError::ComputeError(
            "Cannot perform comparison operation on arrays of different length"
                .to_string(),
        ));
    }
    let left_data = left.data();
    let right_data = right.data();

    // Truth table for the output validity bit
    // (a missing left bitmap means every left slot is valid):
    // left=0 (null)   right=null                output bitmap=null
    // left=0          right=1                   output bitmap=null
    // left=1 (set)    right=null                output bitmap=set   (passthrough)
    // left=1          right=1 & comp=true       output bitmap=null
    // left=1          right=1 & comp=false      output bitmap=set
    //
    // Thus: result = left null bitmap & (!right_values | !right_bitmap)
    //   or, equivalently: left null bitmap & !(right_values & right_bitmap)
    //
    // Compute the right expression !(right_values & right_bitmap) first, since it takes two steps
    // TRICK: treat the BooleanArray values buffer as a bitmap for faster operation
    let right_combo_buffer = match right.data().null_bitmap() {
        Some(right_bitmap) => {
            // NOTE: right values and bitmaps are combined and stay at bit offset right.offset()
            (&right.values() & &right_bitmap.bits).ok().map(|b| b.not())
        }
        None => Some(!&right.values()),
    };

    // AND of original left null bitmap with right expression
    // Here we take care of the possible offsets of the left and right arrays all at once.
    let modified_null_buffer = match left_data.null_bitmap() {
        Some(left_null_bitmap) => match right_combo_buffer {
            Some(rcb) => Some(buffer_bin_and(
                &left_null_bitmap.bits,
                left_data.offset(),
                &rcb,
                right_data.offset(),
                left_data.len(),
            )),
            None => Some(
                left_null_bitmap
                    .bits
                    .bit_slice(left_data.offset(), left.len()),
            ),
        },
        None => right_combo_buffer
            .map(|rcb| rcb.bit_slice(right_data.offset(), right_data.len())),
    };

    // Align/shift left data on offset as needed, since new bitmaps are shifted and aligned to 0 already
    // NOTE: this probably only works for primitive arrays.
    let data_buffers = if left.offset() == 0 {
        left_data.buffers().to_vec()
    } else {
        // Shift each data buffer by type's bit_width * offset.
        left_data
            .buffers()
            .iter()
            .map(|buf| {
                buf.bit_slice(
                    left.offset() * T::get_bit_width(),
                    left.len() * T::get_bit_width(),
                )
            })
            .collect::<Vec<_>>()
    };

    // Construct new array with same values but modified null bitmap
    let data = ArrayData::new(
        T::DATA_TYPE,
        left.len(),
        None, // force new to compute the number of null bits
        modified_null_buffer,
        0, // No need for offset since left data has been shifted
        data_buffers,
        left_data.child_data().to_vec(),
    );
    Ok(PrimitiveArray::<T>::from(Arc::new(data)))
}

#[cfg(test)]
mod tests {
    use super::*;
@@ -585,4 +682,50 @@ mod tests {
        assert_eq!(expected, res);
        assert_eq!(&None, res.data_ref().null_bitmap());
    }

    #[test]
    fn test_nullif_int_array() {
        let a = Int32Array::from(vec![Some(15), None, Some(8), Some(1), Some(9)]);
        let comp =
            BooleanArray::from(vec![Some(false), None, Some(true), Some(false), None]);
        let res = nullif(&a, &comp).unwrap();

        let expected = Int32Array::from(vec![
            Some(15),
            None,
            None, // comp true, slot 2 turned into null
            Some(1),
            // Even though comp array / right is null, should still pass through original value
            Some(9),
        ]);

        assert_eq!(expected, res);
    }

    #[test]
    fn test_nullif_int_array_offset() {
        let a = Int32Array::from(vec![None, Some(15), Some(8), Some(1), Some(9)]);
        let a = a.slice(1, 3); // Some(15), Some(8), Some(1)
        let a = a.as_any().downcast_ref::<Int32Array>().unwrap();
        let comp = BooleanArray::from(vec![
            Some(false),
            Some(false),
            Some(false),
            None,
            Some(true),
            Some(false),
            None,
        ]);
        let comp = comp.slice(2, 3); // Some(false), None, Some(true)
        let comp = comp.as_any().downcast_ref::<BooleanArray>().unwrap();
        let res = nullif(&a, &comp).unwrap();

        let expected = Int32Array::from(vec![
            Some(15), // False => keep it
            Some(8),  // None => keep it
            None,     // true => None
        ]);
        assert_eq!(&expected, &res)
    }
}
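
The truth table in the `nullif` kernel comments reduces to one rule per slot: the output is valid only when the left slot is valid and the right slot is not a valid `true`. Below is a minimal sketch of that rule on plain `Option` values, with no Arrow dependency and no bitmap offsets. The helper name `nullif_slots` is hypothetical and is not part of the PR; the input data mirrors `test_nullif_int_array`.

```rust
/// Slot-wise model of the validity rule from the kernel comments:
///   out_valid = left_valid & !(right_value & right_valid)
/// `nullif_slots` is a hypothetical name used only for this sketch.
fn nullif_slots(
    left: &[Option<i32>],
    right: &[Option<bool>],
) -> Result<Vec<Option<i32>>, String> {
    if left.len() != right.len() {
        return Err("arrays must have the same length".to_string());
    }
    Ok(left
        .iter()
        .zip(right.iter())
        .map(|(l, r)| {
            let left_valid = l.is_some();
            let right_valid = r.is_some();
            // A null comparison slot carries no usable value; treat it as false.
            let right_value = r.unwrap_or(false);
            if left_valid && !(right_value && right_valid) {
                *l // pass the original value through
            } else {
                None // null on the left, or a valid `true` on the right
            }
        })
        .collect())
}

fn main() {
    let left = vec![Some(15), None, Some(8), Some(1), Some(9)];
    let right = vec![Some(false), None, Some(true), Some(false), None];
    let out = nullif_slots(&left, &right).unwrap();
    assert_eq!(out, vec![Some(15), None, None, Some(1), Some(9)]);
    println!("{:?}", out);
}
```

The real kernel applies the same rule to whole bitmap buffers at once, which is why it also has to reconcile the bit offsets of sliced arrays.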
2 changes: 2 additions & 0 deletions rust/datafusion/README.md
@@ -58,6 +58,8 @@ DataFusion includes a simple command-line interactive SQL utility. See the [CLI
- String functions
  - [x] Length
  - [x] Concatenate
- Miscellaneous/Boolean functions
  - [x] nullif
- Common date/time functions
  - [ ] Basic date functions
  - [ ] Basic time functions
146 changes: 144 additions & 2 deletions rust/datafusion/src/physical_plan/expressions.rs
@@ -30,7 +30,7 @@ use arrow::array::{self, Array, BooleanBuilder, LargeStringArray};
use arrow::compute;
use arrow::compute::kernels;
use arrow::compute::kernels::arithmetic::{add, divide, multiply, subtract};
use arrow::compute::kernels::boolean::{and, or};
use arrow::compute::kernels::boolean::{and, nullif, or};
use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow::compute::kernels::comparison::{
    eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar,
@@ -1535,6 +1535,80 @@ pub fn binary(
    Ok(Arc::new(BinaryExpr::new(l, op, r)))
}

/// Invoke a compute kernel on a primitive array and a Boolean Array
macro_rules! compute_bool_array_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$DT>()
            .expect("compute_bool_array_op failed to downcast left array");
        let rr = $RIGHT
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("compute_bool_array_op failed to downcast right array");
        Ok(Arc::new($OP(&ll, &rr)?))
    }};
}

/// Binary op between primitive and boolean arrays
macro_rules! primitive_bool_array_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        match $LEFT.data_type() {
            DataType::Int8 => compute_bool_array_op!($LEFT, $RIGHT, $OP, Int8Array),
            DataType::Int16 => compute_bool_array_op!($LEFT, $RIGHT, $OP, Int16Array),
            DataType::Int32 => compute_bool_array_op!($LEFT, $RIGHT, $OP, Int32Array),
            DataType::Int64 => compute_bool_array_op!($LEFT, $RIGHT, $OP, Int64Array),
            DataType::UInt8 => compute_bool_array_op!($LEFT, $RIGHT, $OP, UInt8Array),
            DataType::UInt16 => compute_bool_array_op!($LEFT, $RIGHT, $OP, UInt16Array),
            DataType::UInt32 => compute_bool_array_op!($LEFT, $RIGHT, $OP, UInt32Array),
            DataType::UInt64 => compute_bool_array_op!($LEFT, $RIGHT, $OP, UInt64Array),
            DataType::Float32 => compute_bool_array_op!($LEFT, $RIGHT, $OP, Float32Array),
            DataType::Float64 => compute_bool_array_op!($LEFT, $RIGHT, $OP, Float64Array),
            other => Err(DataFusionError::Internal(format!(
                "Unsupported data type {:?} for NULLIF/primitive/boolean operator",
                other
            ))),
        }
    }};
}

///
/// Implements NULLIF(expr1, expr2)
/// Args: 0 - left expr (expr1); must be an array of one of the primitive types handled by
///           `primitive_bool_array_op`
///       1 - expr2; where the left value equals this value the result is NULL,
///           otherwise the left value is passed through.
///
pub fn nullif_func(args: &[ArrayRef]) -> Result<ArrayRef> {
    if args.len() != 2 {
        return Err(DataFusionError::Internal(format!(
            "{:?} args were supplied but NULLIF takes exactly two args",
            args.len(),
        )));
    }

    // Get args0 == args1 evaluated and produce a boolean array
    let cond_array = binary_array_op!(args[0], args[1], eq)?;

    // Now, invoke nullif on the result
    primitive_bool_array_op!(args[0], *cond_array, nullif)
}

/// Types currently supported by the nullif function.
/// The order of these types corresponds to the order in which coercion is applied,
/// so it should run from least informative to most informative.
pub static SUPPORTED_NULLIF_TYPES: &'static [DataType] = &[
    DataType::Boolean,
    DataType::UInt8,
    DataType::UInt16,
    DataType::UInt32,
    DataType::UInt64,
    DataType::Int8,
    DataType::Int16,
    DataType::Int32,
    DataType::Int64,
    DataType::Float32,
    DataType::Float64,
];
Comment on lines +1598 to +1610
Contributor

Better to make these trait bounds with a good comment about how these are selected. I didn't understand:

/// The order of these types correspond to the order on which coercion applies

Can you explain it?

Member

@jorgecarleitao jorgecarleitao Nov 17, 2020

> Better to make these trait bounds with a good comment about how these are selected

AFAIK these cannot be trait bounds because logical and physical planning are dynamically typed.

In this case, the list enumerates all valid types that can be (dynamically) passed to the function. If someone tries to call this function with e.g. a ListArray, the logical planner will return an error stating that this function does not support that type.

The order here matters because when a function is planned to be called with a type X that the function does not support, the physical planner will try to losslessly cast that type to a valid type for that function, and it does so in the order of this array. In general these should be ordered from fastest to slowest (in the eyes of the implementation), so that the cast chooses the type with the fastest implementation.
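
The ordering rule described in this comment can be sketched as a first-match search over the supported-type list. This is a simplified model under stated assumptions, not DataFusion's actual coercion code: the `Ty` enum, `can_cast_losslessly`, and `coerce` are hypothetical names, and the cast rules are reduced to a handful of integer and float widths for illustration.

```rust
/// A tiny stand-in for a data-type enum (hypothetical, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Ty {
    Int8,
    Int32,
    Int64,
    Float64,
    Utf8,
}

/// Assumed lossless casts for this sketch: integer widening, and small
/// integers into a 64-bit float.
fn can_cast_losslessly(from: Ty, to: Ty) -> bool {
    use Ty::*;
    match (from, to) {
        (a, b) if a == b => true,
        (Int8, Int32) | (Int8, Int64) | (Int32, Int64) => true,
        (Int8, Float64) | (Int32, Float64) => true,
        _ => false,
    }
}

/// Pick the type to evaluate with: the input itself if it is supported,
/// otherwise the first supported type (in list order) it can be cast to.
fn coerce(input: Ty, supported: &[Ty]) -> Option<Ty> {
    if supported.contains(&input) {
        return Some(input);
    }
    supported
        .iter()
        .copied()
        .find(|&t| can_cast_losslessly(input, t))
}

fn main() {
    // Same candidates, different order: the first match wins.
    assert_eq!(coerce(Ty::Int8, &[Ty::Int32, Ty::Int64, Ty::Float64]), Some(Ty::Int32));
    assert_eq!(coerce(Ty::Int8, &[Ty::Float64, Ty::Int64, Ty::Int32]), Some(Ty::Float64));
    // No lossless cast exists, so planning would report an error.
    assert_eq!(coerce(Ty::Utf8, &[Ty::Int32, Ty::Int64, Ty::Float64]), None);
}
```

Because the first match wins, reordering the list changes which implementation an unsupported input is routed to, which is the reason the order of `SUPPORTED_NULLIF_TYPES` is significant.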

Contributor

Oh, I see. In our private project at work, I have used type algebra definitions to avoid doing this. For now, this can stay as it is, but later I can open a type algebra PR to convert all of these to castability.

Member

@jorgecarleitao jorgecarleitao Nov 17, 2020

That is interesting. I would be interested in knowing what the issue is with the current implementation and why type algebra definitions should be used instead. Could you first introduce a proposal with the design, e.g. in a Google Doc, before the PR? In DataFusion we have been doing that for larger changes to avoid committing to an implementation before there is some general agreement.

Contributor

Oh, definitely will do. Give me some time to wrap my head around it.


/// Not expression
#[derive(Debug)]
pub struct NotExpr {
@@ -3151,6 +3225,70 @@ mod tests {
        )
    }

    #[test]
    fn nullif_int32() -> Result<()> {
        let a = Int32Array::from(vec![
            Some(1),
            Some(2),
            None,
            None,
            Some(3),
            None,
            None,
            Some(4),
            Some(5),
        ]);
        let a = Arc::new(a);
        let a_len = a.len();

        let lit_array = Arc::new(Int32Array::from(vec![2; a.len()]));

        let result = nullif_func(&[a.clone(), lit_array])?;

        assert_eq!(result.len(), a_len);

        let expected = Int32Array::from(vec![
            Some(1),
            None,
            None,
            None,
            Some(3),
            None,
            None,
            Some(4),
            Some(5),
        ]);
        assert_array_eq::<Int32Type>(expected, result);
        Ok(())
    }

    #[test]
    // Ensure that arrays with no nulls can also invoke NULLIF() correctly
    fn nullif_int32_nonulls() -> Result<()> {
        let a = Int32Array::from(vec![1, 3, 10, 7, 8, 1, 2, 4, 5]);
        let a = Arc::new(a);
        let a_len = a.len();

        let lit_array = Arc::new(Int32Array::from(vec![1; a.len()]));

        let result = nullif_func(&[a.clone(), lit_array])?;
        assert_eq!(result.len(), a_len);

        let expected = Int32Array::from(vec![
            None,
            Some(3),
            Some(10),
            Some(7),
            Some(8),
            None,
            Some(2),
            Some(4),
            Some(5),
        ]);
        assert_array_eq::<Int32Type>(expected, result);
        Ok(())
    }

    fn aggregate(
        batch: &RecordBatch,
        agg: Arc<dyn AggregateExpr>,
@@ -3275,7 +3413,11 @@ mod tests {
            .expect("Actual array should unwrap to type of expected array");

        for i in 0..expected.len() {
            assert_eq!(expected.value(i), actual.value(i));
            if expected.is_null(i) {
                assert!(actual.is_null(i));
            } else {
                assert_eq!(expected.value(i), actual.value(i));
            }
        }
    }
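
`nullif_func` works in two steps: it first evaluates `args[0] == args[1]` into a nullable boolean array, then nulls out the left slots where that comparison is a valid `true`. A rough model of the pipeline on plain `Option` values follows, assuming SQL-style equality in which a comparison involving a null yields null. The names `eq_nullable` and `nullif_values` are hypothetical, and the data mirrors the `nullif_int32` test.

```rust
/// SQL-style equality (assumed for this sketch): a null operand gives a null result.
fn eq_nullable(a: &[Option<i32>], b: &[Option<i32>]) -> Vec<Option<bool>> {
    a.iter()
        .zip(b)
        .map(|(x, y)| match (x, y) {
            (Some(x), Some(y)) => Some(x == y),
            _ => None,
        })
        .collect()
}

/// Step 1: compare. Step 2: null the left value wherever the comparison is a
/// valid `true`; a null comparison passes the left value through unchanged.
fn nullif_values(a: &[Option<i32>], b: &[Option<i32>]) -> Vec<Option<i32>> {
    let cond = eq_nullable(a, b);
    a.iter()
        .zip(cond)
        .map(|(x, c)| if c == Some(true) { None } else { *x })
        .collect()
}

fn main() {
    let a = vec![
        Some(1), Some(2), None, None, Some(3), None, None, Some(4), Some(5),
    ];
    let lit = vec![Some(2); a.len()];
    let result = nullif_values(&a, &lit);
    assert_eq!(
        result,
        vec![Some(1), None, None, None, Some(3), None, None, Some(4), Some(5)]
    );
}
```

Slots that were already null on the left stay null, which is why the updated `assert_array_eq` checks `is_null` before comparing values.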
