diff --git a/datafusion-common/src/column.rs b/datafusion-common/src/column.rs index 5d68916622622..02faa24b0346d 100644 --- a/datafusion-common/src/column.rs +++ b/datafusion-common/src/column.rs @@ -75,7 +75,7 @@ impl Column { } // Internal implementation of normalize - fn normalize_with_schemas( + pub fn normalize_with_schemas( self, schemas: &[&Arc], using_columns: &[HashSet], diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs index 453932a1b3fce..46321c3131271 100644 --- a/datafusion-common/src/dfschema.rs +++ b/datafusion-common/src/dfschema.rs @@ -402,6 +402,40 @@ impl Display for DFSchema { } } +/// Provides schema information needed by [Expr] methods such as +/// [Expr::nullable] and [Expr::data_type]. +/// +/// Note that this trait is implemented for &[DFSchema] which is +/// widely used in the DataFusion codebase. +pub trait ExprSchema { + /// Is this column reference nullable? + fn nullable(&self, col: &Column) -> Result; + + /// What is the datatype of this column? + fn data_type(&self, col: &Column) -> Result<&DataType>; +} + +// Implement `ExprSchema` for `Arc` +impl> ExprSchema for P { + fn nullable(&self, col: &Column) -> Result { + self.as_ref().nullable(col) + } + + fn data_type(&self, col: &Column) -> Result<&DataType> { + self.as_ref().data_type(col) + } +} + +impl ExprSchema for DFSchema { + fn nullable(&self, col: &Column) -> Result { + Ok(self.field_from_column(col)?.is_nullable()) + } + + fn data_type(&self, col: &Column) -> Result<&DataType> { + Ok(self.field_from_column(col)?.data_type()) + } +} + /// DFField wraps an Arrow field and adds an optional qualifier #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFField { diff --git a/datafusion-common/src/lib.rs b/datafusion-common/src/lib.rs index 8090fc07b2f8f..11f9bbbb7e82d 100644 --- a/datafusion-common/src/lib.rs +++ b/datafusion-common/src/lib.rs @@ -20,5 +20,5 @@ mod dfschema; mod error; pub use column::Column; -pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema}; +pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema}; pub use error::{DataFusionError, Result}; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 613c8e950c936..d81fa9d2afa66 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -595,6 +595,17 @@ impl LogicalPlanBuilder { self.join_detailed(right, join_type, join_keys, false) } + fn normalize( + plan: &LogicalPlan, + column: impl Into + Clone, + ) -> Result { + let schemas = plan.all_schemas(); + let using_columns = plan.using_columns()?; + column + .into() + .normalize_with_schemas(&schemas, &using_columns) + } + /// Apply a join with on constraint and specified null equality /// If null_equals_null is true then null == null, else null != null pub fn join_detailed( @@ -633,7 +644,10 @@ impl LogicalPlanBuilder { match (l_is_left, l_is_right, r_is_left, r_is_right) { (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)), (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)), - _ => (l.normalize(&self.plan), r.normalize(right)), + _ => ( + Self::normalize(&self.plan, l), + Self::normalize(right, r), + ), } } (Some(lr), None) => { @@ -643,9 +657,12 @@ impl LogicalPlanBuilder { right.schema().field_with_qualified_name(lr, &l.name); match (l_is_left, l_is_right) { - (Ok(_), _) => (Ok(l), r.normalize(right)), - (_, Ok(_)) => (r.normalize(&self.plan), Ok(l)), - _ => (l.normalize(&self.plan), r.normalize(right)), + (Ok(_), _) => (Ok(l), Self::normalize(right, r)), + (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)), + _ => ( + Self::normalize(&self.plan, l), + Self::normalize(right, r), + ), } } (None, Some(rr)) => { @@ -655,22 +672,25 @@ impl LogicalPlanBuilder { right.schema().field_with_qualified_name(rr, &r.name); match (r_is_left, r_is_right) { - (Ok(_), _) => (Ok(r), l.normalize(right)), - (_, Ok(_)) => (l.normalize(&self.plan), Ok(r)), - _ => (l.normalize(&self.plan), r.normalize(right)), + (Ok(_), _) => (Ok(r), Self::normalize(right, l)), + (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)), + _ => ( + Self::normalize(&self.plan, l), + Self::normalize(right, r), + ), } } (None, None) => { let mut swap = false; - let left_key = - l.clone().normalize(&self.plan).or_else(|_| { + let left_key = Self::normalize(&self.plan, l.clone()) + .or_else(|_| { swap = true; - l.normalize(right) + Self::normalize(right, l) }); if swap { - (r.normalize(&self.plan), left_key) + (Self::normalize(&self.plan, r), left_key) } else { - (left_key, r.normalize(right)) + (left_key, Self::normalize(right, r)) } } } @@ -705,11 +725,11 @@ impl LogicalPlanBuilder { let left_keys: Vec = using_keys .clone() .into_iter() - .map(|c| c.into().normalize(&self.plan)) + .map(|c| Self::normalize(&self.plan, c)) .collect::>()?; let right_keys: Vec = using_keys .into_iter() - .map(|c| c.into().normalize(right)) + .map(|c| Self::normalize(right, c)) .collect::>()?; let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect(); diff --git a/datafusion/src/logical_plan/dfschema.rs b/datafusion/src/logical_plan/dfschema.rs index 7b6471f64dd73..eb624283ea4fc 100644 --- a/datafusion/src/logical_plan/dfschema.rs +++ b/datafusion/src/logical_plan/dfschema.rs @@ -18,671 +18,4 @@ //! DFSchema is an extended schema struct that DataFusion uses to provide support for //! fields with optional relation names. -use std::collections::HashSet; -use std::convert::TryFrom; -use std::sync::Arc; - -use crate::error::{DataFusionError, Result}; -use crate::logical_plan::Column; - -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use std::fmt::{Display, Formatter}; - -/// A reference-counted reference to a `DFSchema`. -pub type DFSchemaRef = Arc; - -/// DFSchema wraps an Arrow schema and adds relation names -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DFSchema { - /// Fields - fields: Vec, -} - -impl DFSchema { - /// Creates an empty `DFSchema` - pub fn empty() -> Self { - Self { fields: vec![] } - } - - /// Create a new `DFSchema` - pub fn new(fields: Vec) -> Result { - let mut qualified_names = HashSet::new(); - let mut unqualified_names = HashSet::new(); - - for field in &fields { - if let Some(qualifier) = field.qualifier() { - if !qualified_names.insert((qualifier, field.name())) { - return Err(DataFusionError::Plan(format!( - "Schema contains duplicate qualified field name '{}'", - field.qualified_name() - ))); - } - } else if !unqualified_names.insert(field.name()) { - return Err(DataFusionError::Plan(format!( - "Schema contains duplicate unqualified field name '{}'", - field.name() - ))); - } - } - - // check for mix of qualified and unqualified field with same unqualified name - // note that we need to sort the contents of the HashSet first so that errors are - // deterministic - let mut qualified_names = qualified_names - .iter() - .map(|(l, r)| (l.to_owned(), r.to_owned())) - .collect::>(); - qualified_names.sort_by(|a, b| { - let a = format!("{}.{}", a.0, a.1); - let b = format!("{}.{}", b.0, b.1); - a.cmp(&b) - }); - for (qualifier, name) in &qualified_names { - if unqualified_names.contains(name) { - return Err(DataFusionError::Plan(format!( - "Schema contains qualified field name '{}.{}' \ - and unqualified field name '{}' which would be ambiguous", - qualifier, name, name - ))); - } - } - Ok(Self { fields }) - } - - /// Create a `DFSchema` from an Arrow schema - pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result { - Self::new( - schema - .fields() - .iter() - .map(|f| DFField::from_qualified(qualifier, f.clone())) - .collect(), - ) - } - - /// Combine two schemas - pub fn join(&self, schema: &DFSchema) -> Result { - let mut fields = self.fields.clone(); - fields.extend_from_slice(schema.fields().as_slice()); - Self::new(fields) - } - - /// Merge a schema into self - pub fn merge(&mut self, other_schema: &DFSchema) { - for field in other_schema.fields() { - // skip duplicate columns - let duplicated_field = match field.qualifier() { - Some(q) => self.field_with_name(Some(q.as_str()), field.name()).is_ok(), - // for unqualifed columns, check as unqualified name - None => self.field_with_unqualified_name(field.name()).is_ok(), - }; - if !duplicated_field { - self.fields.push(field.clone()); - } - } - } - - /// Get a list of fields - pub fn fields(&self) -> &Vec { - &self.fields - } - - /// Returns an immutable reference of a specific `Field` instance selected using an - /// offset within the internal `fields` vector - pub fn field(&self, i: usize) -> &DFField { - &self.fields[i] - } - - /// Find the index of the column with the given unqualified name - pub fn index_of(&self, name: &str) -> Result { - for i in 0..self.fields.len() { - if self.fields[i].name() == name { - return Ok(i); - } - } - Err(DataFusionError::Plan(format!( - "No field named '{}'. Valid fields are {}.", - name, - self.get_field_names() - ))) - } - - fn index_of_column_by_name( - &self, - qualifier: Option<&str>, - name: &str, - ) -> Result { - let mut matches = self - .fields - .iter() - .enumerate() - .filter(|(_, field)| match (qualifier, &field.qualifier) { - // field to lookup is qualified. - // current field is qualified and not shared between relations, compare both - // qualifier and name. - (Some(q), Some(field_q)) => q == field_q && field.name() == name, - // field to lookup is qualified but current field is unqualified. - (Some(_), None) => false, - // field to lookup is unqualified, no need to compare qualifier - (None, Some(_)) | (None, None) => field.name() == name, - }) - .map(|(idx, _)| idx); - match matches.next() { - None => Err(DataFusionError::Plan(format!( - "No field named '{}.{}'. Valid fields are {}.", - qualifier.unwrap_or(""), - name, - self.get_field_names() - ))), - Some(idx) => match matches.next() { - None => Ok(idx), - // found more than one matches - Some(_) => Err(DataFusionError::Internal(format!( - "Ambiguous reference to qualified field named '{}.{}'", - qualifier.unwrap_or(""), - name - ))), - }, - } - } - - /// Find the index of the column with the given qualifier and name - pub fn index_of_column(&self, col: &Column) -> Result { - self.index_of_column_by_name(col.relation.as_deref(), &col.name) - } - - /// Find the field with the given name - pub fn field_with_name( - &self, - qualifier: Option<&str>, - name: &str, - ) -> Result<&DFField> { - if let Some(qualifier) = qualifier { - self.field_with_qualified_name(qualifier, name) - } else { - self.field_with_unqualified_name(name) - } - } - - /// Find all fields match the given name - pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { - self.fields - .iter() - .filter(|field| field.name() == name) - .collect() - } - - /// Find the field with the given name - pub fn field_with_unqualified_name(&self, name: &str) -> Result<&DFField> { - let matches = self.fields_with_unqualified_name(name); - match matches.len() { - 0 => Err(DataFusionError::Plan(format!( - "No field with unqualified name '{}'. Valid fields are {}.", - name, - self.get_field_names() - ))), - 1 => Ok(matches[0]), - _ => Err(DataFusionError::Plan(format!( - "Ambiguous reference to field named '{}'", - name - ))), - } - } - - /// Find the field with the given qualified name - pub fn field_with_qualified_name( - &self, - qualifier: &str, - name: &str, - ) -> Result<&DFField> { - let idx = self.index_of_column_by_name(Some(qualifier), name)?; - Ok(self.field(idx)) - } - - /// Find the field with the given qualified column - pub fn field_from_column(&self, column: &Column) -> Result<&DFField> { - match &column.relation { - Some(r) => self.field_with_qualified_name(r, &column.name), - None => self.field_with_unqualified_name(&column.name), - } - } - - /// Check to see if unqualified field names matches field names in Arrow schema - pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool { - self.fields - .iter() - .zip(arrow_schema.fields().iter()) - .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name()) - } - - /// Strip all field qualifier in schema - pub fn strip_qualifiers(self) -> Self { - DFSchema { - fields: self - .fields - .into_iter() - .map(|f| f.strip_qualifier()) - .collect(), - } - } - - /// Replace all field qualifier with new value in schema - pub fn replace_qualifier(self, qualifier: &str) -> Self { - DFSchema { - fields: self - .fields - .into_iter() - .map(|f| { - DFField::new( - Some(qualifier), - f.name(), - f.data_type().to_owned(), - f.is_nullable(), - ) - }) - .collect(), - } - } - - /// Get comma-seperated list of field names for use in error messages - fn get_field_names(&self) -> String { - self.fields - .iter() - .map(|f| match f.qualifier() { - Some(qualifier) => format!("'{}.{}'", qualifier, f.name()), - None => format!("'{}'", f.name()), - }) - .collect::>() - .join(", ") - } -} - -impl From for Schema { - /// Convert DFSchema into a Schema - fn from(df_schema: DFSchema) -> Self { - Schema::new( - df_schema - .fields - .into_iter() - .map(|f| { - if f.qualifier().is_some() { - Field::new( - f.name().as_str(), - f.data_type().to_owned(), - f.is_nullable(), - ) - } else { - f.field - } - }) - .collect(), - ) - } -} - -impl From<&DFSchema> for Schema { - /// Convert DFSchema reference into a Schema - fn from(df_schema: &DFSchema) -> Self { - Schema::new(df_schema.fields.iter().map(|f| f.field.clone()).collect()) - } -} - -/// Create a `DFSchema` from an Arrow schema -impl TryFrom for DFSchema { - type Error = DataFusionError; - fn try_from(schema: Schema) -> std::result::Result { - Self::new( - schema - .fields() - .iter() - .map(|f| DFField::from(f.clone())) - .collect(), - ) - } -} - -impl From for SchemaRef { - fn from(df_schema: DFSchema) -> Self { - SchemaRef::new(df_schema.into()) - } -} - -/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes -pub trait ToDFSchema -where - Self: Sized, -{ - /// Attempt to create a DSSchema - #[allow(clippy::wrong_self_convention)] - fn to_dfschema(self) -> Result; - - /// Attempt to create a DSSchemaRef - #[allow(clippy::wrong_self_convention)] - fn to_dfschema_ref(self) -> Result { - Ok(Arc::new(self.to_dfschema()?)) - } -} - -impl ToDFSchema for Schema { - #[allow(clippy::wrong_self_convention)] - fn to_dfschema(self) -> Result { - DFSchema::try_from(self) - } -} - -impl ToDFSchema for SchemaRef { - #[allow(clippy::wrong_self_convention)] - fn to_dfschema(self) -> Result { - // Attempt to use the Schema directly if there are no other - // references, otherwise clone - match Self::try_unwrap(self) { - Ok(schema) => DFSchema::try_from(schema), - Err(schemaref) => DFSchema::try_from(schemaref.as_ref().clone()), - } - } -} - -impl ToDFSchema for Vec { - fn to_dfschema(self) -> Result { - DFSchema::new(self) - } -} - -impl Display for DFSchema { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!( - f, - "{}", - self.fields - .iter() - .map(|field| field.qualified_name()) - .collect::>() - .join(", ") - ) - } -} - -/// DFField wraps an Arrow field and adds an optional qualifier -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DFField { - /// Optional qualifier (usually a table or relation name) - qualifier: Option, - /// Arrow field definition - field: Field, -} - -impl DFField { - /// Creates a new `DFField` - pub fn new( - qualifier: Option<&str>, - name: &str, - data_type: DataType, - nullable: bool, - ) -> Self { - DFField { - qualifier: qualifier.map(|s| s.to_owned()), - field: Field::new(name, data_type, nullable), - } - } - - /// Create an unqualified field from an existing Arrow field - pub fn from(field: Field) -> Self { - Self { - qualifier: None, - field, - } - } - - /// Create a qualified field from an existing Arrow field - pub fn from_qualified(qualifier: &str, field: Field) -> Self { - Self { - qualifier: Some(qualifier.to_owned()), - field, - } - } - - /// Returns an immutable reference to the `DFField`'s unqualified name - pub fn name(&self) -> &String { - self.field.name() - } - - /// Returns an immutable reference to the `DFField`'s data-type - pub fn data_type(&self) -> &DataType { - self.field.data_type() - } - - /// Indicates whether this `DFField` supports null values - pub fn is_nullable(&self) -> bool { - self.field.is_nullable() - } - - /// Returns a string to the `DFField`'s qualified name - pub fn qualified_name(&self) -> String { - if let Some(qualifier) = &self.qualifier { - format!("{}.{}", qualifier, self.field.name()) - } else { - self.field.name().to_owned() - } - } - - /// Builds a qualified column based on self - pub fn qualified_column(&self) -> Column { - Column { - relation: self.qualifier.clone(), - name: self.field.name().to_string(), - } - } - - /// Builds an unqualified column based on self - pub fn unqualified_column(&self) -> Column { - Column { - relation: None, - name: self.field.name().to_string(), - } - } - - /// Get the optional qualifier - pub fn qualifier(&self) -> Option<&String> { - self.qualifier.as_ref() - } - - /// Get the arrow field - pub fn field(&self) -> &Field { - &self.field - } - - /// Return field with qualifier stripped - pub fn strip_qualifier(mut self) -> Self { - self.qualifier = None; - self - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::datatypes::DataType; - - #[test] - fn from_unqualified_field() { - let field = Field::new("c0", DataType::Boolean, true); - let field = DFField::from(field); - assert_eq!("c0", field.name()); - assert_eq!("c0", field.qualified_name()); - } - - #[test] - fn from_qualified_field() { - let field = Field::new("c0", DataType::Boolean, true); - let field = DFField::from_qualified("t1", field); - assert_eq!("c0", field.name()); - assert_eq!("t1.c0", field.qualified_name()); - } - - #[test] - fn from_unqualified_schema() -> Result<()> { - let schema = DFSchema::try_from(test_schema_1())?; - assert_eq!("c0, c1", schema.to_string()); - Ok(()) - } - - #[test] - fn from_qualified_schema() -> Result<()> { - let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - assert_eq!("t1.c0, t1.c1", schema.to_string()); - Ok(()) - } - - #[test] - fn from_qualified_schema_into_arrow_schema() -> Result<()> { - let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let arrow_schema: Schema = schema.into(); - let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \ - Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }"; - assert_eq!(expected, arrow_schema.to_string()); - Ok(()) - } - - #[test] - fn join_qualified() -> Result<()> { - let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?; - let join = left.join(&right)?; - assert_eq!("t1.c0, t1.c1, t2.c0, t2.c1", join.to_string()); - // test valid access - assert!(join.field_with_qualified_name("t1", "c0").is_ok()); - assert!(join.field_with_qualified_name("t2", "c0").is_ok()); - // test invalid access - assert!(join.field_with_unqualified_name("c0").is_err()); - assert!(join.field_with_unqualified_name("t1.c0").is_err()); - assert!(join.field_with_unqualified_name("t2.c0").is_err()); - Ok(()) - } - - #[test] - fn join_qualified_duplicate() -> Result<()> { - let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let join = left.join(&right); - assert!(join.is_err()); - assert_eq!( - "Error during planning: Schema contains duplicate \ - qualified field name \'t1.c0\'", - &format!("{}", join.err().unwrap()) - ); - Ok(()) - } - - #[test] - fn join_unqualified_duplicate() -> Result<()> { - let left = DFSchema::try_from(test_schema_1())?; - let right = DFSchema::try_from(test_schema_1())?; - let join = left.join(&right); - assert!(join.is_err()); - assert_eq!( - "Error during planning: Schema contains duplicate \ - unqualified field name \'c0\'", - &format!("{}", join.err().unwrap()) - ); - Ok(()) - } - - #[test] - fn join_mixed() -> Result<()> { - let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let right = DFSchema::try_from(test_schema_2())?; - let join = left.join(&right)?; - assert_eq!("t1.c0, t1.c1, c100, c101", join.to_string()); - // test valid access - assert!(join.field_with_qualified_name("t1", "c0").is_ok()); - assert!(join.field_with_unqualified_name("c0").is_ok()); - assert!(join.field_with_unqualified_name("c100").is_ok()); - assert!(join.field_with_name(None, "c100").is_ok()); - // test invalid access - assert!(join.field_with_unqualified_name("t1.c0").is_err()); - assert!(join.field_with_unqualified_name("t1.c100").is_err()); - assert!(join.field_with_qualified_name("", "c100").is_err()); - Ok(()) - } - - #[test] - fn join_mixed_duplicate() -> Result<()> { - let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let right = DFSchema::try_from(test_schema_1())?; - let join = left.join(&right); - assert!(join.is_err()); - assert_eq!( - "Error during planning: Schema contains qualified \ - field name \'t1.c0\' and unqualified field name \'c0\' which would be ambiguous", - &format!("{}", join.err().unwrap()) - ); - Ok(()) - } - - #[test] - fn helpful_error_messages() -> Result<()> { - let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?; - let expected_help = "Valid fields are \'t1.c0\', \'t1.c1\'."; - assert!(schema - .field_with_qualified_name("x", "y") - .unwrap_err() - .to_string() - .contains(expected_help)); - assert!(schema - .field_with_unqualified_name("y") - .unwrap_err() - .to_string() - .contains(expected_help)); - assert!(schema - .index_of("y") - .unwrap_err() - .to_string() - .contains(expected_help)); - Ok(()) - } - - #[test] - fn into() { - // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef - let arrow_schema = Schema::new(vec![Field::new("c0", DataType::Int64, true)]); - let arrow_schema_ref = Arc::new(arrow_schema.clone()); - - let df_schema = - DFSchema::new(vec![DFField::new(None, "c0", DataType::Int64, true)]).unwrap(); - let df_schema_ref = Arc::new(df_schema.clone()); - - { - let arrow_schema = arrow_schema.clone(); - let arrow_schema_ref = arrow_schema_ref.clone(); - - assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap()); - assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap()); - } - - { - let arrow_schema = arrow_schema.clone(); - let arrow_schema_ref = arrow_schema_ref.clone(); - - assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap()); - assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap()); - } - - // Now, consume the refs - assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap()); - assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap()); - } - - fn test_schema_1() -> Schema { - Schema::new(vec![ - Field::new("c0", DataType::Boolean, true), - Field::new("c1", DataType::Boolean, true), - ]) - } - - fn test_schema_2() -> Schema { - Schema::new(vec![ - Field::new("c100", DataType::Boolean, true), - Field::new("c101", DataType::Boolean, true), - ]) - } -} +pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema}; diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index c2763d097e855..4b539a8145517 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -34,152 +34,14 @@ use crate::physical_plan::{ use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue}; use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction}; use arrow::{compute::can_cast_types, datatypes::DataType}; +pub use datafusion_common::{Column, ExprSchema}; use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature}; use std::collections::{HashMap, HashSet}; -use std::convert::Infallible; use std::fmt; use std::hash::{BuildHasher, Hash, Hasher}; use std::ops::Not; -use std::str::FromStr; use std::sync::Arc; -/// A named reference to a qualified field in a schema. -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct Column { - /// relation/table name. - pub relation: Option, - /// field/column name. - pub name: String, -} - -impl Column { - /// Create Column from unqualified name. - pub fn from_name(name: impl Into) -> Self { - Self { - relation: None, - name: name.into(), - } - } - - /// Deserialize a fully qualified name string into a column - pub fn from_qualified_name(flat_name: &str) -> Self { - use sqlparser::tokenizer::Token; - - let dialect = sqlparser::dialect::GenericDialect {}; - let mut tokenizer = sqlparser::tokenizer::Tokenizer::new(&dialect, flat_name); - if let Ok(tokens) = tokenizer.tokenize() { - if let [Token::Word(relation), Token::Period, Token::Word(name)] = - tokens.as_slice() - { - return Column { - relation: Some(relation.value.clone()), - name: name.value.clone(), - }; - } - } - // any expression that's not in the form of `foo.bar` will be treated as unqualified column - // name - Column { - relation: None, - name: String::from(flat_name), - } - } - - /// Serialize column into a flat name string - pub fn flat_name(&self) -> String { - match &self.relation { - Some(r) => format!("{}.{}", r, self.name), - None => self.name.clone(), - } - } - - /// Normalizes `self` if is unqualified (has no relation name) - /// with an explicit qualifier from the first matching input - /// schemas. - /// - /// For example, `foo` will be normalized to `t.foo` if there is a - /// column named `foo` in a relation named `t` found in `schemas` - pub fn normalize(self, plan: &LogicalPlan) -> Result { - let schemas = plan.all_schemas(); - let using_columns = plan.using_columns()?; - self.normalize_with_schemas(&schemas, &using_columns) - } - - // Internal implementation of normalize - fn normalize_with_schemas( - self, - schemas: &[&Arc], - using_columns: &[HashSet], - ) -> Result { - if self.relation.is_some() { - return Ok(self); - } - - for schema in schemas { - let fields = schema.fields_with_unqualified_name(&self.name); - match fields.len() { - 0 => continue, - 1 => { - return Ok(fields[0].qualified_column()); - } - _ => { - // More than 1 fields in this schema have their names set to self.name. - // - // This should only happen when a JOIN query with USING constraint references - // join columns using unqualified column name. For example: - // - // ```sql - // SELECT id FROM t1 JOIN t2 USING(id) - // ``` - // - // In this case, both `t1.id` and `t2.id` will match unqualified column `id`. - // We will use the relation from the first matched field to normalize self. - - // Compare matched fields with one USING JOIN clause at a time - for using_col in using_columns { - let all_matched = fields - .iter() - .all(|f| using_col.contains(&f.qualified_column())); - // All matched fields belong to the same using column set, in orther words - // the same join clause. We simply pick the qualifer from the first match. - if all_matched { - return Ok(fields[0].qualified_column()); - } - } - } - } - } - - Err(DataFusionError::Plan(format!( - "Column {} not found in provided schemas", - self - ))) - } -} - -impl From<&str> for Column { - fn from(c: &str) -> Self { - Self::from_qualified_name(c) - } -} - -impl FromStr for Column { - type Err = Infallible; - - fn from_str(s: &str) -> std::result::Result { - Ok(s.into()) - } -} - -impl fmt::Display for Column { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match &self.relation { - Some(r) => write!(f, "#{}.{}", r, self.name), - None => write!(f, "#{}", self.name), - } - } -} - /// `Expr` is a central struct of DataFusion's query API, and /// represent logical expressions such as `A + 1`, or `CAST(c1 AS /// int)`. @@ -392,40 +254,6 @@ impl PartialOrd for Expr { } } -/// Provides schema information needed by [Expr] methods such as -/// [Expr::nullable] and [Expr::data_type]. -/// -/// Note that this trait is implemented for &[DFSchema] which is -/// widely used in the DataFusion codebase. -pub trait ExprSchema { - /// Is this column reference nullable? - fn nullable(&self, col: &Column) -> Result; - - /// What is the datatype of this column? - fn data_type(&self, col: &Column) -> Result<&DataType>; -} - -// Implement `ExprSchema` for `Arc` -impl> ExprSchema for P { - fn nullable(&self, col: &Column) -> Result { - self.as_ref().nullable(col) - } - - fn data_type(&self, col: &Column) -> Result<&DataType> { - self.as_ref().data_type(col) - } -} - -impl ExprSchema for DFSchema { - fn nullable(&self, col: &Column) -> Result { - Ok(self.field_from_column(col)?.is_nullable()) - } - - fn data_type(&self, col: &Column) -> Result<&DataType> { - Ok(self.field_from_column(col)?.data_type()) - } -} - impl Expr { /// Returns the [arrow::datatypes::DataType] of the expression /// based on [ExprSchema]