Skip to content

Commit

Permalink
[fix] improvements following review
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 29, 2021
1 parent c5cfcfb commit cb0789e
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 87 deletions.
160 changes: 76 additions & 84 deletions datafusion/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use log::debug;
use crate::{
error::Result,
execution::context::ExecutionContext,
logical_plan::{self, Expr},
logical_plan::{self, Expr, ExpressionVisitor, Recursion},
physical_plan::functions::Volatility,
scalar::ScalarValue,
};
Expand All @@ -51,93 +51,83 @@ const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";

/// The `ExpressionVisitor` for `expr_applicable_for_cols`. Walks the tree to
/// validate that the given expression is applicable with only the `col_names`
/// set of columns.
struct ApplicabilityVisitor<'a> {
    /// Names of the columns the expression is allowed to reference.
    col_names: &'a [String],
    /// Output flag: cleared to `false` as soon as a non-applicable node is found.
    is_applicable: &'a mut bool,
}

impl ApplicabilityVisitor<'_> {
    /// Inspect a function's volatility: only `Immutable` functions keep the
    /// expression applicable; any other volatility clears the flag and stops
    /// the walk early.
    fn visit_volatility(self, volatility: Volatility) -> Recursion<Self> {
        if matches!(volatility, Volatility::Immutable) {
            Recursion::Continue(self)
        } else {
            // TODO: Stable functions could be `applicable`, but that would
            // require access to the context
            *self.is_applicable = false;
            Recursion::Stop(self)
        }
    }
}

impl ExpressionVisitor for ApplicabilityVisitor<'_> {
    /// Classify `expr` before its children are visited.
    ///
    /// Columns are checked against `col_names`, function calls are checked
    /// through their volatility, and unsupported expression kinds mark the
    /// whole expression as not applicable and stop the traversal.
    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
        Ok(match expr {
            // TODO how to handle qualified / unqualified names?
            Expr::Column(logical_plan::Column { ref name, .. }) => {
                *self.is_applicable &= self.col_names.contains(name);
                // a column is a leaf: nothing left to recurse into
                Recursion::Stop(self)
            }
            Expr::ScalarFunction { fun, .. } => self.visit_volatility(fun.volatility()),
            Expr::ScalarUDF { fun, .. } => {
                self.visit_volatility(fun.signature.volatility)
            }
            // these nodes are applicable iff all their children are, so keep walking
            Expr::Literal(_)
            | Expr::Alias(_, _)
            | Expr::ScalarVariable(_)
            | Expr::Not(_)
            | Expr::IsNotNull(_)
            | Expr::IsNull(_)
            | Expr::Negative(_)
            | Expr::Cast { .. }
            | Expr::TryCast { .. }
            | Expr::BinaryExpr { .. }
            | Expr::Between { .. }
            | Expr::InList { .. }
            | Expr::Case { .. } => Recursion::Continue(self),
            // TODO other expressions are not handled yet:
            // - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
            // - Can `Wildcard` be considered as a `Literal`?
            // - ScalarVariable could be `applicable`, but that would require access to the context
            Expr::AggregateUDF { .. }
            | Expr::AggregateFunction { .. }
            | Expr::Sort { .. }
            | Expr::WindowFunction { .. }
            | Expr::Wildcard => {
                *self.is_applicable = false;
                Recursion::Stop(self)
            }
        })
    }
}

/// Check whether the given expression can be resolved using only the columns `col_names`.
/// This means that if this function returns true:
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
match expr {
// leaf
Expr::Literal(_) => true,
// TODO how to handle qualified / unqualified names?
Expr::Column(logical_plan::Column { ref name, .. }) => col_names.contains(name),
// unary
Expr::Alias(child, _)
| Expr::Not(child)
| Expr::IsNotNull(child)
| Expr::IsNull(child)
| Expr::Negative(child)
| Expr::Cast { expr: child, .. }
| Expr::TryCast { expr: child, .. } => expr_applicable_for_cols(col_names, child),
// binary
Expr::BinaryExpr {
ref left,
ref right,
..
} => {
expr_applicable_for_cols(col_names, left)
&& expr_applicable_for_cols(col_names, right)
}
// ternary
Expr::Between {
expr: item,
low,
high,
..
} => {
expr_applicable_for_cols(col_names, item)
&& expr_applicable_for_cols(col_names, low)
&& expr_applicable_for_cols(col_names, high)
}
// variadic
Expr::ScalarFunction { fun, args } => match fun.volatility() {
Volatility::Immutable => args
.iter()
.all(|arg| expr_applicable_for_cols(col_names, arg)),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable => false,
Volatility::Volatile => false,
},
Expr::ScalarUDF { fun, args } => match fun.signature.volatility {
Volatility::Immutable => args
.iter()
.all(|arg| expr_applicable_for_cols(col_names, arg)),
// TODO: Stable functions could be `applicable`, but that would require access to the context
Volatility::Stable => false,
Volatility::Volatile => false,
},
Expr::InList {
expr: item, list, ..
} => {
expr_applicable_for_cols(col_names, item)
&& list.iter().all(|e| expr_applicable_for_cols(col_names, e))
}
Expr::Case {
expr,
when_then_expr,
else_expr,
} => {
let expr_constant = expr
.as_ref()
.map(|e| expr_applicable_for_cols(col_names, e))
.unwrap_or(true);
let else_constant = else_expr
.as_ref()
.map(|e| expr_applicable_for_cols(col_names, e))
.unwrap_or(true);
let when_then_constant = when_then_expr.iter().all(|(w, th)| {
expr_applicable_for_cols(col_names, w)
&& expr_applicable_for_cols(col_names, th)
});
expr_constant && else_constant && when_then_constant
}
// TODO other expressions are not handled yet:
// - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
_ => false,
}
let mut is_applicable = true;
expr.accept(ApplicabilityVisitor {
col_names,
is_applicable: &mut is_applicable,
})
.unwrap();
is_applicable
}

