Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Jan 26, 2022
1 parent 48ad975 commit 0e412eb
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 51 deletions.
93 changes: 50 additions & 43 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,97 +16,104 @@
# under the License.

[package]
name = "datafusion"
authors = ["Apache Arrow <[email protected]>"]
description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
version = "6.0.0"
edition = "2021"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
readme = "../README.md"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
include = [
"benches/*.rs",
"src/**/*.rs",
"Cargo.toml",
"benches/*.rs",
"src/**/*.rs",
"Cargo.toml",
]
edition = "2021"
keywords = ["arrow", "query", "sql"]
license = "Apache-2.0"
name = "datafusion"
readme = "../README.md"
repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.58"
version = "6.0.0"

[lib]
name = "datafusion"
path = "src/lib.rs"

[features]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
simd = ["arrow/simd"]
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
pyarrow = ["pyo3", "arrow/pyarrow"]
regex_expressions = ["regex"]
simd = ["arrow/simd"]
unicode_expressions = ["unicode-segmentation"]
pyarrow = ["pyo3", "arrow/pyarrow"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
# Used to enable the avro format
avro = ["avro-rs", "num-traits"]

[dependencies]
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.12", features = ["raw"] }
arrow = { version = "8.0.0", features = ["prettyprint"] }
parquet = { version = "8.0.0", features = ["arrow"] }
sqlparser = "0.13"
paste = "^1.0"
num_cpus = "1.13.0"
chrono = { version = "0.4", default-features = false }
ahash = {version = "0.7", default-features = false}
arrow = {version = "8.0.0", features = ["prettyprint"]}
async-trait = "0.1.41"
avro-rs = {version = "0.13", features = ["snappy"], optional = true}
avro-rs = {version = "0.13", features = ["snappy"], optional = true}
blake2 = {version = "^0.9.2", optional = true}
blake2 = {version = "^0.10.2", optional = true}
blake3 = {version = "1.0", optional = true}
blake3 = {version = "1.0", optional = true}
chrono = {version = "0.4", default-features = false}
chrono = {version = "0.4", default-features = false}
futures = "0.3"
pin-project-lite= "^0.2.7"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
tokio-stream = "0.1"
hashbrown = {version = "0.12", features = ["raw"]}
hashbrown = {version = "0.11", features = ["raw"]}
lazy_static = {version = "^1.4.0"}
log = "^0.4"
md-5 = { version = "^0.10.0", optional = true }
sha2 = { version = "^0.10.1", optional = true }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
md-5 = {version = "^0.10.0", optional = true}
num = "0.4"
num-traits = {version = "0.2", optional = true}
num_cpus = "1.13.0"
ordered-float = "2.0"
unicode-segmentation = { version = "^1.7.1", optional = true }
regex = { version = "^1.4.3", optional = true }
lazy_static = { version = "^1.4.0" }
smallvec = { version = "1.6", features = ["union"] }
parquet = {version = "8.0.0", features = ["arrow"]}
parquet = {version = "7.0.0", features = ["arrow"]}
paste = "^1.0"
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = {version = "0.14", optional = true}
pyo3 = {version = "0.15", optional = true}
rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.15", optional = true }
sha2 = {version = "^0.10.1", optional = true}
sqlparser = "0.13"
tempfile = "3"
tokio = {version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"]}
tokio-stream = "0.1"
unicode-segmentation = {version = "^1.7.1", optional = true}

[dev-dependencies]
criterion = "0.3"
doc-comment = "0.3"

[[bench]]
name = "aggregate_query_sql"
harness = false
name = "aggregate_query_sql"

[[bench]]
name = "sort_limit_query_sql"
harness = false
name = "sort_limit_query_sql"

[[bench]]
name = "math_query_sql"
harness = false
name = "math_query_sql"

[[bench]]
name = "filter_query_sql"
harness = false
name = "filter_query_sql"

[[bench]]
name = "window_query_sql"
harness = false
name = "window_query_sql"

[[bench]]
name = "scalar"
harness = false
name = "scalar"

[[bench]]
name = "physical_plan"
harness = false
name = "physical_plan"
136 changes: 128 additions & 8 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::TryInto;
use std::{any::Any, sync::Arc};

use arrow::array::TimestampMillisecondArray;
Expand All @@ -28,6 +29,18 @@ use arrow::compute::kernels::comparison::{
eq_bool, eq_bool_scalar, gt_bool, gt_bool_scalar, gt_eq_bool, gt_eq_bool_scalar,
lt_bool, lt_bool_scalar, lt_eq_bool, lt_eq_bool_scalar, neq_bool, neq_bool_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_bool_scalar, gt_dyn_bool_scalar, gt_eq_dyn_bool_scalar, lt_dyn_bool_scalar,
lt_eq_dyn_bool_scalar, neq_dyn_bool_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_scalar, gt_dyn_scalar, gt_eq_dyn_scalar, lt_dyn_scalar, lt_eq_dyn_scalar,
neq_dyn_scalar,
};
use arrow::compute::kernels::comparison::{
eq_dyn_utf8_scalar, gt_dyn_utf8_scalar, gt_eq_dyn_utf8_scalar, lt_dyn_utf8_scalar,
lt_eq_dyn_utf8_scalar, neq_dyn_utf8_scalar,
};
use arrow::compute::kernels::comparison::{
eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar,
};
Expand Down Expand Up @@ -429,6 +442,24 @@ macro_rules! compute_utf8_op_scalar {
}};
}

