Skip to content

Commit

Permalink
Merge branch 'main' into add-stacker-and-recursive
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion-cli/Cargo.lock
#	datafusion/optimizer/Cargo.toml
  • Loading branch information
peter-toth committed Nov 10, 2024
2 parents 534acb9 + 7ebd993 commit cd57de9
Show file tree
Hide file tree
Showing 49 changed files with 1,149 additions and 456 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::utils::{parse_identifiers_normalized, quote_identifier};
use std::sync::Arc;

/// A fully resolved path to a table of the form "catalog.schema.table"
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ResolvedTableReference {
/// The catalog (aka database) containing the table
pub catalog: Arc<str>,
Expand Down
29 changes: 29 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -122,6 +124,29 @@ fn register_clickbench_hits_table() -> SessionContext {
ctx
}

/// Target of this benchmark: control that placeholders replacing does not get slower,
/// if the query does not contain placeholders at all.
fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {
const COLUMNS_NUM: usize = 200;
let mut aggregates = String::new();
for i in 0..COLUMNS_NUM {
if i > 0 {
aggregates.push_str(", ");
}
aggregates.push_str(format!("MAX(a{})", i).as_str());
}
// SELECT max(attr0), ..., max(attrN) FROM t1.
let query = format!("SELECT {} FROM t1", aggregates);
let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap();
let rt = Runtime::new().unwrap();
let plan =
rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() });
b.iter(|| {
let plan = plan.clone();
criterion::black_box(plan.with_param_values(vec![ScalarValue::from(1)]).unwrap());
});
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
Expand Down Expand Up @@ -388,6 +413,10 @@ fn criterion_benchmark(c: &mut Criterion) {
}
})
});