/// Partition the list of files into `n` groups
Expand Down Expand Up @@ -191,8 +181,10 @@ pub async fn pruned_partition_list(
.collect();
let stream_path = table_path.to_owned();
if applicable_filters.is_empty() {
// Parse the partition values while listing all the files
// Note: We might avoid parsing the partition values if they are not used in any projection,
// but the cost of parsing will likely be far dominated by the time to fetch the listing from
// the object store.
let table_partition_cols_stream = table_partition_cols.to_vec();
Ok(Box::pin(
store
Expand Down
70 changes: 67 additions & 3 deletions datafusion/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ impl<'a> Display for FileGroupsDisplay<'a> {
}
}

/// A helper that projects partition columns into the file record batches
/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the partition column
/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
struct PartitionColumnProjector {
/// An Arrow buffer initialized to zeros that represents the key array of all partition
/// columns (partition columns are materialized by dictionary arrays with only one
Expand Down Expand Up @@ -202,7 +207,7 @@ impl PartitionColumnProjector {
}
}

// Transform the batch read from the file by inserting the partitioning columns
// to the right positions as deduced from `projected_schema`
// - file_batch: batch read from the file, with internal projection applied
// - partition_values: the list of partition values, one for each partition column
Expand Down Expand Up @@ -379,6 +384,8 @@ mod tests {
let (proj_schema, _) = conf.project();
// created a projector for that projected schema
let mut proj = PartitionColumnProjector::new(proj_schema, &partition_cols);

// project first batch
let projected_batch = proj
.project(
// file_batch is ok here because we kept all the file cols in the projection
Expand All @@ -390,7 +397,6 @@ mod tests {
],
)
.expect("Projection of partition columns into record batch failed");

let expected = vec![
"+---+----+----+------+-----+",
"| a | b | c | year | day |",
Expand All @@ -401,6 +407,64 @@ mod tests {
"+---+----+----+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);

// project another batch that is larger than the previous one
let file_batch = build_table_i32(
("a", &vec![5, 6, 7, 8, 9]),
("b", &vec![-10, -9, -8, -7, -6]),
("c", &vec![12, 13, 14, 15, 16]),
);
let projected_batch = proj
.project(
// file_batch is ok here because we kept all the file cols in the projection
file_batch,
&[
ScalarValue::Utf8(Some("2021".to_owned())),
ScalarValue::Utf8(Some("10".to_owned())),
ScalarValue::Utf8(Some("27".to_owned())),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = vec![
"+---+-----+----+------+-----+",
"| a | b | c | year | day |",
"+---+-----+----+------+-----+",
"| 5 | -10 | 12 | 2021 | 27 |",
"| 6 | -9 | 13 | 2021 | 27 |",
"| 7 | -8 | 14 | 2021 | 27 |",
"| 8 | -7 | 15 | 2021 | 27 |",
"| 9 | -6 | 16 | 2021 | 27 |",
"+---+-----+----+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);

// project another batch that is smaller than the previous one
let file_batch = build_table_i32(
("a", &vec![0, 1, 3]),
("b", &vec![2, 3, 4]),
("c", &vec![4, 5, 6]),
);
let projected_batch = proj
.project(
// file_batch is ok here because we kept all the file cols in the projection
file_batch,
&[
ScalarValue::Utf8(Some("2021".to_owned())),
ScalarValue::Utf8(Some("10".to_owned())),
ScalarValue::Utf8(Some("28".to_owned())),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = vec![
"+---+---+---+------+-----+",
"| a | b | c | year | day |",
"+---+---+---+------+-----+",
"| 0 | 2 | 4 | 2021 | 28 |",
"| 1 | 3 | 5 | 2021 | 28 |",
"| 3 | 4 | 6 | 2021 | 28 |",
"+---+---+---+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
}

// sets default for configs that play no role in projections
Expand Down

0 comments on commit cb0789e

Please sign in to comment.