Skip to content

Commit

Permalink
move column, dfschema, etc. to common module
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Feb 7, 2022
1 parent eab7615 commit b0eadd7
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 857 deletions.
2 changes: 1 addition & 1 deletion datafusion-common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Column {
}

// Internal implementation of normalize
fn normalize_with_schemas(
pub fn normalize_with_schemas(
self,
schemas: &[&Arc<DFSchema>],
using_columns: &[HashSet<Column>],
Expand Down
34 changes: 34 additions & 0 deletions datafusion-common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,40 @@ impl Display for DFSchema {
}
}

/// Provides schema information needed by [Expr] methods such as
/// [Expr::nullable] and [Expr::data_type].
///
/// This trait is implemented for [DFSchema] itself and, through a blanket
/// impl, for every type that is `AsRef<DFSchema>`.
pub trait ExprSchema {
/// Reports whether the column referenced by `col` is nullable.
///
/// The `Result` lets an implementor report a lookup that could not be
/// answered for `col`.
fn nullable(&self, col: &Column) -> Result<bool>;

/// Returns a reference to the data type of the column referenced by `col`.
///
/// The `Result` lets an implementor report a lookup that could not be
/// answered for `col`.
fn data_type(&self, col: &Column) -> Result<&DataType>;
}

// Blanket impl: any `P` that is `AsRef<DFSchema>` (for example
// `Arc<DFSchema>`) gets `ExprSchema` by forwarding each call to the
// `DFSchema` obtained through `as_ref()`.
impl<P: AsRef<DFSchema>> ExprSchema for P {
    fn nullable(&self, col: &Column) -> Result<bool> {
        let schema: &DFSchema = self.as_ref();
        schema.nullable(col)
    }

    fn data_type(&self, col: &Column) -> Result<&DataType> {
        let schema: &DFSchema = self.as_ref();
        schema.data_type(col)
    }
}

// `DFSchema` answers both queries by looking up the field that `col`
// refers to; a failed lookup is propagated to the caller via `?`.
impl ExprSchema for DFSchema {
    fn nullable(&self, col: &Column) -> Result<bool> {
        let field = self.field_from_column(col)?;
        Ok(field.is_nullable())
    }

    fn data_type(&self, col: &Column) -> Result<&DataType> {
        let field = self.field_from_column(col)?;
        Ok(field.data_type())
    }
}

/// DFField wraps an Arrow field and adds an optional qualifier
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFField {
Expand Down
2 changes: 1 addition & 1 deletion datafusion-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ mod dfschema;
mod error;

pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
pub use error::{DataFusionError, Result};
48 changes: 34 additions & 14 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,17 @@ impl LogicalPlanBuilder {
self.join_detailed(right, join_type, join_keys, false)
}

/// Normalizes `column` against `plan`.
///
/// Converts `column` into a [`Column`] and resolves it with
/// `Column::normalize_with_schemas`, passing the schemas returned by
/// `plan.all_schemas()` and the using-columns returned by
/// `plan.using_columns()`.
///
/// # Errors
///
/// Propagates a failure from `plan.using_columns()`, and otherwise returns
/// whatever `normalize_with_schemas` returns.
fn normalize(plan: &LogicalPlan, column: impl Into<Column>) -> Result<Column> {
    // `column` is consumed exactly once by `into()`, so no `Clone` bound is
    // required of the caller's type.
    let schemas = plan.all_schemas();
    let using_columns = plan.using_columns()?;
    column
        .into()
        .normalize_with_schemas(&schemas, &using_columns)
}

/// Apply a join with on constraint and specified null equality
/// If null_equals_null is true then null == null, else null != null
pub fn join_detailed(
Expand Down Expand Up @@ -633,7 +644,10 @@ impl LogicalPlanBuilder {
match (l_is_left, l_is_right, r_is_left, r_is_right) {
(_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
(Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
_ => (l.normalize(&self.plan), r.normalize(right)),
_ => (
Self::normalize(&self.plan, l),
Self::normalize(right, r),
),
}
}
(Some(lr), None) => {
Expand All @@ -643,9 +657,12 @@ impl LogicalPlanBuilder {
right.schema().field_with_qualified_name(lr, &l.name);

match (l_is_left, l_is_right) {
(Ok(_), _) => (Ok(l), r.normalize(right)),
(_, Ok(_)) => (r.normalize(&self.plan), Ok(l)),
_ => (l.normalize(&self.plan), r.normalize(right)),
(Ok(_), _) => (Ok(l), Self::normalize(right, r)),
(_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
_ => (
Self::normalize(&self.plan, l),
Self::normalize(right, r),
),
}
}
(None, Some(rr)) => {
Expand All @@ -655,22 +672,25 @@ impl LogicalPlanBuilder {
right.schema().field_with_qualified_name(rr, &r.name);

match (r_is_left, r_is_right) {
(Ok(_), _) => (Ok(r), l.normalize(right)),
(_, Ok(_)) => (l.normalize(&self.plan), Ok(r)),
_ => (l.normalize(&self.plan), r.normalize(right)),
(Ok(_), _) => (Ok(r), Self::normalize(right, l)),
(_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
_ => (
Self::normalize(&self.plan, l),
Self::normalize(right, r),
),
}
}
(None, None) => {
let mut swap = false;
let left_key =
l.clone().normalize(&self.plan).or_else(|_| {
let left_key = Self::normalize(&self.plan, l.clone())
.or_else(|_| {
swap = true;
l.normalize(right)
Self::normalize(right, l)
});
if swap {
(r.normalize(&self.plan), left_key)
(Self::normalize(&self.plan, r), left_key)
} else {
(left_key, r.normalize(right))
(left_key, Self::normalize(right, r))
}
}
}
Expand Down Expand Up @@ -705,11 +725,11 @@ impl LogicalPlanBuilder {
let left_keys: Vec<Column> = using_keys
.clone()
.into_iter()
.map(|c| c.into().normalize(&self.plan))
.map(|c| Self::normalize(&self.plan, c))
.collect::<Result<_>>()?;
let right_keys: Vec<Column> = using_keys
.into_iter()
.map(|c| c.into().normalize(right))
.map(|c| Self::normalize(right, c))
.collect::<Result<_>>()?;

let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys.into_iter()).collect();
Expand Down
Loading

0 comments on commit b0eadd7

Please sign in to comment.