c.bench_function("with_param_values_many_columns", |b| {
benchmark_with_param_values_many_columns(&ctx, b);
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
22 changes: 11 additions & 11 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,11 @@ impl SessionContext {
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
self.set_variable(stmt).await
}
LogicalPlan::Prepare(Prepare {
LogicalPlan::Statement(Statement::Prepare(Prepare {
name,
input,
data_types,
}) => {
})) => {
// The number of parameters must match the specified data types length.
if !data_types.is_empty() {
let param_names = input.get_parameter_names()?;
Expand All @@ -712,7 +712,15 @@ impl SessionContext {
self.state.write().store_prepared(name, data_types, input)?;
self.return_empty_dataframe()
}
LogicalPlan::Execute(execute) => self.execute_prepared(execute),
LogicalPlan::Statement(Statement::Execute(execute)) => {
self.execute_prepared(execute)
}
LogicalPlan::Statement(Statement::Deallocate(deallocate)) => {
self.state
.write()
.remove_prepared(deallocate.name.as_str())?;
self.return_empty_dataframe()
}
plan => Ok(DataFrame::new(self.state(), plan)),
}
}
Expand Down Expand Up @@ -1773,14 +1781,6 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> {
LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
plan_err!("Statement not supported: {}", stmt.name())
}
// TODO: Implement PREPARE as a LogicalPlan::Statement
LogicalPlan::Prepare(_) if !self.options.allow_statements => {
plan_err!("Statement not supported: PREPARE")
}
// TODO: Implement EXECUTE as a LogicalPlan::Statement
LogicalPlan::Execute(_) if !self.options.allow_statements => {
plan_err!("Statement not supported: EXECUTE")
}
_ => Ok(TreeNodeRecursion::Continue),
}
}
Expand Down
20 changes: 16 additions & 4 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,9 @@ impl SessionState {
};

for reference in references {
let resolved = &self.resolve_table_ref(reference);
if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
let resolved = self.resolve_table_ref(reference);
if let Entry::Vacant(v) = provider.tables.entry(resolved) {
let resolved = v.key();
if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
if let Some(table) = schema.table(&resolved.table).await? {
v.insert(provider_as_source(table));
Expand Down Expand Up @@ -933,6 +934,17 @@ impl SessionState {
pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> {
self.prepared_plans.get(name).map(Arc::clone)
}

/// Remove the prepared plan with the given name.
pub(crate) fn remove_prepared(
&mut self,
name: &str,
) -> datafusion_common::Result<()> {
match self.prepared_plans.remove(name) {
Some(_) => Ok(()),
None => exec_err!("Prepared statement '{}' does not exist", name),
}
}
}

/// A builder to be used for building [`SessionState`]'s. Defaults will
Expand Down Expand Up @@ -1599,7 +1611,7 @@ impl From<SessionState> for SessionStateBuilder {
/// having a direct dependency on the [`SessionState`] struct (and core crate)
struct SessionContextProvider<'a> {
state: &'a SessionState,
tables: HashMap<String, Arc<dyn TableSource>>,
tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
}

impl<'a> ContextProvider for SessionContextProvider<'a> {
Expand All @@ -1611,7 +1623,7 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
&self,
name: TableReference,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
let name = self.state.resolve_table_ref(name).to_string();
let name = self.state.resolve_table_ref(name);
self.tables
.get(&name)
.cloned()
Expand Down
9 changes: 0 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1195,15 +1195,6 @@ impl DefaultPhysicalPlanner {
let name = statement.name();
return not_impl_err!("Unsupported logical plan: Statement({name})");
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
return not_impl_err!("Unsupported logical plan: Prepare");
}
LogicalPlan::Execute(_) => {
return not_impl_err!("Unsupported logical plan: Execute");
}
LogicalPlan::Dml(dml) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
Expand Down
20 changes: 19 additions & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use datafusion_common::HashMap;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use rand::rngs::StdRng;
use rand::{thread_rng, Rng, SeedableRng};
use std::str;
use tokio::task::JoinSet;

// ========================================================================
Expand Down Expand Up @@ -171,6 +172,21 @@ fn baseline_config() -> DatasetGeneratorConfig {
ColumnDescr::new("time32_ms", DataType::Time32(TimeUnit::Millisecond)),
ColumnDescr::new("time64_us", DataType::Time64(TimeUnit::Microsecond)),
ColumnDescr::new("time64_ns", DataType::Time64(TimeUnit::Nanosecond)),
ColumnDescr::new("timestamp_s", DataType::Timestamp(TimeUnit::Second, None)),
ColumnDescr::new(
"timestamp_ms",
DataType::Timestamp(TimeUnit::Millisecond, None),
),
ColumnDescr::new(
"timestamp_us",
DataType::Timestamp(TimeUnit::Microsecond, None),
),
ColumnDescr::new(
"timestamp_ns",
DataType::Timestamp(TimeUnit::Nanosecond, None),
),
ColumnDescr::new("float32", DataType::Float32),
ColumnDescr::new("float64", DataType::Float64),
ColumnDescr::new(
"interval_year_month",
DataType::Interval(IntervalUnit::YearMonth),
Expand Down Expand Up @@ -206,10 +222,12 @@ fn baseline_config() -> DatasetGeneratorConfig {
ColumnDescr::new("utf8", DataType::Utf8),
ColumnDescr::new("largeutf8", DataType::LargeUtf8),
ColumnDescr::new("utf8view", DataType::Utf8View),
// todo binary
// low cardinality columns
ColumnDescr::new("u8_low", DataType::UInt8).with_max_num_distinct(10),
ColumnDescr::new("utf8_low", DataType::Utf8).with_max_num_distinct(10),
ColumnDescr::new("binary", DataType::Binary),
ColumnDescr::new("large_binary", DataType::LargeBinary),
ColumnDescr::new("binaryview", DataType::BinaryView),
];

let min_num_rows = 512;
Expand Down
126 changes: 117 additions & 9 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
use std::sync::Arc;

use arrow::datatypes::{
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, LargeUtf8Type,
StringViewType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
BinaryType, BinaryViewType, ByteArrayType, ByteViewType, Date32Type, Date64Type,
Decimal128Type, Decimal256Type, Float32Type, Float64Type, Int16Type, Int32Type,
Int64Type, Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType,
IntervalYearMonthType, LargeBinaryType, LargeUtf8Type, StringViewType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, Utf8Type,
};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, IntervalUnit, Schema, TimeUnit};
Expand All @@ -35,7 +37,10 @@ use rand::{
thread_rng, Rng, SeedableRng,
};
use test_utils::{
array_gen::{DecimalArrayGenerator, PrimitiveArrayGenerator, StringArrayGenerator},
array_gen::{
BinaryArrayGenerator, DecimalArrayGenerator, PrimitiveArrayGenerator,
StringArrayGenerator,
},
stagger_batch,
};

Expand Down Expand Up @@ -71,17 +76,19 @@ pub struct DatasetGeneratorConfig {
}

impl DatasetGeneratorConfig {
/// return a list of all column names
/// Return a list of all column names
pub fn all_columns(&self) -> Vec<&str> {
self.columns.iter().map(|d| d.name.as_str()).collect()
}

/// return a list of column names that are "numeric"
/// Return a list of column names that are "numeric"
pub fn numeric_columns(&self) -> Vec<&str> {
self.columns
.iter()
.filter_map(|d| {
if d.column_type.is_numeric() {
if d.column_type.is_numeric()
&& !matches!(d.column_type, DataType::Float32 | DataType::Float64)
{
Some(d.name.as_str())
} else {
None
Expand Down Expand Up @@ -278,6 +285,37 @@ macro_rules! generate_primitive_array {
}};
}

macro_rules! generate_binary_array {
(
$SELF:ident,
$NUM_ROWS:ident,
$MAX_NUM_DISTINCT:expr,
$BATCH_GEN_RNG:ident,
$ARRAY_GEN_RNG:ident,
$ARROW_TYPE:ident
) => {{
let null_pct_idx = $BATCH_GEN_RNG.gen_range(0..$SELF.candidate_null_pcts.len());
let null_pct = $SELF.candidate_null_pcts[null_pct_idx];

let max_len = $BATCH_GEN_RNG.gen_range(1..100);

let mut generator = BinaryArrayGenerator {
max_len,
num_binaries: $NUM_ROWS,
num_distinct_binaries: $MAX_NUM_DISTINCT,
null_pct,
rng: $ARRAY_GEN_RNG,
};

match $ARROW_TYPE::DATA_TYPE {
DataType::Binary => generator.gen_data::<i32>(),
DataType::LargeBinary => generator.gen_data::<i64>(),
DataType::BinaryView => generator.gen_binary_view(),
_ => unreachable!(),
}
}};
}

impl RecordBatchGenerator {
fn new(min_rows_nun: usize, max_rows_num: usize, columns: Vec<ColumnDescr>) -> Self {
let candidate_null_pcts = vec![0.0, 0.01, 0.1, 0.5];
Expand Down Expand Up @@ -527,6 +565,76 @@ impl RecordBatchGenerator {
IntervalMonthDayNanoType
)
}
DataType::Timestamp(TimeUnit::Second, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampSecondType
)
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampMillisecondType
)
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampMicrosecondType
)
}
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
generate_primitive_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
TimestampNanosecondType
)
}
DataType::Binary => {
generate_binary_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
BinaryType
)
}
DataType::LargeBinary => {
generate_binary_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
LargeBinaryType
)
}
DataType::BinaryView => {
generate_binary_array!(
self,
num_rows,
max_num_distinct,
batch_gen_rng,
array_gen_rng,
BinaryViewType
)
}
DataType::Decimal128(precision, scale) => {
generate_decimal_array!(
self,
Expand Down
Loading

0 comments on commit cd57de9

Please sign in to comment.