Skip to content

Commit

Permalink
Support duplicate column aliases in queries
Browse files Browse the repository at this point in the history
In SQL, selecting single column multiple times is legal and most modern
databases support this. This commit adds such support to DataFusion too.
  • Loading branch information
findepi committed Nov 19, 2024
1 parent ce87bc5 commit 7d63cee
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 116 deletions.
59 changes: 11 additions & 48 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
//! fields with optional relation names.
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;
Expand Down Expand Up @@ -154,7 +154,6 @@ impl DFSchema {
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

Expand Down Expand Up @@ -183,7 +182,6 @@ impl DFSchema {
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

Expand All @@ -201,7 +199,6 @@ impl DFSchema {
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
schema.check_names()?;
Ok(schema)
}

Expand All @@ -215,40 +212,9 @@ impl DFSchema {
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
dfschema.check_names()?;
Ok(dfschema)
}

/// Check if the schema have some fields with the same name
pub fn check_names(&self) -> Result<()> {
let mut qualified_names = BTreeSet::new();
let mut unqualified_names = BTreeSet::new();

for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
if let Some(qualifier) = qualifier {
if !qualified_names.insert((qualifier, field.name())) {
return _schema_err!(SchemaError::DuplicateQualifiedField {
qualifier: Box::new(qualifier.clone()),
name: field.name().to_string(),
});
}
} else if !unqualified_names.insert(field.name()) {
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string()
});
}
}

for (qualifier, name) in qualified_names {
if unqualified_names.contains(name) {
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new(Some(qualifier.clone()), name)
});
}
}
Ok(())
}

/// Assigns functional dependencies.
pub fn with_functional_dependencies(
mut self,
Expand Down Expand Up @@ -285,7 +251,6 @@ impl DFSchema {
field_qualifiers: new_qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
};
new_self.check_names()?;
Ok(new_self)
}

Expand Down Expand Up @@ -1141,10 +1106,10 @@ mod tests {
fn join_qualified_duplicate() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let join = left.join(&right);
let join = left.join(&right)?;
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate qualified field name t1.c0",
"fields:[t1.c0, t1.c1, t1.c0, t1.c1], metadata:{}",
join.to_string()
);
Ok(())
}
Expand All @@ -1153,11 +1118,8 @@ mod tests {
fn join_unqualified_duplicate() -> Result<()> {
let left = DFSchema::try_from(test_schema_1())?;
let right = DFSchema::try_from(test_schema_1())?;
let join = left.join(&right);
assert_eq!(
join.unwrap_err().strip_backtrace(),
"Schema error: Schema contains duplicate unqualified field name c0"
);
let join = left.join(&right)?;
assert_eq!("fields:[c0, c1, c0, c1], metadata:{}", join.to_string());
Ok(())
}

Expand Down Expand Up @@ -1190,10 +1152,11 @@ mod tests {
fn join_mixed_duplicate() -> Result<()> {
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let right = DFSchema::try_from(test_schema_1())?;
let join = left.join(&right);
assert_contains!(join.unwrap_err().to_string(),
"Schema error: Schema contains qualified \
field name t1.c0 and unqualified field name c0 which would be ambiguous");
let join = left.join(&right)?;
assert_eq!(
"fields:[t1.c0, t1.c1, c0, c1], metadata:{}",
join.to_string()
);
Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ pub enum SchemaError {
qualifier: Box<TableReference>,
name: String,
},
/// Schema duplicate qualified fields with duplicate unqualified names
QualifiedFieldWithDuplicateName {
qualifier: Box<TableReference>,
name: String,
},
/// Schema contains duplicate unqualified field name
DuplicateUnqualifiedField { name: String },
/// No field with this name
Expand Down Expand Up @@ -188,6 +193,14 @@ impl Display for SchemaError {
quote_identifier(name)
)
}
Self::QualifiedFieldWithDuplicateName { qualifier, name } => {
write!(
f,
"Schema contains qualified fields with duplicate unqualified names {}.{}",
qualifier.to_quoted_string(),
quote_identifier(name)
)
}
Self::DuplicateUnqualifiedField { name } => {
write!(
f,
Expand Down
31 changes: 9 additions & 22 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1557,8 +1557,6 @@ pub fn project(
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
}
}
validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
}

Expand Down Expand Up @@ -1966,7 +1964,7 @@ mod tests {
use crate::logical_plan::StringifiedPlan;
use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};

use datafusion_common::{RecursionUnnestOption, SchemaError};
use datafusion_common::RecursionUnnestOption;

#[test]
fn plan_builder_simple() -> Result<()> {
Expand Down Expand Up @@ -2202,25 +2200,14 @@ mod tests {
Some(vec![0, 1]),
)?
// two columns with the same name => error
.project(vec![col("id"), col("first_name").alias("id")]);

match plan {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field:
Column {
relation: Some(TableReference::Bare { table }),
name,
},
},
_,
)) => {
assert_eq!(*"employee_csv", *table);
assert_eq!("id", &name);
Ok(())
}
_ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
}
.project(vec![col("id"), col("first_name").alias("id")])?
.build()?;

let expected = "\
Projection: employee_csv.id, employee_csv.first_name AS id\
\n TableScan: employee_csv projection=[id, first_name]";
assert_eq!(expected, format!("{plan}"));
Ok(())
}

fn employee_schema() -> Schema {
Expand Down
Loading

0 comments on commit 7d63cee

Please sign in to comment.