diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 62a9aa2cc277..a1154f58dcd9 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1485,6 +1485,7 @@ dependencies = [ "log", "paste", "recursive", + "regex", "regex-syntax", ] diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index 67f3da4f48de..bb53a30dcb23 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -19,7 +19,7 @@ use crate::utils::{parse_identifiers_normalized, quote_identifier}; use std::sync::Arc; /// A fully resolved path to a table of the form "catalog.schema.table" -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct ResolvedTableReference { /// The catalog (aka database) containing the table pub catalog: Arc<str>, diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 140e266a0272..44320e7a287a 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -24,8 +24,10 @@ mod data_utils; use crate::criterion::Criterion; use arrow::datatypes::{DataType, Field, Fields, Schema}; +use criterion::Bencher; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; +use datafusion_common::ScalarValue; use itertools::Itertools; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -122,6 +124,29 @@ fn register_clickbench_hits_table() -> SessionContext { ctx } +/// Target of this benchmark: verify that placeholder replacement does not get +/// slower when the query does not contain any placeholders at all. +fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) { + const COLUMNS_NUM: usize = 200; + let mut aggregates = String::new(); + for i in 0..COLUMNS_NUM { + if i > 0 { + aggregates.push_str(", "); + } + aggregates.push_str(format!("MAX(a{})", i).as_str()); + } + // SELECT MAX(a0), ..., MAX(aN) FROM t1. + let query = format!("SELECT {} FROM t1", aggregates); + let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap(); + let rt = Runtime::new().unwrap(); + let plan = + rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() }); + b.iter(|| { + let plan = plan.clone(); + criterion::black_box(plan.with_param_values(vec![ScalarValue::from(1)]).unwrap()); + }); +} + fn criterion_benchmark(c: &mut Criterion) { // verify that we can load the clickbench data prior to running the benchmark if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() @@ -388,6 +413,10 @@ fn criterion_benchmark(c: &mut Criterion) { } }) }); + + c.bench_function("with_param_values_many_columns", |b| { + benchmark_with_param_values_many_columns(&ctx, b); + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 7868a7f9e59c..e04fe6bddec9 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -688,11 +688,11 @@ impl SessionContext { LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await } - LogicalPlan::Prepare(Prepare { + LogicalPlan::Statement(Statement::Prepare(Prepare { name, input, data_types, - }) => { + })) => { // The number of parameters must match the number of specified data types.
if !data_types.is_empty() { let param_names = input.get_parameter_names()?; @@ -712,7 +712,15 @@ impl SessionContext { self.state.write().store_prepared(name, data_types, input)?; self.return_empty_dataframe() } - LogicalPlan::Execute(execute) => self.execute_prepared(execute), + LogicalPlan::Statement(Statement::Execute(execute)) => { + self.execute_prepared(execute) + } + LogicalPlan::Statement(Statement::Deallocate(deallocate)) => { + self.state + .write() + .remove_prepared(deallocate.name.as_str())?; + self.return_empty_dataframe() + } plan => Ok(DataFrame::new(self.state(), plan)), } } @@ -1773,14 +1781,6 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> { LogicalPlan::Statement(stmt) if !self.options.allow_statements => { plan_err!("Statement not supported: {}", stmt.name()) } - // TODO: Implement PREPARE as a LogicalPlan::Statement - LogicalPlan::Prepare(_) if !self.options.allow_statements => { - plan_err!("Statement not supported: PREPARE") - } - // TODO: Implement EXECUTE as a LogicalPlan::Statement - LogicalPlan::Execute(_) if !self.options.allow_statements => { - plan_err!("Statement not supported: EXECUTE") - } _ => Ok(TreeNodeRecursion::Continue), } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ad0c1c2d41a6..d0bbc95a1b08 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -540,8 +540,9 @@ impl SessionState { }; for reference in references { - let resolved = &self.resolve_table_ref(reference); - if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) { + let resolved = self.resolve_table_ref(reference); + if let Entry::Vacant(v) = provider.tables.entry(resolved) { + let resolved = v.key(); if let Ok(schema) = self.schema_for_ref(resolved.clone()) { if let Some(table) = schema.table(&resolved.table).await? { v.insert(provider_as_source(table)); @@ -933,6 +934,17 @@ impl SessionState { pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> { self.prepared_plans.get(name).map(Arc::clone) } + + /// Remove the prepared plan with the given name. + pub(crate) fn remove_prepared( + &mut self, + name: &str, + ) -> datafusion_common::Result<()> { + match self.prepared_plans.remove(name) { + Some(_) => Ok(()), + None => exec_err!("Prepared statement '{}' does not exist", name), + } + } } /// A builder to be used for building [`SessionState`]'s.
Defaults will @@ -1599,7 +1611,7 @@ impl From<SessionState> for SessionStateBuilder { /// having a direct dependency on the [`SessionState`] struct (and core crate) struct SessionContextProvider<'a> { state: &'a SessionState, - tables: HashMap<String, Arc<dyn TableSource>>, + tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>, } impl<'a> ContextProvider for SessionContextProvider<'a> { @@ -1611,7 +1623,7 @@ impl<'a> ContextProvider for SessionContextProvider<'a> { &self, name: TableReference, ) -> datafusion_common::Result<Arc<dyn TableSource>> { - let name = self.state.resolve_table_ref(name).to_string(); + let name = self.state.resolve_table_ref(name); self.tables .get(&name) .cloned() diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 7d475ad2e2a1..2d3899adb00e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1195,15 +1195,6 @@ impl DefaultPhysicalPlanner { let name = statement.name(); return not_impl_err!("Unsupported logical plan: Statement({name})"); } - LogicalPlan::Prepare(_) => { - // There is no default plan for "PREPARE" -- it must be - // handled at a higher level (so that the appropriate - // statement can be prepared) - return not_impl_err!("Unsupported logical plan: Prepare"); - } - LogicalPlan::Execute(_) => { - return not_impl_err!("Unsupported logical plan: Execute"); - } LogicalPlan::Dml(dml) => { // DataFusion is a read-only query engine, but also a library, so consumers may implement this return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op); diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 16f539b75967..792e23b519e0 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -50,6 +50,7 @@ use datafusion_common::HashMap; use datafusion_physical_expr_common::sort_expr::LexOrdering; use rand::rngs::StdRng; use rand::{thread_rng, Rng, SeedableRng}; +use std::str; use tokio::task::JoinSet; // ======================================================================== @@ -171,6 +172,21 @@ fn baseline_config() -> DatasetGeneratorConfig { ColumnDescr::new("time32_ms", DataType::Time32(TimeUnit::Millisecond)), ColumnDescr::new("time64_us", DataType::Time64(TimeUnit::Microsecond)), ColumnDescr::new("time64_ns", DataType::Time64(TimeUnit::Nanosecond)), + ColumnDescr::new("timestamp_s", DataType::Timestamp(TimeUnit::Second, None)), + ColumnDescr::new( + "timestamp_ms", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ColumnDescr::new( + "timestamp_us", + DataType::Timestamp(TimeUnit::Microsecond, None), + ), + ColumnDescr::new( + "timestamp_ns", + DataType::Timestamp(TimeUnit::Nanosecond, None), + ), + ColumnDescr::new("float32", DataType::Float32), + ColumnDescr::new("float64", DataType::Float64), ColumnDescr::new( "interval_year_month", DataType::Interval(IntervalUnit::YearMonth), @@ -206,10 +222,12 @@ fn baseline_config() -> DatasetGeneratorConfig { ColumnDescr::new("utf8", DataType::Utf8), ColumnDescr::new("largeutf8", DataType::LargeUtf8), ColumnDescr::new("utf8view", DataType::Utf8View), - // todo binary // low cardinality columns ColumnDescr::new("u8_low", DataType::UInt8).with_max_num_distinct(10), ColumnDescr::new("utf8_low", DataType::Utf8).with_max_num_distinct(10), + ColumnDescr::new("binary", DataType::Binary), + ColumnDescr::new("large_binary", DataType::LargeBinary), + ColumnDescr::new("binaryview", DataType::BinaryView), ]; let min_num_rows = 512; diff --git
a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index f0973826b507..fd4e3c40db2a 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -18,11 +18,13 @@ use std::sync::Arc; use arrow::datatypes::{ - ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, - Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, - IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, LargeUtf8Type, - StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, - Time64NanosecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type, + BinaryType, BinaryViewType, ByteArrayType, ByteViewType, Date32Type, Date64Type, + Decimal128Type, Decimal256Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, + IntervalYearMonthType, LargeBinaryType, LargeUtf8Type, StringViewType, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type, }; use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit}; @@ -35,7 +37,10 @@ use rand::{ thread_rng, Rng, SeedableRng, }; use test_utils::{ - array_gen::{DecimalArrayGenerator, PrimitiveArrayGenerator, StringArrayGenerator}, + array_gen::{ + BinaryArrayGenerator, DecimalArrayGenerator, PrimitiveArrayGenerator, + StringArrayGenerator, + }, stagger_batch, }; @@ -71,17 +76,19 @@ pub struct DatasetGeneratorConfig { } impl DatasetGeneratorConfig { - /// return a list of all column names + /// Return a list of all column names pub fn all_columns(&self) -> Vec<&str> { self.columns.iter().map(|d| d.name.as_str()).collect() } - /// return a list of column names that are "numeric" + /// Return a list of column names that are "numeric" pub fn numeric_columns(&self) -> Vec<&str> { self.columns .iter() .filter_map(|d| { - if d.column_type.is_numeric() { + if d.column_type.is_numeric() + && !matches!(d.column_type, DataType::Float32 | DataType::Float64) + { Some(d.name.as_str()) } else { None @@ -278,6 +285,37 @@ macro_rules! generate_primitive_array { }}; } +macro_rules! 
generate_binary_array { + ( + $SELF:ident, + $NUM_ROWS:ident, + $MAX_NUM_DISTINCT:expr, + $BATCH_GEN_RNG:ident, + $ARRAY_GEN_RNG:ident, + $ARROW_TYPE:ident + ) => {{ + let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len()); + let null_pct = $SELF.candidate_null_pcts[null_pct_idx]; + + let max_len = $BATCH_GEN_RNG.gen_range(1..100); + + let mut generator = BinaryArrayGenerator { + max_len, + num_binaries: $NUM_ROWS, + num_distinct_binaries: $MAX_NUM_DISTINCT, + null_pct, + rng: $ARRAY_GEN_RNG, + }; + + match $ARROW_TYPE::DATA_TYPE { + DataType::Binary => generator.gen_data::(), + DataType::LargeBinary => generator.gen_data::(), + DataType::BinaryView => generator.gen_binary_view(), + _ => unreachable!(), + } + }}; +} + impl RecordBatchGenerator { fn new(min_rows_nun: usize, max_rows_num: usize, columns: Vec) -> Self { let candidate_null_pcts = vec![0.0, 0.01, 0.1, 0.5]; @@ -527,6 +565,76 @@ impl RecordBatchGenerator { IntervalMonthDayNanoType ) } + DataType::Timestamp(TimeUnit::Second, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampSecondType + ) + } + DataType::Timestamp(TimeUnit::Millisecond, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampMillisecondType + ) + } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampMicrosecondType + ) + } + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + generate_primitive_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + TimestampNanosecondType + ) + } + DataType::Binary => { + generate_binary_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + BinaryType + ) + } + DataType::LargeBinary => { + generate_binary_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + LargeBinaryType + ) + } + DataType::BinaryView => { + generate_binary_array!( + self, + num_rows, + max_num_distinct, + batch_gen_rng, + array_gen_rng, + BinaryViewType + ) + } DataType::Decimal128(precision, scale) => { generate_decimal_array!( self, diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index b2ffefa43708..034d6fa23d9c 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -124,12 +124,12 @@ async fn disable_prepare_and_execute_statement() { let df = ctx.sql_with_options(prepare_sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), - "Error during planning: Statement not supported: PREPARE" + "Error during planning: Statement not supported: Prepare" ); let df = ctx.sql_with_options(execute_sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), - "Error during planning: Statement not supported: EXECUTE" + "Error during planning: Statement not supported: Execute" ); let options = options.with_allow_statements(true); diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml index de11b19c3b06..109d8e0b89a6 100644 --- a/datafusion/expr-common/Cargo.toml +++ b/datafusion/expr-common/Cargo.toml @@ -19,7 +19,6 @@ name = "datafusion-expr-common" description = "Logical plan and expression representation for DataFusion query engine" keywords = ["datafusion", "logical", "plan", "expressions"] -readme = "README.md" version = { workspace = true } edition = { workspace = 
true } homepage = { workspace = true } diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index be69d3b809c0..3846fae5de5d 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -18,8 +18,10 @@ //! Signature module contains foundational types that are used to represent signatures, types, //! and return types of functions in DataFusion. +use crate::type_coercion::aggregates::{NUMERICS, STRINGS}; use arrow::datatypes::DataType; -use datafusion_common::types::LogicalTypeRef; +use datafusion_common::types::{LogicalTypeRef, NativeType}; +use itertools::Itertools; /// Constant that is used as a placeholder for any valid timezone. /// This is used where a function can accept a timestamp type with any @@ -258,17 +260,66 @@ impl TypeSignature { .iter() .flat_map(|type_sig| type_sig.get_possible_types()) .collect(), + TypeSignature::Uniform(arg_count, types) => types + .iter() + .cloned() + .map(|data_type| vec![data_type; *arg_count]) + .collect(), + TypeSignature::Coercible(types) => types + .iter() + .map(|logical_type| get_data_types(logical_type.native())) + .multi_cartesian_product() + .collect(), + TypeSignature::Variadic(types) => types + .iter() + .cloned() + .map(|data_type| vec![data_type]) + .collect(), + TypeSignature::Numeric(arg_count) => NUMERICS + .iter() + .cloned() + .map(|numeric_type| vec![numeric_type; *arg_count]) + .collect(), + TypeSignature::String(arg_count) => STRINGS + .iter() + .cloned() + .map(|string_type| vec![string_type; *arg_count]) + .collect(), // TODO: Implement for other types - TypeSignature::Uniform(_, _) - | TypeSignature::Coercible(_) - | TypeSignature::Any(_) - | TypeSignature::Variadic(_) + TypeSignature::Any(_) | TypeSignature::VariadicAny - | TypeSignature::UserDefined | TypeSignature::ArraySignature(_) - | TypeSignature::Numeric(_) - | TypeSignature::String(_) => vec![], + | TypeSignature::UserDefined => vec![], } } } +fn get_data_types(native_type: &NativeType) -> Vec<DataType> { + match native_type { + NativeType::Null => vec![DataType::Null], + NativeType::Boolean => vec![DataType::Boolean], + NativeType::Int8 => vec![DataType::Int8], + NativeType::Int16 => vec![DataType::Int16], + NativeType::Int32 => vec![DataType::Int32], + NativeType::Int64 => vec![DataType::Int64], + NativeType::UInt8 => vec![DataType::UInt8], + NativeType::UInt16 => vec![DataType::UInt16], + NativeType::UInt32 => vec![DataType::UInt32], + NativeType::UInt64 => vec![DataType::UInt64], + NativeType::Float16 => vec![DataType::Float16], + NativeType::Float32 => vec![DataType::Float32], + NativeType::Float64 => vec![DataType::Float64], + NativeType::Date => vec![DataType::Date32, DataType::Date64], + NativeType::Binary => vec![ + DataType::Binary, + DataType::LargeBinary, + DataType::BinaryView, + ], + NativeType::String => { + vec![DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View] } + // TODO: support other native types + _ => vec![], } } @@ -417,6 +468,8 @@ impl Signature { #[cfg(test)] mod tests { + use datafusion_common::types::{logical_int64, logical_string}; + use super::*; #[test] @@ -515,5 +568,65 @@ mod tests { vec![DataType::Utf8] ] ); + + let type_signature = + TypeSignature::Uniform(2, vec![DataType::Float32, DataType::Int64]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Float32, DataType::Float32], + vec![DataType::Int64, DataType::Int64] + ] + ); + + let type_signature = +
TypeSignature::Coercible(vec![logical_string(), logical_int64()]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Utf8, DataType::Int64], + vec![DataType::LargeUtf8, DataType::Int64], + vec![DataType::Utf8View, DataType::Int64] + ] + ); + + let type_signature = + TypeSignature::Variadic(vec![DataType::Int32, DataType::Int64]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![vec![DataType::Int32], vec![DataType::Int64]] + ); + + let type_signature = TypeSignature::Numeric(2); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Int8, DataType::Int8], + vec![DataType::Int16, DataType::Int16], + vec![DataType::Int32, DataType::Int32], + vec![DataType::Int64, DataType::Int64], + vec![DataType::UInt8, DataType::UInt8], + vec![DataType::UInt16, DataType::UInt16], + vec![DataType::UInt32, DataType::UInt32], + vec![DataType::UInt64, DataType::UInt64], + vec![DataType::Float32, DataType::Float32], + vec![DataType::Float64, DataType::Float64] + ] + ); + + let type_signature = TypeSignature::String(2); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Utf8, DataType::Utf8], + vec![DataType::LargeUtf8, DataType::LargeUtf8], + vec![DataType::Utf8View, DataType::Utf8View] + ] + ); } } diff --git a/datafusion/expr-common/src/type_coercion/aggregates.rs b/datafusion/expr-common/src/type_coercion/aggregates.rs index fee75f9e4595..384d688cc27e 100644 --- a/datafusion/expr-common/src/type_coercion/aggregates.rs +++ b/datafusion/expr-common/src/type_coercion/aggregates.rs @@ -23,7 +23,8 @@ use arrow::datatypes::{ use datafusion_common::{internal_err, plan_err, Result}; -pub static STRINGS: &[DataType] = &[DataType::Utf8, DataType::LargeUtf8]; +pub static STRINGS: &[DataType] = + &[DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View]; pub static SIGNED_INTEGERS: &[DataType] = &[ DataType::Int8, diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 0cdfa055216b..aee3c9b42fb0 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -1598,7 +1598,11 @@ impl Expr { /// /// For example, given an expression like `<int32> = $0`, it will infer `$0` to /// have type `int32`. - pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<Expr> { + /// + /// Returns the transformed expression and a flag that is true if the + /// expression contains at least one placeholder.
+ pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> { + let mut has_placeholder = false; self.transform(|mut expr| { // Default to assuming the arguments are the same type if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut expr { @@ -1615,9 +1619,13 @@ rewrite_placeholder(low.as_mut(), expr.as_ref(), schema)?; rewrite_placeholder(high.as_mut(), expr.as_ref(), schema)?; } + if let Expr::Placeholder(_) = &expr { + has_placeholder = true; + } Ok(Transformed::yes(expr)) }) .data() + .map(|data| (data, has_placeholder)) } /// Returns true if some of this `exprs` subexpressions may not be evaluated diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index c86696854ca3..b944428977c4 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -314,7 +314,7 @@ impl NamePreserver { | LogicalPlan::Join(_) | LogicalPlan::TableScan(_) | LogicalPlan::Limit(_) - | LogicalPlan::Execute(_) + | LogicalPlan::Statement(_) ), } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index beb729fc7c17..90235e3f84c4 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -42,7 +42,7 @@ use crate::utils::{ }; use crate::{ and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery, - TableProviderFilterPushDown, TableSource, WriteOp, + Statement, TableProviderFilterPushDown, TableSource, WriteOp, }; use super::dml::InsertOp; @@ -500,11 +500,13 @@ impl LogicalPlanBuilder { /// Make a builder for a prepare logical plan from the builder's plan pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> { - Ok(Self::new(LogicalPlan::Prepare(Prepare { - name, - data_types, - input: self.plan, - }))) + Ok(Self::new(LogicalPlan::Statement(Statement::Prepare( + Prepare { + name, + data_types, + input: self.plan, + }, + )))) } /// Limit the number of rows returned @@ -1228,25 +1230,24 @@ impl LogicalPlanBuilder { let join_key_pairs = equi_exprs .0 .into_iter() - .zip(equi_exprs.1.into_iter()) + .zip(equi_exprs.1) .map(|(l, r)| { let left_key = l.into(); let right_key = r.into(); - - let mut left_using_columns = HashSet::new(); + let mut left_using_columns = HashSet::new(); expr_to_columns(&left_key, &mut left_using_columns)?; let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check( left_key, - &[&[self.plan.schema(), right.schema()]], - &[left_using_columns], + &[&[self.plan.schema()]], + &[], )?; let mut right_using_columns = HashSet::new(); expr_to_columns(&right_key, &mut right_using_columns)?; let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check( right_key, - &[&[self.plan.schema(), right.schema()]], - &[right_using_columns], + &[&[right.schema()]], + &[], )?; // find valid equijoin diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 84efd8541940..b808defcb959 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -21,10 +21,10 @@ use std::collections::HashMap; use std::fmt; use crate::{ - expr_vec_fmt, Aggregate,
DescribeTable, Distinct, DistinctOn, DmlStatement, Expr, + Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery, + Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan, + Unnest, Values, Window, }; use crate::dml::CopyTo; @@ -618,24 +618,6 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Detail": format!("{:?}", e.node) }) } - LogicalPlan::Prepare(Prepare { - name, data_types, .. - }) => { - json!({ - "Node Type": "Prepare", - "Name": name, - "Data Types": format!("{:?}", data_types) - }) - } - LogicalPlan::Execute(Execute { - name, parameters, .. - }) => { - json!({ - "Node Type": "Execute", - "Name": name, - "Parameters": expr_vec_fmt!(parameters), - }) - } LogicalPlan::DescribeTable(DescribeTable { .. }) => { json!({ "Node Type": "DescribeTable" diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 59654a227829..5d613d4e80db 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -36,14 +36,14 @@ pub use ddl::{ pub use dml::{DmlStatement, WriteOp}; pub use plan::{ projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, - DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join, - JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, - Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, + DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join, + JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, + RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ - SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd, - TransactionIsolationLevel, TransactionStart, + Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode, + TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, }; pub use display::display_schema; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index db309d9b5232..6ee99b22c7f3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -39,9 +39,9 @@ use crate::utils::{ split_conjunction, }; use crate::{ - build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr, - ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown, - TableSource, WindowFunctionDefinition, + build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute, + Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare, + TableProviderFilterPushDown, TableSource, WindowFunctionDefinition, }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -262,11 +262,6 @@ pub enum LogicalPlan { /// Remove duplicate rows from the input. This is used to /// implement SQL `SELECT DISTINCT ...`. Distinct(Distinct), - /// Prepare a statement and find any bind parameters - /// (e.g. `?`). This is used to implement SQL-prepared statements. - Prepare(Prepare), - /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'. 
- Execute(Execute), /// Data Manipulation Language (DML): Insert / Update / Delete Dml(DmlStatement), /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS @@ -314,8 +309,6 @@ impl LogicalPlan { LogicalPlan::Statement(statement) => statement.schema(), LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(), LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, - LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(), - LogicalPlan::Execute(Execute { schema, .. }) => schema, LogicalPlan::Explain(explain) => &explain.schema, LogicalPlan::Analyze(analyze) => &analyze.schema, LogicalPlan::Extension(extension) => extension.node.schema(), @@ -448,18 +441,16 @@ impl LogicalPlan { LogicalPlan::Copy(copy) => vec![&copy.input], LogicalPlan::Ddl(ddl) => ddl.inputs(), LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], - LogicalPlan::Prepare(Prepare { input, .. }) => vec![input], LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, recursive_term, .. }) => vec![static_term, recursive_term], + LogicalPlan::Statement(stmt) => stmt.inputs(), // plans without inputs LogicalPlan::TableScan { .. } - | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::Execute { .. } | LogicalPlan::DescribeTable(_) => vec![], } } @@ -562,8 +553,6 @@ impl LogicalPlan { } LogicalPlan::Subquery(_) => Ok(None), LogicalPlan::EmptyRelation(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) | LogicalPlan::Explain(_) @@ -715,8 +704,6 @@ impl LogicalPlan { LogicalPlan::RecursiveQuery(_) => Ok(self), LogicalPlan::Analyze(_) => Ok(self), LogicalPlan::Explain(_) => Ok(self), - LogicalPlan::Prepare(_) => Ok(self), - LogicalPlan::Execute(_) => Ok(self), LogicalPlan::TableScan(_) => Ok(self), LogicalPlan::EmptyRelation(_) => Ok(self), LogicalPlan::Statement(_) => Ok(self), @@ -1070,24 +1057,25 @@ impl LogicalPlan { logical_optimization_succeeded: e.logical_optimization_succeeded, })) } - LogicalPlan::Prepare(Prepare { - name, data_types, .. - }) => { + LogicalPlan::Statement(Statement::Prepare(Prepare { + name, + data_types, + .. + })) => { self.assert_no_expressions(expr)?; let input = self.only_input(inputs)?; - Ok(LogicalPlan::Prepare(Prepare { + Ok(LogicalPlan::Statement(Statement::Prepare(Prepare { name: name.clone(), data_types: data_types.clone(), input: Arc::new(input), - })) + }))) } - LogicalPlan::Execute(Execute { name, schema, .. }) => { + LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => { self.assert_no_inputs(inputs)?; - Ok(LogicalPlan::Execute(Execute { + Ok(LogicalPlan::Statement(Statement::Execute(Execute { name: name.clone(), - schema: Arc::clone(schema), parameters: expr, - })) + }))) } LogicalPlan::TableScan(ts) => { self.assert_no_inputs(inputs)?; @@ -1184,8 +1172,8 @@ impl LogicalPlan { /// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`] /// with the specified `param_values`. /// - /// [`LogicalPlan::Prepare`] are - /// converted to their inner logical plan for execution. + /// [`Prepare`] statements are converted to + /// their inner logical plan for execution.
/// /// # Example /// ``` @@ -1242,13 +1230,17 @@ impl LogicalPlan { let plan_with_values = self.replace_params_with_values(&param_values)?; // unwrap Prepare - Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values { - param_values.verify(&prepare_lp.data_types)?; - // try and take ownership of the input if is not shared, clone otherwise - Arc::unwrap_or_clone(prepare_lp.input) - } else { - plan_with_values - }) + Ok( + if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) = + plan_with_values + { + param_values.verify(&prepare_lp.data_types)?; + // try and take ownership of the input if is not shared, clone otherwise + Arc::unwrap_or_clone(prepare_lp.input) + } else { + plan_with_values + }, + ) } /// Returns the maximum number of rows that this plan can output, if known. @@ -1346,8 +1338,6 @@ impl LogicalPlan { | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) | LogicalPlan::Statement(_) | LogicalPlan::Extension(_) => None, } } @@ -1423,9 +1413,15 @@ impl LogicalPlan { let schema = Arc::clone(plan.schema()); let name_preserver = NamePreserver::new(&plan); plan.map_expressions(|e| { - let original_name = name_preserver.save(&e); - let transformed_expr = - e.infer_placeholder_types(&schema)?.transform_up(|e| { + let (e, has_placeholder) = e.infer_placeholder_types(&schema)?; + if !has_placeholder { + // Performance optimization: + // avoid NamePreserver copy and second pass over expression + // if no placeholders. + Ok(Transformed::no(e)) + } else { + let original_name = name_preserver.save(&e); + let transformed_expr = e.transform_up(|e| { if let Expr::Placeholder(Placeholder { id, .. }) = e { let value = param_values.get_placeholders_with_values(&id)?; Ok(Transformed::yes(Expr::Literal(value))) @@ -1433,8 +1429,9 @@ impl LogicalPlan { Ok(Transformed::no(e)) } })?; - // Preserve name to avoid breaking column references to this expression - Ok(transformed_expr.update_data(|expr| original_name.restore(expr))) + // Preserve name to avoid breaking column references to this expression + Ok(transformed_expr.update_data(|expr| original_name.restore(expr))) + } }) }) .map(|res| res.data) @@ -1962,14 +1959,6 @@ impl LogicalPlan { LogicalPlan::Analyze { .. } => write!(f, "Analyze"), LogicalPlan::Union(_) => write!(f, "Union"), LogicalPlan::Extension(e) => e.node.fmt_for_explain(f), - LogicalPlan::Prepare(Prepare { - name, data_types, .. - }) => { - write!(f, "Prepare: {name:?} {data_types:?} ") - } - LogicalPlan::Execute(Execute { name, parameters, .. }) => { - write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters)) - } LogicalPlan::DescribeTable(DescribeTable { .. }) => { write!(f, "DescribeTable") } @@ -2624,39 +2613,6 @@ impl PartialOrd for Union { } } -/// Prepare a statement but do not execute it. Prepare statements can have 0 or more -/// `Expr::Placeholder` expressions that are filled in during execution -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] -pub struct Prepare { - /// The name of the statement - pub name: String, - /// Data types of the parameters ([`Expr::Placeholder`]) - pub data_types: Vec<DataType>, - /// The logical plan of the statements - pub input: Arc<LogicalPlan>, -} - -/// Execute a prepared statement.
-#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Execute { - /// The name of the prepared statement to execute - pub name: String, - /// The execute parameters - pub parameters: Vec<Expr>, - /// Dummy schema - pub schema: DFSchemaRef, -} - -// Comparison excludes the `schema` field. -impl PartialOrd for Execute { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - match self.name.partial_cmp(&other.name) { - Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters), - cmp => cmp, - } - } -} - /// Describe the schema of table /// /// # Example output: diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs index 7ad18ce7bbf7..05e2b1af14d3 100644 --- a/datafusion/expr/src/logical_plan/statement.rs +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -15,17 +15,26 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::DFSchemaRef; -use std::cmp::Ordering; +use arrow::datatypes::DataType; +use datafusion_common::tree_node::{Transformed, TreeNodeIterator}; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; use std::fmt::{self, Display}; +use std::sync::{Arc, OnceLock}; + +use super::tree_node::rewrite_arc; +use crate::{expr_vec_fmt, Expr, LogicalPlan}; + +/// Statements have an unchanging empty schema. +/// TODO: Use `LazyLock` when MSRV is 1.80.0 +static STATEMENT_EMPTY_SCHEMA: OnceLock<DFSchemaRef> = OnceLock::new(); /// Various types of Statements. /// /// # Transactions: /// /// While DataFusion does not offer support for transactions, it provides -/// [`LogicalPlan`](crate::LogicalPlan) support to assist building -/// database systems using DataFusion +/// [`LogicalPlan`] support to assist building database systems +/// using DataFusion #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] pub enum Statement { // Begin a transaction @@ -34,16 +43,20 @@ pub enum Statement { TransactionEnd(TransactionEnd), /// Set a Variable SetVariable(SetVariable), + /// Prepare a statement and find any bind parameters + /// (e.g. `?`). This is used to implement SQL-prepared statements. + Prepare(Prepare), + /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'. + Execute(Execute), + /// Deallocate a prepared statement. + /// This is used to implement SQL 'DEALLOCATE'. + Deallocate(Deallocate), } impl Statement { /// Get a reference to the logical plan's schema pub fn schema(&self) -> &DFSchemaRef { - match self { - Statement::TransactionStart(TransactionStart { schema, .. }) => schema, - Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema, - Statement::SetVariable(SetVariable { schema, .. }) => schema, - } + STATEMENT_EMPTY_SCHEMA.get_or_init(|| Arc::new(DFSchema::empty())) } /// Return a descriptive string describing the type of this @@ -53,6 +66,64 @@ impl Statement { Statement::TransactionStart(_) => "TransactionStart", Statement::TransactionEnd(_) => "TransactionEnd", Statement::SetVariable(_) => "SetVariable", + Statement::Prepare(_) => "Prepare", + Statement::Execute(_) => "Execute", + Statement::Deallocate(_) => "Deallocate", } } + + /// Returns input LogicalPlans in the current `Statement`. + pub(super) fn inputs(&self) -> Vec<&LogicalPlan> { + match self { + Statement::Prepare(Prepare { input, .. }) => vec![input.as_ref()], + _ => vec![], + } + } + + /// Rewrites input LogicalPlans in the current `Statement` using `f`.
+ pub(super) fn map_inputs< + F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>, + >( + self, + f: F, + ) -> Result<Transformed<Statement>> { + match self { + Statement::Prepare(Prepare { + input, + name, + data_types, + }) => Ok(rewrite_arc(input, f)?.update_data(|input| { + Statement::Prepare(Prepare { + input, + name, + data_types, + }) + })), + _ => Ok(Transformed::no(self)), + } + } + + /// Returns an iterator over all expressions in the current `Statement`. + pub(super) fn expression_iter(&self) -> impl Iterator<Item = &Expr> { + match self { + Statement::Execute(Execute { parameters, .. }) => parameters.iter(), + _ => [].iter(), + } + } + + /// Rewrites all expressions in the current `Statement` using `f`. + pub(super) fn map_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>( + self, + f: F, + ) -> Result<Transformed<Statement>> { + match self { + Statement::Execute(Execute { name, parameters }) => Ok(parameters + .into_iter() + .map_until_stop_and_collect(f)? + .update_data(|parameters| { + Statement::Execute(Execute { parameters, name }) + })), + _ => Ok(Transformed::no(self)), } } @@ -85,6 +156,24 @@ impl Statement { }) => { write!(f, "SetVariable: set {variable:?} to {value:?}") } + Statement::Prepare(Prepare { + name, data_types, .. + }) => { + write!(f, "Prepare: {name:?} {data_types:?} ") + } + Statement::Execute(Execute { + name, parameters, .. + }) => { + write!( + f, + "Execute: {} params=[{}]", + name, + expr_vec_fmt!(parameters) + ) + } + Statement::Deallocate(Deallocate { name }) => { + write!(f, "Deallocate: {}", name) + } } } } @@ -116,67 +205,57 @@ pub enum TransactionIsolationLevel { } /// Indicator that the following statements should be committed or rolled back atomically -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct TransactionStart { /// indicates if transaction is allowed to write pub access_mode: TransactionAccessMode, // indicates ANSI isolation level pub isolation_level: TransactionIsolationLevel, - /// Empty schema - pub schema: DFSchemaRef, -} - -// Manual implementation needed because of `schema` field. Comparison excludes this field. -impl PartialOrd for TransactionStart { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - match self.access_mode.partial_cmp(&other.access_mode) { - Some(Ordering::Equal) => { - self.isolation_level.partial_cmp(&other.isolation_level) - } - cmp => cmp, - } - } } /// Indicator that any current transaction should be terminated -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct TransactionEnd { /// whether the transaction committed or aborted pub conclusion: TransactionConclusion, /// if specified a new transaction is immediately started with same characteristics pub chain: bool, - /// Empty schema - pub schema: DFSchemaRef, -} - -// Manual implementation needed because of `schema` field. Comparison excludes this field. -impl PartialOrd for TransactionEnd { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - match self.conclusion.partial_cmp(&other.conclusion) { - Some(Ordering::Equal) => self.chain.partial_cmp(&other.chain), - cmp => cmp, - } - } } /// Set a Variable's value -- value in /// [`ConfigOptions`](datafusion_common::config::ConfigOptions) -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct SetVariable { /// The variable name pub variable: String, /// The value to set pub value: String, - /// Dummy schema - pub schema: DFSchemaRef, } -// Manual implementation needed because of `schema` field. Comparison excludes this field.
-impl PartialOrd for SetVariable { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - match self.variable.partial_cmp(&other.value) { - Some(Ordering::Equal) => self.value.partial_cmp(&other.value), - cmp => cmp, - } - } +/// Prepare a statement but do not execute it. Prepare statements can have 0 or more +/// `Expr::Placeholder` expressions that are filled in during execution +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub struct Prepare { + /// The name of the statement + pub name: String, + /// Data types of the parameters ([`Expr::Placeholder`]) + pub data_types: Vec<DataType>, + /// The logical plan of the statements + pub input: Arc<LogicalPlan>, +} + +/// Execute a prepared statement. +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +pub struct Execute { + /// The name of the prepared statement to execute + pub name: String, + /// The execute parameters + pub parameters: Vec<Expr>, +} + +/// Deallocate a prepared statement. +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +pub struct Deallocate { + /// The name of the prepared statement to deallocate + pub name: String, } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 2fb589f15847..e7dfe8791924 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -38,10 +38,9 @@ //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions use crate::{ dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement, - Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, - Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, - Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, - Values, Window, + Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit, + LogicalPlan, Partitioning, Projection, RecursiveQuery, Repartition, Sort, Subquery, + SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values, Window, }; use recursive::recursive; use std::ops::Deref; @@ -330,17 +329,6 @@ impl TreeNode for LogicalPlan { options, }) }), - LogicalPlan::Prepare(Prepare { - name, - data_types, - input, - }) => rewrite_arc(input, f)?.update_data(|input| { - LogicalPlan::Prepare(Prepare { - name, - data_types, - input, - }) - }), LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, @@ -359,19 +347,20 @@ is_distinct, }) }), + LogicalPlan::Statement(stmt) => { + stmt.map_inputs(f)?.update_data(LogicalPlan::Statement) + } // plans without inputs LogicalPlan::TableScan { .. } - | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } - | LogicalPlan::Execute { .. } | LogicalPlan::DescribeTable(_) => Transformed::no(self), }) } } /// Applies `f` to rewrite a `Arc<LogicalPlan>` without copying, if possible -fn rewrite_arc<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>( +pub(super) fn rewrite_arc<F: FnMut(LogicalPlan) -> Result<Transformed<LogicalPlan>>>( plan: Arc<LogicalPlan>, mut f: F, ) -> Result<Transformed<Arc<LogicalPlan>>> { @@ -507,15 +496,12 @@ impl LogicalPlan { .chain(fetch.iter()) .map(|e| e.deref()) .apply_until_stop(f), - LogicalPlan::Execute(Execute { parameters, ..
}) => { - parameters.iter().apply_until_stop(f) - } + LogicalPlan::Statement(stmt) => stmt.expression_iter().apply_until_stop(f), // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Statement(_) | LogicalPlan::Analyze(_) | LogicalPlan::Explain(_) | LogicalPlan::Union(_) @@ -523,8 +509,7 @@ impl LogicalPlan { | LogicalPlan::Dml(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Ok(TreeNodeRecursion::Continue), + | LogicalPlan::DescribeTable(_) => Ok(TreeNodeRecursion::Continue), } } @@ -739,27 +724,15 @@ impl LogicalPlan { }) }) } - LogicalPlan::Execute(Execute { - parameters, - name, - schema, - }) => parameters - .into_iter() - .map_until_stop_and_collect(f)? - .update_data(|parameters| { - LogicalPlan::Execute(Execute { - parameters, - name, - schema, - }) - }), + LogicalPlan::Statement(stmt) => { + stmt.map_expressions(f)?.update_data(LogicalPlan::Statement) + } // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::Unnest(_) | LogicalPlan::RecursiveQuery(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) - | LogicalPlan::Statement(_) | LogicalPlan::Analyze(_) | LogicalPlan::Explain(_) | LogicalPlan::Union(_) @@ -767,8 +740,7 @@ impl LogicalPlan { | LogicalPlan::Dml(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Prepare(_) => Transformed::no(self), + | LogicalPlan::DescribeTable(_) => Transformed::no(self), }) } diff --git a/datafusion/functions-aggregate-common/Cargo.toml b/datafusion/functions-aggregate-common/Cargo.toml index a8296ce11f30..9b299c1a11d7 100644 --- a/datafusion/functions-aggregate-common/Cargo.toml +++ b/datafusion/functions-aggregate-common/Cargo.toml @@ -19,7 +19,6 @@ name = "datafusion-functions-aggregate-common" description = "Utility functions for implementing aggregate functions for the DataFusion query engine" keywords = ["datafusion", "logical", "plan", "expressions"] -readme = "README.md" version = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/datafusion/functions-nested/src/map.rs b/datafusion/functions-nested/src/map.rs index cad193910cee..73aad10a8e26 100644 --- a/datafusion/functions-nested/src/map.rs +++ b/datafusion/functions-nested/src/map.rs @@ -261,33 +261,34 @@ fn get_map_doc() -> &'static Documentation { "map(key, value)\nmap(key: value)\nmake_map(['key1', 'key2'], ['value1', 'value2'])" ) .with_sql_example( - r#"```sql - -- Using map function - SELECT MAP('type', 'test'); - ---- - {type: test} - - SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]); - ---- - {POST: 41, HEAD: 33, PATCH: } - - SELECT MAP([[1,2], [3,4]], ['a', 'b']); - ---- - {[1, 2]: a, [3, 4]: b} - - SELECT MAP { 'a': 1, 'b': 2 }; - ---- - {a: 1, b: 2} - - -- Using make_map function - SELECT MAKE_MAP(['POST', 'HEAD'], [41, 33]); - ---- - {POST: 41, HEAD: 33} - - SELECT MAKE_MAP(['key1', 'key2'], ['value1', null]); - ---- - {key1: value1, key2: } - ```"# + r#" +```sql +-- Using map function +SELECT MAP('type', 'test'); +---- +{type: test} + +SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]); +---- +{POST: 41, HEAD: 33, PATCH: } + +SELECT MAP([[1,2], [3,4]], ['a', 'b']); +---- +{[1, 2]: a, [3, 4]: b} + +SELECT MAP { 'a': 1, 'b': 2 }; +---- +{a: 1, b: 2} + +-- Using make_map function +SELECT MAKE_MAP(['POST', 'HEAD'], [41, 33]); +---- +{POST: 41, HEAD: 33} + +SELECT 
MAKE_MAP(['key1', 'key2'], ['value1', null]); +---- +{key1: value1, key2: } +```"#, ) .with_argument( "key", diff --git a/datafusion/functions/src/string/bit_length.rs b/datafusion/functions/src/string/bit_length.rs index d02c2b6a65f4..cb815df15e4b 100644 --- a/datafusion/functions/src/string/bit_length.rs +++ b/datafusion/functions/src/string/bit_length.rs @@ -79,6 +79,9 @@ impl ScalarUDFImpl for BitLengthFunc { ScalarValue::LargeUtf8(v) => Ok(ColumnarValue::Scalar( ScalarValue::Int64(v.as_ref().map(|x| (x.len() * 8) as i64)), )), + ScalarValue::Utf8View(v) => Ok(ColumnarValue::Scalar( + ScalarValue::Int32(v.as_ref().map(|x| (x.len() * 8) as i32)), + )), _ => unreachable!("bit length"), }, } diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index d915076823e5..34e35c66107a 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -48,6 +48,7 @@ itertools = { workspace = true } log = { workspace = true } paste = "1.0.14" recursive = { workspace = true } +regex = { workspace = true } regex-syntax = "0.8.0" [dev-dependencies] diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 6ba2b48fcc9a..16a4fa6be38d 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -565,9 +565,7 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::Unnest(_) - | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) => { + | LogicalPlan::RecursiveQuery(_) => { // This rule handles recursion itself in a `ApplyOrder::TopDown` like // manner. plan.map_children(|c| self.rewrite(c, config))? diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index 66c7463c3d5d..2e7a751ca4c5 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -264,6 +264,34 @@ mod tests { assert_optimized_plan_equal(plan, expected) } + #[test] + fn one_side_unqualified() -> Result<()> { + let (t1, t2) = test_tables()?; + let plan_from_exprs = LogicalPlanBuilder::from(t1.clone()) + .join_with_expr_keys( + t2.clone(), + JoinType::Inner, + (vec![col("optional_id")], vec![col("t2.optional_id")]), + None, + )? + .build()?; + let plan_from_cols = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec!["optional_id"], vec!["t2.optional_id"]), + None, + )? + .build()?; + let expected = "Inner Join: t1.optional_id = t2.optional_id\ + \n Filter: t1.optional_id IS NOT NULL\ + \n TableScan: t1\ + \n Filter: t2.optional_id IS NOT NULL\ + \n TableScan: t2"; + assert_optimized_plan_equal(plan_from_cols, expected)?; + assert_optimized_plan_equal(plan_from_exprs, expected) + } + fn build_plan( left_table: LogicalPlan, right_table: LogicalPlan, diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index b9fc6725c253..b659e477f67e 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -296,7 +296,7 @@ fn optimize_projections( }) .collect::<Result<Vec<_>>>()? } - LogicalPlan::Limit(_) | LogicalPlan::Prepare(_) => { + LogicalPlan::Limit(_) => { // Pass index requirements from the parent as well as column indices // that appear in this plan's expressions to its child.
These operators // do not benefit from "small" inputs, so the projection_beneficial @@ -312,6 +312,7 @@ fn optimize_projections( | LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Subquery(_) + | LogicalPlan::Statement(_) | LogicalPlan::Distinct(Distinct::All(_)) => { // These plans require all their fields, and their children should // be treated as final plans -- otherwise, we may have schema a @@ -347,10 +348,8 @@ fn optimize_projections( } LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) - | LogicalPlan::Statement(_) | LogicalPlan::Values(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::Execute(_) => { + | LogicalPlan::DescribeTable(_) => { // These operators have no inputs, so stop the optimization process. return Ok(Transformed::no(plan)); } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index e0df6a3a68ce..d8ca246bb635 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -49,6 +49,7 @@ use crate::analyzer::type_coercion::TypeCoercionRewriter; use crate::simplify_expressions::guarantees::GuaranteeRewriter; use crate::simplify_expressions::regex::simplify_regex_expr; use crate::simplify_expressions::SimplifyInfo; +use regex::Regex; use super::inlist_simplifier::ShortenInListSimplifier; use super::utils::*; @@ -1470,34 +1471,70 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for Simplifier<'a, S> { }) => Transformed::yes(simplify_regex_expr(left, op, right)?), // Rules for Like - Expr::Like(Like { - expr, - pattern, - negated, - escape_char: _, - case_insensitive: _, - }) if matches!( - pattern.as_ref(), - Expr::Literal(ScalarValue::Utf8(Some(pattern_str))) if pattern_str == "%" - ) || matches!( - pattern.as_ref(), - Expr::Literal(ScalarValue::LargeUtf8(Some(pattern_str))) if pattern_str == "%" - ) || matches!( - pattern.as_ref(), - Expr::Literal(ScalarValue::Utf8View(Some(pattern_str))) if pattern_str == "%" - ) => - { - // exp LIKE '%' is - // - when exp is not NULL, it's true - // - when exp is NULL, it's NULL - // exp NOT LIKE '%' is - // - when exp is not NULL, it's false - // - when exp is NULL, it's NULL - Transformed::yes(Expr::Case(Case { - expr: Some(Box::new(Expr::IsNotNull(expr))), - when_then_expr: vec![(Box::new(lit(true)), Box::new(lit(!negated)))], - else_expr: None, - })) + Expr::Like(like) => { + // `\` is implicit escape, see https://github.com/apache/datafusion/issues/13291 + let escape_char = like.escape_char.unwrap_or('\\'); + match as_string_scalar(&like.pattern) { + Some((data_type, pattern_str)) => { + match pattern_str { + None => return Ok(Transformed::yes(lit_bool_null())), + Some(pattern_str) if pattern_str == "%" => { + // exp LIKE '%' is + // - when exp is not NULL, it's true + // - when exp is NULL, it's NULL + // exp NOT LIKE '%' is + // - when exp is not NULL, it's false + // - when exp is NULL, it's NULL + let result_for_non_null = lit(!like.negated); + Transformed::yes(if !info.nullable(&like.expr)? 
{ + result_for_non_null + } else { + Expr::Case(Case { + expr: Some(Box::new(Expr::IsNotNull(like.expr))), + when_then_expr: vec![( + Box::new(lit(true)), + Box::new(result_for_non_null), + )], + else_expr: None, + }) + }) + } + Some(pattern_str) + if pattern_str.contains("%%") + && !pattern_str.contains(escape_char) => + { + // Repeated occurrences of wildcard are redundant so remove them + // exp LIKE '%%' --> exp LIKE '%' + let simplified_pattern = Regex::new("%%+") + .unwrap() + .replace_all(pattern_str, "%") + .to_string(); + Transformed::yes(Expr::Like(Like { + pattern: Box::new(to_string_scalar( + data_type, + Some(simplified_pattern), + )), + ..like + })) + } + Some(pattern_str) + if !pattern_str + .contains(['%', '_', escape_char].as_ref()) => + { + // If the pattern does not contain any wildcards, we can simplify the like expression to an equality expression + // TODO: handle escape characters + Transformed::yes(Expr::BinaryExpr(BinaryExpr { + left: like.expr.clone(), + op: if like.negated { NotEq } else { Eq }, + right: like.pattern.clone(), + })) + } + + Some(_pattern_str) => Transformed::no(Expr::Like(like)), + } + } + None => Transformed::no(Expr::Like(like)), + } } // a is not null/unknown --> true (if a is not nullable) @@ -1696,6 +1733,24 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for Simplifier<'a, S> { } } +fn as_string_scalar(expr: &Expr) -> Option<(DataType, &Option<String>)> { + match expr { + Expr::Literal(ScalarValue::Utf8(s)) => Some((DataType::Utf8, s)), + Expr::Literal(ScalarValue::LargeUtf8(s)) => Some((DataType::LargeUtf8, s)), + Expr::Literal(ScalarValue::Utf8View(s)) => Some((DataType::Utf8View, s)), + _ => None, + } +} + +fn to_string_scalar(data_type: DataType, value: Option<String>) -> Expr { + match data_type { + DataType::Utf8 => Expr::Literal(ScalarValue::Utf8(value)), + DataType::LargeUtf8 => Expr::Literal(ScalarValue::LargeUtf8(value)), + DataType::Utf8View => Expr::Literal(ScalarValue::Utf8View(value)), + _ => unreachable!(), + } +} + fn has_common_conjunction(lhs: &Expr, rhs: &Expr) -> bool { let lhs_set: HashSet<&Expr> = iter_conjunction(lhs).collect(); iter_conjunction(rhs).any(|e| lhs_set.contains(&e) && !e.is_volatile()) } @@ -2810,10 +2865,16 @@ mod tests { ); // single character - assert_change(regex_match(col("c1"), lit("x")), like(col("c1"), "%x%")); + assert_change( + regex_match(col("c1"), lit("x")), + like(col("c1"), lit("%x%")), + ); // single word - assert_change(regex_match(col("c1"), lit("foo")), like(col("c1"), "%foo%")); + assert_change( + regex_match(col("c1"), lit("foo")), + like(col("c1"), lit("%foo%")), + ); // regular expressions that match an exact literal assert_change(regex_match(col("c1"), lit("^$")), col("c1").eq(lit(""))); @@ -2900,44 +2961,50 @@ mod tests { assert_no_change(regex_match(col("c1"), lit("$foo^"))); // regular expressions that match a partial literal - assert_change(regex_match(col("c1"), lit("^foo")), like(col("c1"), "foo%")); - assert_change(regex_match(col("c1"), lit("foo$")), like(col("c1"), "%foo")); + assert_change( + regex_match(col("c1"), lit("^foo")), + like(col("c1"), lit("foo%")), + ); + assert_change( + regex_match(col("c1"), lit("foo$")), + like(col("c1"), lit("%foo")), + ); assert_change( regex_match(col("c1"), lit("^foo|bar$")), - like(col("c1"), "foo%").or(like(col("c1"), "%bar")), + like(col("c1"), lit("foo%")).or(like(col("c1"), lit("%bar"))), ); // OR-chain assert_change( regex_match(col("c1"), lit("foo|bar|baz")), - like(col("c1"), "%foo%") - .or(like(col("c1"), "%bar%")) - .or(like(col("c1"),
"%baz%")), + like(col("c1"), lit("%foo%")) + .or(like(col("c1"), lit("%bar%"))) + .or(like(col("c1"), lit("%baz%"))), ); assert_change( regex_match(col("c1"), lit("foo|x|baz")), - like(col("c1"), "%foo%") - .or(like(col("c1"), "%x%")) - .or(like(col("c1"), "%baz%")), + like(col("c1"), lit("%foo%")) + .or(like(col("c1"), lit("%x%"))) + .or(like(col("c1"), lit("%baz%"))), ); assert_change( regex_not_match(col("c1"), lit("foo|bar|baz")), - not_like(col("c1"), "%foo%") - .and(not_like(col("c1"), "%bar%")) - .and(not_like(col("c1"), "%baz%")), + not_like(col("c1"), lit("%foo%")) + .and(not_like(col("c1"), lit("%bar%"))) + .and(not_like(col("c1"), lit("%baz%"))), ); // both anchored expressions (translated to equality) and unanchored assert_change( regex_match(col("c1"), lit("foo|^x$|baz")), - like(col("c1"), "%foo%") + like(col("c1"), lit("%foo%")) .or(col("c1").eq(lit("x"))) - .or(like(col("c1"), "%baz%")), + .or(like(col("c1"), lit("%baz%"))), ); assert_change( regex_not_match(col("c1"), lit("foo|^bar$|baz")), - not_like(col("c1"), "%foo%") + not_like(col("c1"), lit("%foo%")) .and(col("c1").not_eq(lit("bar"))) - .and(not_like(col("c1"), "%baz%")), + .and(not_like(col("c1"), lit("%baz%"))), ); // Too many patterns (MAX_REGEX_ALTERNATIONS_EXPANSION) assert_no_change(regex_match(col("c1"), lit("foo|bar|baz|blarg|bozo|etc"))); @@ -2987,41 +3054,41 @@ mod tests { }) } - fn like(expr: Expr, pattern: &str) -> Expr { + fn like(expr: Expr, pattern: impl Into) -> Expr { Expr::Like(Like { negated: false, expr: Box::new(expr), - pattern: Box::new(lit(pattern)), + pattern: Box::new(pattern.into()), escape_char: None, case_insensitive: false, }) } - fn not_like(expr: Expr, pattern: &str) -> Expr { + fn not_like(expr: Expr, pattern: impl Into) -> Expr { Expr::Like(Like { negated: true, expr: Box::new(expr), - pattern: Box::new(lit(pattern)), + pattern: Box::new(pattern.into()), escape_char: None, case_insensitive: false, }) } - fn ilike(expr: Expr, pattern: &str) -> Expr { + fn ilike(expr: Expr, pattern: impl Into) -> Expr { Expr::Like(Like { negated: false, expr: Box::new(expr), - pattern: Box::new(lit(pattern)), + pattern: Box::new(pattern.into()), escape_char: None, case_insensitive: true, }) } - fn not_ilike(expr: Expr, pattern: &str) -> Expr { + fn not_ilike(expr: Expr, pattern: impl Into) -> Expr { Expr::Like(Like { negated: true, expr: Box::new(expr), - pattern: Box::new(lit(pattern)), + pattern: Box::new(pattern.into()), escape_char: None, case_insensitive: true, }) @@ -3633,32 +3700,123 @@ mod tests { #[test] fn test_like_and_ilke() { - // LIKE '%' - let expr = like(col("c1"), "%"); + let null = lit(ScalarValue::Utf8(None)); + + // expr [NOT] [I]LIKE NULL + let expr = like(col("c1"), null.clone()); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = not_like(col("c1"), null.clone()); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = ilike(col("c1"), null.clone()); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = not_ilike(col("c1"), null.clone()); + assert_eq!(simplify(expr), lit_bool_null()); + + // expr [NOT] [I]LIKE '%' + let expr = like(col("c1"), lit("%")); + assert_eq!(simplify(expr), if_not_null(col("c1"), true)); + + let expr = not_like(col("c1"), lit("%")); + assert_eq!(simplify(expr), if_not_null(col("c1"), false)); + + let expr = ilike(col("c1"), lit("%")); + assert_eq!(simplify(expr), if_not_null(col("c1"), true)); + + let expr = not_ilike(col("c1"), lit("%")); + assert_eq!(simplify(expr), if_not_null(col("c1"), false)); + + // expr [NOT] [I]LIKE '%%' 
+ let expr = like(col("c1"), lit("%%")); assert_eq!(simplify(expr), if_not_null(col("c1"), true)); - let expr = not_like(col("c1"), "%"); + let expr = not_like(col("c1"), lit("%%")); assert_eq!(simplify(expr), if_not_null(col("c1"), false)); - let expr = ilike(col("c1"), "%"); + let expr = ilike(col("c1"), lit("%%")); assert_eq!(simplify(expr), if_not_null(col("c1"), true)); - let expr = not_ilike(col("c1"), "%"); + let expr = not_ilike(col("c1"), lit("%%")); assert_eq!(simplify(expr), if_not_null(col("c1"), false)); - // null_constant LIKE '%' + // not_null_expr [NOT] [I]LIKE '%' + let expr = like(col("c1_non_null"), lit("%")); + assert_eq!(simplify(expr), lit(true)); + + let expr = not_like(col("c1_non_null"), lit("%")); + assert_eq!(simplify(expr), lit(false)); + + let expr = ilike(col("c1_non_null"), lit("%")); + assert_eq!(simplify(expr), lit(true)); + + let expr = not_ilike(col("c1_non_null"), lit("%")); + assert_eq!(simplify(expr), lit(false)); + + // not_null_expr [NOT] [I]LIKE '%%' + let expr = like(col("c1_non_null"), lit("%%")); + assert_eq!(simplify(expr), lit(true)); + + let expr = not_like(col("c1_non_null"), lit("%%")); + assert_eq!(simplify(expr), lit(false)); + + let expr = ilike(col("c1_non_null"), lit("%%")); + assert_eq!(simplify(expr), lit(true)); + + let expr = not_ilike(col("c1_non_null"), lit("%%")); + assert_eq!(simplify(expr), lit(false)); + + // null_constant [NOT] [I]LIKE '%' + let expr = like(null.clone(), lit("%")); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = not_like(null.clone(), lit("%")); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = ilike(null.clone(), lit("%")); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = not_ilike(null, lit("%")); + assert_eq!(simplify(expr), lit_bool_null()); + + // null_constant [NOT] [I]LIKE '%%' + let null = lit(ScalarValue::Utf8(None)); + let expr = like(null.clone(), lit("%%")); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = not_like(null.clone(), lit("%%")); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = ilike(null.clone(), lit("%%")); + assert_eq!(simplify(expr), lit_bool_null()); + + let expr = not_ilike(null, lit("%%")); + assert_eq!(simplify(expr), lit_bool_null()); + + // null_constant [NOT] [I]LIKE 'a%' let null = lit(ScalarValue::Utf8(None)); - let expr = like(null.clone(), "%"); + let expr = like(null.clone(), lit("a%")); assert_eq!(simplify(expr), lit_bool_null()); - let expr = not_like(null.clone(), "%"); + let expr = not_like(null.clone(), lit("a%")); assert_eq!(simplify(expr), lit_bool_null()); - let expr = ilike(null.clone(), "%"); + let expr = ilike(null.clone(), lit("a%")); assert_eq!(simplify(expr), lit_bool_null()); - let expr = not_ilike(null, "%"); + let expr = not_ilike(null, lit("a%")); assert_eq!(simplify(expr), lit_bool_null()); + + // expr [NOT] [I]LIKE with pattern without wildcards + let expr = like(col("c1"), lit("a")); + assert_eq!(simplify(expr), col("c1").eq(lit("a"))); + let expr = not_like(col("c1"), lit("a")); + assert_eq!(simplify(expr), col("c1").not_eq(lit("a"))); + let expr = like(col("c1"), lit("a_")); + assert_eq!(simplify(expr), col("c1").like(lit("a_"))); + let expr = not_like(col("c1"), lit("a_")); + assert_eq!(simplify(expr), col("c1").not_like(lit("a_"))); } #[test] @@ -4034,6 +4192,7 @@ mod tests { Ok(DataType::Int16) } } + #[test] fn test_optimize_volatile_conditions() { let fun = Arc::new(ScalarUDF::new_from_impl(VolatileUdf::new())); diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs index 1993598f5cf7..a55fecec98f6 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -66,7 +66,7 @@ use datafusion_expr::{ SubqueryAlias, TableScan, Values, Window, }, DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr, - WindowUDF, + Statement, WindowUDF, }; use datafusion_expr::{AggregateUDF, ColumnUnnestList, FetchType, SkipType, Unnest}; @@ -1502,11 +1502,11 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } - LogicalPlan::Prepare(Prepare { + LogicalPlan::Statement(Statement::Prepare(Prepare { name, data_types, input, - }) => { + })) => { let input = LogicalPlanNode::try_from_logical_plan(input, extension_codec)?; Ok(LogicalPlanNode { @@ -1633,9 +1633,6 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::RecursiveQuery(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for RecursiveQuery", )), - LogicalPlan::Execute(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for Execute", )), } } } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 149e094f3d6e..f3988f463268 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -141,7 +141,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?; expr = self.rewrite_partial_qualifier(expr, schema); self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?; - let expr = expr.infer_placeholder_types(schema)?; + let (expr, _) = expr.infer_placeholder_types(schema)?; Ok(expr) } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 00949aa13ae1..0ac804b706c8 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -46,10 +46,10 @@ use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody, - CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable, - DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation, - Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, - OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr, + CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, Deallocate, + DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, + EmptyRelation, Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, + LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, @@ -636,11 +636,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { *statement, &mut planner_context, )?; - Ok(LogicalPlan::Prepare(Prepare { + Ok(LogicalPlan::Statement(PlanStatement::Prepare(Prepare { name: ident_to_string(&name), data_types, input: Arc::new(plan), - })) + }))) } Statement::Execute { name, @@ -660,12 +660,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context)) .collect::<Result<Vec<_>>>()?; - Ok(LogicalPlan::Execute(Execute { + Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute { name: ident_to_string(&name), parameters, - schema: DFSchemaRef::new(empty_schema), - })) + }))) } +
Statement::Deallocate { + name, + // Similar to PostgreSQL, the PREPARE keyword is ignored + prepare: _, + } => Ok(LogicalPlan::Statement(PlanStatement::Deallocate( + Deallocate { + name: ident_to_string(&name), + }, + ))), Statement::ShowTables { extended, @@ -841,7 +849,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::TransactionStart(TransactionStart { access_mode, isolation_level, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) } @@ -849,7 +856,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::TransactionEnd(TransactionEnd { conclusion: TransactionConclusion::Commit, chain, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) } @@ -860,7 +866,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::TransactionEnd(TransactionEnd { conclusion: TransactionConclusion::Rollback, chain, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) } @@ -1535,7 +1540,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let statement = PlanStatement::SetVariable(SetVariable { variable: variable_lower, value: value_string, - schema: DFSchemaRef::new(DFSchema::empty()), }); Ok(LogicalPlan::Statement(statement)) diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 8167ddacffb4..c46bc6206379 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -111,8 +111,6 @@ impl Unparser<'_> { LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) | LogicalPlan::Extension(_) - | LogicalPlan::Prepare(_) - | LogicalPlan::Execute(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 698c408e538f..b2f128778a1c 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -33,7 +33,7 @@ use datafusion_expr::{ logical_plan::{LogicalPlan, Prepare}, test::function_stub::sum_udaf, ColumnarValue, CreateExternalTable, CreateIndex, DdlStatement, ScalarUDF, - ScalarUDFImpl, Signature, Volatility, + ScalarUDFImpl, Signature, Statement, Volatility, }; use datafusion_functions::{string, unicode}; use datafusion_sql::{ @@ -2710,7 +2710,9 @@ fn prepare_stmt_quick_test( assert_eq!(format!("{assert_plan}"), expected_plan); // verify data types - if let LogicalPlan::Prepare(Prepare { data_types, .. }) = assert_plan { + if let LogicalPlan::Statement(Statement::Prepare(Prepare { data_types, .. 
})) = + assert_plan + { let dt = format!("{data_types:?}"); assert_eq!(dt, expected_data_types); } diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt index b0c67af9e14f..c3bd15c08626 100644 --- a/datafusion/sqllogictest/test_files/prepare.slt +++ b/datafusion/sqllogictest/test_files/prepare.slt @@ -64,6 +64,10 @@ PREPARE my_plan AS SELECT $1; statement error Prepared statement \'my_plan\' does not exist EXECUTE my_plan('Foo', 'Bar'); +# deallocate a non-existing plan +statement error Prepared statement \'my_plan\' does not exist +DEALLOCATE my_plan; + statement ok PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter); @@ -77,6 +81,28 @@ EXECUTE my_plan('Foo', 'Bar'); statement error Prepared statement \'my_plan\' already exists PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter); +# deallocate a plan +statement ok +DEALLOCATE my_plan; + +# can't EXECUTE a deallocated plan +statement error Prepared statement \'my_plan\' does not exist +EXECUTE my_plan('Foo', 'Bar'); + +# re-prepare a deallocated plan +statement ok +PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter); + +query IT +EXECUTE my_plan('Foo', 'Bar'); +---- +1 Foo +2 Bar + +# deallocate with the PREPARE keyword +statement ok +DEALLOCATE PREPARE my_plan; + statement error Prepare specifies 1 data types but query has 0 parameters PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = 10; @@ -89,6 +115,9 @@ EXECUTE my_plan2; ---- 1 20 +statement ok +DEALLOCATE my_plan2; + statement ok PREPARE my_plan3(INT) AS SELECT $1; @@ -97,6 +126,9 @@ EXECUTE my_plan3(10); ---- 10 +statement ok +DEALLOCATE my_plan3; + statement ok PREPARE my_plan4(INT) AS SELECT 1 + $1; @@ -105,6 +137,9 @@ EXECUTE my_plan4(10); ---- 11 +statement ok +DEALLOCATE my_plan4; + statement ok PREPARE my_plan5(INT, DOUBLE) AS SELECT 1 + $1 + $2; @@ -113,6 +148,9 @@ EXECUTE my_plan5(10, 20.5); ---- 31.5 +statement ok +DEALLOCATE my_plan5; + statement ok PREPARE my_plan6(INT) AS SELECT id, age FROM person WHERE age = $1; @@ -140,6 +178,9 @@ EXECUTE my_plan6('foo'); statement error Unsupported parameter type EXECUTE my_plan6(10 + 20); +statement ok +DEALLOCATE my_plan6; + statement ok PREPARE my_plan7(INT, STRING, DOUBLE, INT, DOUBLE, STRING) AS @@ -150,6 +191,9 @@ EXECUTE my_plan7(10, 'jane', 99999.45, 20, 200000.45, 'foo'); ---- 1 20 foo +statement ok +DEALLOCATE my_plan7; + statement ok PREPARE my_plan8(INT, DOUBLE, DOUBLE, DOUBLE) AS @@ -161,6 +205,9 @@ EXECUTE my_plan8(100000, 99999.45, 100000.45, 200000.45); ---- 1 20 +statement ok +DEALLOCATE my_plan8; + statement ok PREPARE my_plan9(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter); @@ -170,6 +217,8 @@ EXECUTE my_plan9('Foo', 'Bar'); 1 Foo 2 Bar +statement ok +DEALLOCATE my_plan9; # Test issue: https://github.com/apache/datafusion/issues/12294 # prepare argument is in the LIMIT clause @@ -196,22 +245,40 @@ EXECUTE get_N_rand_ints_from_last_run(2); 1 1 +statement ok +DEALLOCATE get_N_rand_ints_from_last_run; + statement ok DROP TABLE test; +statement ok +SET datafusion.explain.logical_plan_only=true; + +# OptimizeProjections rule works with PREPARE and pushes down the `id` projection to TableScan +query TT +EXPLAIN PREPARE my_plan(INT) AS SELECT id + $1 FROM person; +---- +logical_plan +01)Prepare: "my_plan" [Int32] +02)--Projection: person.id + $1 +03)----TableScan: person projection=[id] # test creating 
logical plan for EXECUTE statements query TT EXPLAIN EXECUTE my_plan; ---- logical_plan Execute: my_plan params=[] -physical_plan_error This feature is not implemented: Unsupported logical plan: Execute query TT EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo'); ---- logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")] -physical_plan_error This feature is not implemented: Unsupported logical plan: Execute query error DataFusion error: Schema error: No field named a\. EXPLAIN EXECUTE my_plan(a); + +statement ok +SET datafusion.explain.logical_plan_only=false; + +statement ok +DROP TABLE person; diff --git a/datafusion/sqllogictest/test_files/string/init_data.slt.part b/datafusion/sqllogictest/test_files/string/init_data.slt.part index e1248c73da5f..9cdeff1977ee 100644 --- a/datafusion/sqllogictest/test_files/string/init_data.slt.part +++ b/datafusion/sqllogictest/test_files/string/init_data.slt.part @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +# TODO (https://github.com/apache/datafusion/issues/12637): add a row with '%%' pattern statement ok create table test_source as values ('Andrew', 'X', 'datafusion📊🔥', '🔥'), diff --git a/datafusion/sqllogictest/test_files/string/string_literal.slt b/datafusion/sqllogictest/test_files/string/string_literal.slt index 57261470f6eb..493da64063bc 100644 --- a/datafusion/sqllogictest/test_files/string/string_literal.slt +++ b/datafusion/sqllogictest/test_files/string/string_literal.slt @@ -1623,3 +1623,44 @@ a\_c \%abc false \%abc a\_c false \%abc %abc true \%abc \%abc false + +# test utf8, largeutf8, utf8view, DictionaryString for bit_length +query IIII +SELECT + bit_length('Andrew'), + bit_length('datafusion数据融合'), + bit_length('💖'), + bit_length('josé') +; +---- +48 176 32 40 + +query IIII +SELECT + bit_length(arrow_cast('Andrew', 'LargeUtf8')), + bit_length(arrow_cast('datafusion数据融合', 'LargeUtf8')), + bit_length(arrow_cast('💖', 'LargeUtf8')), + bit_length(arrow_cast('josé', 'LargeUtf8')) +; +---- +48 176 32 40 + +query IIII +SELECT + bit_length(arrow_cast('Andrew', 'Utf8View')), + bit_length(arrow_cast('datafusion数据融合', 'Utf8View')), + bit_length(arrow_cast('💖', 'Utf8View')), + bit_length(arrow_cast('josé', 'Utf8View')) +; +---- +48 176 32 40 + +query IIII +SELECT + bit_length(arrow_cast('Andrew', 'Dictionary(Int32, Utf8)')), + bit_length(arrow_cast('datafusion数据融合', 'Dictionary(Int32, Utf8)')), + bit_length(arrow_cast('💖', 'Dictionary(Int32, Utf8)')), + bit_length(arrow_cast('josé', 'Dictionary(Int32, Utf8)')) +; +---- +48 176 32 40 diff --git a/datafusion/sqllogictest/test_files/string/string_query.slt.part b/datafusion/sqllogictest/test_files/string/string_query.slt.part index 9536cfee9359..f781b9dc33ca 100644 --- a/datafusion/sqllogictest/test_files/string/string_query.slt.part +++ b/datafusion/sqllogictest/test_files/string/string_query.slt.part @@ -953,45 +953,43 @@ NULL NULL NULL NULL NULL #Raphael datafusionДатаФусион false false false false #NULL NULL NULL NULL NULL NULL -# TODO (https://github.com/apache/datafusion/issues/12637) uncomment additional test projections -query TTBB +query TTBBBB SELECT ascii_1, unicode_1, ascii_1 LIKE '%' AS ascii_1_like_percent, - unicode_1 LIKE '%' AS unicode_1_like_percent - -- ascii_1 LIKE '%%' AS ascii_1_like_percent_percent, -- TODO enable after fixing https://github.com/apache/datafusion/issues/12637 - -- unicode_1 LIKE '%%' AS unicode_1_like_percent_percent -- TODO enable after fixing https://github.com/apache/datafusion/issues/12637 + unicode_1 LIKE 
'%' AS unicode_1_like_percent, + ascii_1 LIKE '%%' AS ascii_1_like_percent_percent, + unicode_1 LIKE '%%' AS unicode_1_like_percent_percent FROM test_basic_operator ---- -Andrew datafusion📊🔥 true true -Xiangpeng datafusion数据融合 true true -Raphael datafusionДатаФусион true true -under_score un iść core true true -percent pan Tadeusz ma iść w kąt true true -(empty) (empty) true true -% (empty) true true -_ (empty) true true -NULL NULL NULL NULL -NULL NULL NULL NULL +Andrew datafusion📊🔥 true true true true +Xiangpeng datafusion数据融合 true true true true +Raphael datafusionДатаФусион true true true true +under_score un iść core true true true true +percent pan Tadeusz ma iść w kąt true true true true +(empty) (empty) true true true true +% (empty) true true true true +_ (empty) true true true true +NULL NULL NULL NULL NULL NULL +NULL NULL NULL NULL NULL NULL -# TODO (https://github.com/apache/datafusion/issues/12637) uncomment additional test projections -query TTBB +query TTBBBB SELECT ascii_1, unicode_1, ascii_1 NOT LIKE '%' AS ascii_1_not_like_percent, - unicode_1 NOT LIKE '%' AS unicode_1_not_like_percent - -- ascii_1 NOT LIKE '%%' AS ascii_1_not_like_percent_percent, -- TODO enable after fixing https://github.com/apache/datafusion/issues/12637 - -- unicode_1 NOT LIKE '%%' AS unicode_1_not_like_percent_percent -- TODO enable after fixing https://github.com/apache/datafusion/issues/12637 + unicode_1 NOT LIKE '%' AS unicode_1_not_like_percent, + ascii_1 NOT LIKE '%%' AS ascii_1_not_like_percent_percent, + unicode_1 NOT LIKE '%%' AS unicode_1_not_like_percent_percent FROM test_basic_operator ---- -Andrew datafusion📊🔥 false false -Xiangpeng datafusion数据融合 false false -Raphael datafusionДатаФусион false false -under_score un iść core false false -percent pan Tadeusz ma iść w kąt false false -(empty) (empty) false false -% (empty) false false -_ (empty) false false -NULL NULL NULL NULL -NULL NULL NULL NULL +Andrew datafusion📊🔥 false false false false +Xiangpeng datafusion数据融合 false false false false +Raphael datafusionДатаФусион false false false false +under_score un iść core false false false false +percent pan Tadeusz ma iść w kąt false false false false +(empty) (empty) false false false false +% (empty) false false false false +_ (empty) false false false false +NULL NULL NULL NULL NULL NULL +NULL NULL NULL NULL NULL NULL query T SELECT ascii_1 FROM test_basic_operator WHERE ascii_1 LIKE '%' diff --git a/datafusion/sqllogictest/test_files/string/string_view.slt b/datafusion/sqllogictest/test_files/string/string_view.slt index 43b08cb25f3f..ce8a295373aa 100644 --- a/datafusion/sqllogictest/test_files/string/string_view.slt +++ b/datafusion/sqllogictest/test_files/string/string_view.slt @@ -93,6 +93,7 @@ select octet_length(column1_utf8view) from test; 0 NULL +# TODO: Revisit this issue after upgrading to the arrow-rs version that includes apache/arrow-rs#6671. 
query error DataFusion error: Arrow error: Compute error: bit_length not supported for Utf8View select bit_length(column1_utf8view) from test; @@ -391,12 +392,12 @@ drop table test_lowercase ## Ensure no casts for LIKE/ILIKE query TT EXPLAIN SELECT - column1_utf8view like 'foo' as "like", - column1_utf8view ilike 'foo' as "ilike" + column1_utf8view like '%foo%' as "like", + column1_utf8view ilike '%foo%' as "ilike" FROM test; ---- logical_plan -01)Projection: test.column1_utf8view LIKE Utf8View("foo") AS like, test.column1_utf8view ILIKE Utf8View("foo") AS ilike +01)Projection: test.column1_utf8view LIKE Utf8View("%foo%") AS like, test.column1_utf8view ILIKE Utf8View("%foo%") AS ilike 02)--TableScan: test projection=[column1_utf8view] diff --git a/dev/release/README.md b/dev/release/README.md index 0e0daa9d6c40..cac8ea654f40 100644 --- a/dev/release/README.md +++ b/dev/release/README.md @@ -263,6 +263,7 @@ Verify that the Cargo.toml in the tarball contains the correct version (cd datafusion/expr-common && cargo publish) (cd datafusion/physical-expr-common && cargo publish) (cd datafusion/functions-aggregate-common && cargo publish) +(cd datafusion/functions-window-common && cargo publish) (cd datafusion/expr && cargo publish) (cd datafusion/execution && cargo publish) (cd datafusion/physical-expr && cargo publish) @@ -280,6 +281,7 @@ Verify that the Cargo.toml in the tarball contains the correct version (cd datafusion/proto-common && cargo publish) (cd datafusion/proto && cargo publish) (cd datafusion/substrait && cargo publish) +(cd datafusion/ffi && cargo publish) ``` The CLI needs a `--no-verify` argument because `build.rs` generates source into the `src` directory. diff --git a/dev/update_datafusion_versions.py b/dev/update_datafusion_versions.py index 4cb479b6dffd..cf72e5a4159d 100755 --- a/dev/update_datafusion_versions.py +++ b/dev/update_datafusion_versions.py @@ -32,6 +32,7 @@ 'datafusion': 'datafusion/core/Cargo.toml', 'datafusion-execution': 'datafusion/execution/Cargo.toml', 'datafusion-expr': 'datafusion/expr/Cargo.toml', + 'datafusion-ffi': 'datafusion/ffi/Cargo.toml', 'datafusion-functions': 'datafusion/functions/Cargo.toml', 'datafusion-functions-aggregate': 'datafusion/functions-aggregate/Cargo.toml', 'datafusion-functions-nested': 'datafusion/functions-nested/Cargo.toml', diff --git a/docs/source/conf.py b/docs/source/conf.py index f64c71f399fd..51408f4fa76f 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -119,3 +119,9 @@ # enable nice rendering of checkboxes for the task lists myst_enable_extensions = ["colon_fence", "deflist", "tasklist"] + +# Some code blocks (sql) are not being highlighted correctly, due to the +# presence of some special characters like: 🚀, å, {,... But this isn’t a major +# issue for our documentation. So, suppress these warnings to keep our build +# log cleaner. 
+suppress_warnings = ['misc.highlighting_failure'] diff --git a/docs/source/contributor-guide/howtos.md b/docs/source/contributor-guide/howtos.md index f105ab2c42db..e303c2a0f07d 100644 --- a/docs/source/contributor-guide/howtos.md +++ b/docs/source/contributor-guide/howtos.md @@ -49,6 +49,7 @@ Below is a checklist of what you need to do to add a new scalar function to Data - Run `./dev/update_function_docs.sh` to update docs [advanced_udf.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udaf.rs +[datafusion/expr/src]: https://github.com/apache/datafusion/tree/main/datafusion/expr/src [sqllogictest/test_files]: https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest/test_files ## How to add a new aggregate function @@ -56,7 +57,7 @@ Below is a checklist of what you need to do to add a new scalar function to Data Below is a checklist of what you need to do to add a new aggregate function to DataFusion: - Add the actual implementation of an `Accumulator` and `AggregateExpr`: -- In [datafusion/expr/src](../../../datafusion/expr/src/aggregate_function.rs), add: +- In [datafusion/expr/src], add: - a new variant to `AggregateFunction` - a new entry to `FromStr` with the name of the function as called by SQL - a new line in `return_type` with the expected return type of the function, given an incoming type @@ -131,3 +132,17 @@ After you've confirmed your `taplo` version, you can format all the `.toml` files ```bash taplo fmt ``` + +## How to update protobuf/gen dependencies + +The prost/tonic code can be generated by running `./regen.sh`, which in turn invokes the Rust binary located in [gen](./gen). + +This is necessary after modifying the protobuf definitions or altering the dependencies of [gen](./gen), and requires a +valid installation of [protoc] (see [installation instructions] for details). + +```bash +./regen.sh +``` + +[protoc]: https://github.com/protocolbuffers/protobuf#protocol-compiler-installation +[installation instructions]: https://datafusion.apache.org/contributor-guide/getting_started.html#protoc-installation diff --git a/docs/source/library-user-guide/profiling.md b/docs/source/library-user-guide/profiling.md index 02f6958d1728..40fae6f44705 100644 --- a/docs/source/library-user-guide/profiling.md +++ b/docs/source/library-user-guide/profiling.md @@ -21,7 +21,7 @@ The section contains examples how to perform CPU profiling for Apache DataFusion on different operating systems. -### Building a flamegraph +## Building a flamegraph [Video: how to CPU profile DataFusion with a Flamegraph](https://youtu.be/2z11xtYw_xs) @@ -34,7 +34,7 @@ in images such as this: ## MacOS -#### Step 1: Install the flamegraph Tool +### Step 1: Install the flamegraph Tool To install flamegraph, run: @@ -42,11 +42,11 @@ To install flamegraph, run: cargo install flamegraph ``` -#### Step 2: Prepare Your Environment +### Step 2: Prepare Your Environment Ensure that you're in the directory containing the necessary data files for your DataFusion query. The flamegraph tool will profile the execution of your query against this data. -#### Step 3: Running the Flamegraph Tool +### Step 3: Running the Flamegraph Tool To generate a flamegraph, you'll need to use the -- separator to pass arguments to the binary you're profiling. For datafusion-cli, you need to make sure to run the command with sudo permissions (especially on macOS, where DTrace requires elevated privileges).
diff --git a/docs/source/library-user-guide/query-optimizer.md b/docs/source/library-user-guide/query-optimizer.md index 5aacfaf59cb1..c2c60af85f4c 100644 --- a/docs/source/library-user-guide/query-optimizer.md +++ b/docs/source/library-user-guide/query-optimizer.md @@ -68,7 +68,7 @@ let optimizer = Optimizer::with_rules(vec![ ## Writing Optimization Rules Please refer to the -[optimizer_rule.rs](../../datafusion-examples/examples/optimizer_rule.rs) +[optimizer_rule.rs](../../../datafusion-examples/examples/optimizer_rule.rs) example to learn more about the general approach to writing optimizer rules and then move onto studying the existing rules. diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index b92b815d7c95..232efb02d423 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -3983,44 +3983,42 @@ make_map(['key1', 'key2'], ['value1', 'value2']) #### Example -````sql - -- Using map function - SELECT MAP('type', 'test'); - ---- - {type: test} - - SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]); - ---- - {POST: 41, HEAD: 33, PATCH: } +```sql +-- Using map function +SELECT MAP('type', 'test'); +---- +{type: test} - SELECT MAP([[1,2], [3,4]], ['a', 'b']); - ---- - {[1, 2]: a, [3, 4]: b} +SELECT MAP(['POST', 'HEAD', 'PATCH'], [41, 33, null]); +---- +{POST: 41, HEAD: 33, PATCH: } - SELECT MAP { 'a': 1, 'b': 2 }; - ---- - {a: 1, b: 2} +SELECT MAP([[1,2], [3,4]], ['a', 'b']); +---- +{[1, 2]: a, [3, 4]: b} - -- Using make_map function - SELECT MAKE_MAP(['POST', 'HEAD'], [41, 33]); - ---- - {POST: 41, HEAD: 33} +SELECT MAP { 'a': 1, 'b': 2 }; +---- +{a: 1, b: 2} - SELECT MAKE_MAP(['key1', 'key2'], ['value1', null]); - ---- - {key1: value1, key2: } - ``` +-- Using make_map function +SELECT MAKE_MAP(['POST', 'HEAD'], [41, 33]); +---- +{POST: 41, HEAD: 33} +SELECT MAKE_MAP(['key1', 'key2'], ['value1', null]); +---- +{key1: value1, key2: } +``` ### `map_extract` Returns a list containing the value for the given key or an empty list if the key is not present in the map. -```` - +``` map_extract(map, key) +``` -```` #### Arguments - **map**: Map expression. Can be a constant, column, or function, and any combination of map operators. @@ -4040,7 +4038,7 @@ SELECT map_extract(MAP {1: 'one', 2: 'two'}, 2); SELECT map_extract(MAP {'x': 10, 'y': NULL, 'z': 30}, 'y'); ---- [] -```` +``` #### Aliases diff --git a/test-utils/src/array_gen/binary.rs b/test-utils/src/array_gen/binary.rs new file mode 100644 index 000000000000..d342118fa85d --- /dev/null +++ b/test-utils/src/array_gen/binary.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{ + ArrayRef, BinaryViewArray, GenericBinaryArray, OffsetSizeTrait, UInt32Array, +}; +use arrow::compute; +use rand::rngs::StdRng; +use rand::Rng; + +/// Randomly generate binary arrays +pub struct BinaryArrayGenerator { + /// The maximum length of each binary element + pub max_len: usize, + /// The total number of binaries in the output + pub num_binaries: usize, + /// The number of distinct binary values in the columns + pub num_distinct_binaries: usize, + /// The percentage of nulls in the columns + pub null_pct: f64, + /// Random number generator + pub rng: StdRng, +} + +impl BinaryArrayGenerator { + /// Creates a BinaryArray or LargeBinaryArray with random binary data. + pub fn gen_data<O: OffsetSizeTrait>(&mut self) -> ArrayRef { + let distinct_binaries: GenericBinaryArray<O> = (0..self.num_distinct_binaries) + .map(|_| Some(random_binary(&mut self.rng, self.max_len))) + .collect(); + + // Pick num_binaries randomly from the distinct binary table + let indices: UInt32Array = (0..self.num_binaries) + .map(|_| { + if self.rng.gen::<f64>() < self.null_pct { + None + } else if self.num_distinct_binaries > 1 { + let range = 0..(self.num_distinct_binaries as u32); + Some(self.rng.gen_range(range)) + } else { + Some(0) + } + }) + .collect(); + + compute::take(&distinct_binaries, &indices, None).unwrap() + } + + /// Creates a BinaryViewArray with random binary data. + pub fn gen_binary_view(&mut self) -> ArrayRef { + let distinct_binary_views: BinaryViewArray = (0..self.num_distinct_binaries) + .map(|_| Some(random_binary(&mut self.rng, self.max_len))) + .collect(); + + let indices: UInt32Array = (0..self.num_binaries) + .map(|_| { + if self.rng.gen::<f64>() < self.null_pct { + None + } else if self.num_distinct_binaries > 1 { + let range = 0..(self.num_distinct_binaries as u32); + Some(self.rng.gen_range(range)) + } else { + Some(0) + } + }) + .collect(); + + compute::take(&distinct_binary_views, &indices, None).unwrap() + } +} + +/// Return a binary vector of random bytes of length 1..=max_len +fn random_binary(rng: &mut StdRng, max_len: usize) -> Vec<u8> { + if max_len == 0 { + Vec::new() + } else { + let len = rng.gen_range(1..=max_len); + (0..len).map(|_| rng.gen()).collect() + } +} diff --git a/test-utils/src/array_gen/mod.rs b/test-utils/src/array_gen/mod.rs index 8e0e39ddfdce..d076bb1b6f0b 100644 --- a/test-utils/src/array_gen/mod.rs +++ b/test-utils/src/array_gen/mod.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License.
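(Usage sketch for the new generator above. The field values and seed are illustrative, and it assumes the crate is consumed as `test_utils` with the generic `gen_data::<O>` signature shown in the diff.)

```rust
use arrow::array::Array;
use rand::rngs::StdRng;
use rand::SeedableRng;
use test_utils::array_gen::BinaryArrayGenerator;

fn main() {
    let mut generator = BinaryArrayGenerator {
        max_len: 16,                    // each value holds 1..=16 random bytes
        num_binaries: 1_000,            // rows in the output array
        num_distinct_binaries: 100,     // size of the distinct-value table
        null_pct: 0.1,                  // ~10% of entries are NULL
        rng: StdRng::seed_from_u64(42), // seeded for reproducibility
    };
    let binary = generator.gen_data::<i32>(); // BinaryArray
    let large = generator.gen_data::<i64>();  // LargeBinaryArray
    let views = generator.gen_binary_view();  // BinaryViewArray
    assert_eq!(binary.len(), 1_000);
    assert_eq!(large.len(), 1_000);
    assert_eq!(views.len(), 1_000);
}
```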
+mod binary; mod decimal; mod primitive; mod random_data; mod string; +pub use binary::BinaryArrayGenerator; pub use decimal::DecimalArrayGenerator; pub use primitive::PrimitiveArrayGenerator; pub use string::StringArrayGenerator; diff --git a/test-utils/src/array_gen/primitive.rs b/test-utils/src/array_gen/primitive.rs index 2469cbf44660..500a68143f03 100644 --- a/test-utils/src/array_gen/primitive.rs +++ b/test-utils/src/array_gen/primitive.rs @@ -56,10 +56,13 @@ impl PrimitiveArrayGenerator { | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) - | DataType::Interval(_) => (0..self.num_distinct_primitives) + | DataType::Interval(_) + | DataType::Binary + | DataType::LargeBinary + | DataType::BinaryView + | DataType::Timestamp(_, _) => (0..self.num_distinct_primitives) .map(|_| Some(A::generate_random_native_data(&mut self.rng))) .collect(), - _ => { let arrow_type = A::DATA_TYPE; panic!("Unsupported arrow data type: {arrow_type}") diff --git a/test-utils/src/array_gen/random_data.rs b/test-utils/src/array_gen/random_data.rs index 23227100d73f..a7297d45fdf0 100644 --- a/test-utils/src/array_gen/random_data.rs +++ b/test-utils/src/array_gen/random_data.rs @@ -21,8 +21,9 @@ use arrow::datatypes::{ Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTime, IntervalDayTimeType, IntervalMonthDayNano, IntervalMonthDayNanoType, IntervalYearMonthType, Time32MillisecondType, Time32SecondType, - Time64MicrosecondType, Time64NanosecondType, UInt16Type, UInt32Type, UInt64Type, - UInt8Type, + Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, + UInt32Type, UInt64Type, UInt8Type, }; use rand::distributions::Standard; use rand::prelude::Distribution; @@ -66,6 +67,10 @@ basic_random_data!(Time64MicrosecondType); basic_random_data!(Time64NanosecondType); basic_random_data!(IntervalYearMonthType); basic_random_data!(Decimal128Type); +basic_random_data!(TimestampSecondType); +basic_random_data!(TimestampMillisecondType); +basic_random_data!(TimestampMicrosecondType); +basic_random_data!(TimestampNanosecondType); impl RandomNativeData for Date64Type { fn generate_random_native_data(rng: &mut StdRng) -> Self::Native {
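(To close the loop on the prepared-statement flow exercised by `prepare.slt` earlier in this patch, here is a hedged end-to-end sketch using the public `SessionContext::sql` API; the plan name and query are illustrative, and a Tokio runtime is assumed.)

```rust
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Store a parameterized plan under the name `my_plan`.
    ctx.sql("PREPARE my_plan(INT) AS SELECT $1 + 1").await?;

    // Bind a parameter and run the stored plan.
    let df = ctx.sql("EXECUTE my_plan(41)").await?;
    df.show().await?; // prints 42

    // Remove the stored plan; a subsequent EXECUTE of `my_plan` now errors,
    // matching the slt expectations above.
    ctx.sql("DEALLOCATE my_plan").await?;
    Ok(())
}
```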