/// Invoke a compute kernel on a data array and a scalar value.
///
/// `$LEFT` is the (dyn) string array; `$RIGHT` must be a
/// `ScalarValue::Utf8(Some(_))` — any other scalar produces an
/// `DataFusionError::Internal`. The kernel name (e.g. `lt_dyn_utf8_scalar`)
/// is generated by `paste` from `$OP` plus the suffix `_dyn_utf8_scalar`.
macro_rules! compute_utf8_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        if let ScalarValue::Utf8(Some(string_value)) = $RIGHT {
            Ok(Arc::new(paste::expr! {[<$OP _dyn_utf8_scalar>]}(
                $LEFT,
                &string_value,
            )?))
        } else {
            Err(DataFusionError::Internal(format!(
                // BUGFIX: report this macro's own name; the message previously
                // said "compute_utf8_op_scalar" (copied from the non-dyn variant),
                // which misdirects anyone debugging the failure.
                "compute_utf8_op_dyn_scalar for '{}' failed to cast literal value {}",
                stringify!($OP),
                $RIGHT
            )))
        }
    }};
}

/// Invoke a compute kernel on a boolean data array and a scalar value
macro_rules! compute_bool_op_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
Expand All @@ -446,6 +477,18 @@ macro_rules! compute_bool_op_scalar {
}};
}

/// Invoke a compute kernel on a boolean data array and a scalar value.
/// `$LEFT` is the boolean array; `$RIGHT` is a scalar converted to `bool`
/// via `TryInto` — a failed conversion propagates as an error through `?`.
macro_rules! compute_bool_op_dyn_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
// generate the scalar kernel name, such as lt_dyn_bool_scalar, from the $OP
// parameter (which could have a value of lt) and the suffix _dyn_bool_scalar
Ok(Arc::new(paste::expr! {[<$OP _dyn_bool_scalar>]}(
$LEFT,
$RIGHT.try_into()?,
)?))
}};
}

/// Invoke a bool compute kernel on array(s)
macro_rules! compute_bool_op {
// invoke binary operator
Expand Down Expand Up @@ -474,7 +517,7 @@ macro_rules! compute_bool_op {
/// LEFT is array, RIGHT is scalar value
macro_rules! compute_op_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{
use std::convert::TryInto;
// use std::convert::TryInto;
let ll = $LEFT
.as_any()
.downcast_ref::<$DT>()
Expand All @@ -488,6 +531,19 @@ macro_rules! compute_op_scalar {
}};
}

/// Invoke a dyn compute kernel on a data array and a scalar value
/// LEFT is a Primitive or Dictionary array of numeric values, RIGHT is scalar value
macro_rules! compute_op_dyn_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
// generate the scalar kernel name, such as lt_dyn_scalar, from the $OP
// parameter (which could have a value of lt) and the suffix _dyn_scalar
Ok(Arc::new(paste::expr! {[<$OP _dyn_scalar>]}(
$LEFT,
$RIGHT,
)?))
}};
}

/// Invoke a compute kernel on array(s)
macro_rules! compute_op {
// invoke binary operator
Expand Down Expand Up @@ -878,26 +934,90 @@ impl PhysicalExpr for BinaryExpr {
}
}

/// The binary_array_op_dyn_scalar macro includes types that extend beyond the
/// primitive, such as Utf8 strings.
///
/// Dispatch: numeric arrays (or dictionaries with numeric values) go to the
/// `*_dyn_scalar` kernels; string arrays (or dictionaries with string values)
/// go to the `*_dyn_utf8_scalar` kernels; the remaining supported types fall
/// back to the statically-typed `compute_*_op_scalar!` macros.
/// Evaluates to `Some(Result<Arc<dyn Array>>)`.
#[macro_export]
macro_rules! binary_array_op_dyn_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
        // Numeric arrays, or dictionaries whose value type is numeric,
        // can use the dyn numeric kernels directly.
        let is_numeric = DataType::is_numeric($LEFT.data_type());
        let is_numeric_dict = match $LEFT.data_type() {
            DataType::Dictionary(_, val_type) => DataType::is_numeric(val_type),
            _ => false,
        };
        let numeric_like = is_numeric || is_numeric_dict;

        // String arrays, or dictionaries whose value type is a string,
        // can use the dyn utf8 kernels directly.
        let is_string = ($LEFT.data_type() == &DataType::Utf8)
            || ($LEFT.data_type() == &DataType::LargeUtf8);
        let is_string_dict = match $LEFT.data_type() {
            DataType::Dictionary(_, val_type) => match **val_type {
                DataType::Utf8 | DataType::LargeUtf8 => true,
                _ => false,
            },
            // BUGFIX: this match was non-exhaustive — every non-Dictionary
            // type must fall through to `false`, otherwise the macro fails
            // to compile at each expansion site.
            _ => false,
        };
        let string_like = is_string || is_string_dict;

        let result: Result<Arc<dyn Array>> = if numeric_like {
            compute_op_dyn_scalar!($LEFT, $RIGHT, $OP)
        } else if string_like {
            compute_utf8_op_dyn_scalar!($LEFT, $RIGHT, $OP)
        } else {
            // No dyn kernel for this type: fall back to the typed kernels.
            let r: Result<Arc<dyn Array>> = match $LEFT.data_type() {
                DataType::Decimal(_, _) => compute_decimal_op_scalar!($LEFT, $RIGHT, $OP, DecimalArray),
                DataType::Boolean => compute_bool_op_dyn_scalar!($LEFT, $RIGHT, $OP),
                DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                    compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
                }
                DataType::Timestamp(TimeUnit::Microsecond, _) => {
                    compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
                }
                DataType::Timestamp(TimeUnit::Millisecond, _) => {
                    compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
                }
                DataType::Timestamp(TimeUnit::Second, _) => {
                    compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
                }
                DataType::Date32 => {
                    compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
                }
                DataType::Date64 => {
                    compute_op_scalar!($LEFT, $RIGHT, $OP, Date64Array)
                }
                other => Err(DataFusionError::Internal(format!(
                    "Data type {:?} not supported for scalar operation '{}' on dyn array",
                    other, stringify!($OP)
                ))),
            };
            r
        };
        Some(result)
    }};
}

impl BinaryExpr {
/// Evaluate the expression of the left input is an array and
/// right is literal - use scalar operations
fn evaluate_array_scalar(
&self,
array: &ArrayRef,
array: &dyn Array,
scalar: &ScalarValue,
) -> Result<Option<Result<ArrayRef>>> {
let scalar_result = match &self.op {
Operator::Lt => binary_array_op_scalar!(array, scalar.clone(), lt),
Operator::Lt => {
binary_array_op_dyn_scalar!(array, scalar.clone(), lt)
}
Operator::LtEq => {
binary_array_op_scalar!(array, scalar.clone(), lt_eq)
binary_array_op_dyn_scalar!(array, scalar.clone(), lt_eq)
}
Operator::Gt => {
binary_array_op_dyn_scalar!(array, scalar.clone(), gt)
}
Operator::Gt => binary_array_op_scalar!(array, scalar.clone(), gt),
Operator::GtEq => {
binary_array_op_scalar!(array, scalar.clone(), gt_eq)
binary_array_op_dyn_scalar!(array, scalar.clone(), gt_eq)
}
Operator::Eq => {
binary_array_op_dyn_scalar!(array, scalar.clone(), eq)
}
Operator::Eq => binary_array_op_scalar!(array, scalar.clone(), eq),
Operator::NotEq => {
binary_array_op_scalar!(array, scalar.clone(), neq)
binary_array_op_dyn_scalar!(array, scalar.clone(), neq)
}
Operator::Like => {
binary_string_array_op_scalar!(array, scalar.clone(), like)
Expand Down

0 comments on commit 0e412eb

Please sign in to comment.