From cb0789e5f789fd80d5238d4fd5f22b682c309b01 Mon Sep 17 00:00:00 2001
From: Remi Dettai
Date: Fri, 29 Oct 2021 10:59:22 +0200
Subject: [PATCH] [fix] improvements following review
 https://github.com/apache/arrow-datafusion/pull/1141#pullrequestreview-792371776

---
 datafusion/src/datasource/listing/helpers.rs  | 160 +++++++++---------
 .../src/physical_plan/file_format/mod.rs      |  70 +++++++-
 2 files changed, 143 insertions(+), 87 deletions(-)

diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs
index b37602c7ffeb..59a4333806e0 100644
--- a/datafusion/src/datasource/listing/helpers.rs
+++ b/datafusion/src/datasource/listing/helpers.rs
@@ -37,7 +37,7 @@ use log::debug;
 use crate::{
     error::Result,
     execution::context::ExecutionContext,
-    logical_plan::{self, Expr},
+    logical_plan::{self, Expr, ExpressionVisitor, Recursion},
     physical_plan::functions::Volatility,
     scalar::ScalarValue,
 };
@@ -51,93 +51,83 @@ const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
 const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
 const FILE_MODIFIED_COLUMN_NAME: &str = "_df_part_file_modified_";
 
+/// The `ExpressionVisitor` for `expr_applicable_for_cols`. Walks the tree to
+/// validate that the given expression is applicable with only the `col_names`
+/// set of columns.
+struct ApplicabilityVisitor<'a> {
+    col_names: &'a [String],
+    is_applicable: &'a mut bool,
+}
+
+impl ApplicabilityVisitor<'_> {
+    fn visit_volatility(self, volatility: Volatility) -> Recursion<Self> {
+        match volatility {
+            Volatility::Immutable => Recursion::Continue(self),
+            // TODO: Stable functions could be `applicable`, but that would require access to the context
+            Volatility::Stable | Volatility::Volatile => {
+                *self.is_applicable = false;
+                Recursion::Stop(self)
+            }
+        }
+    }
+}
+
+impl ExpressionVisitor for ApplicabilityVisitor<'_> {
+    fn pre_visit(self, expr: &Expr) -> Result<Recursion<Self>> {
+        let rec = match expr {
+            Expr::Column(logical_plan::Column { ref name, .. }) => {
+                *self.is_applicable &= self.col_names.contains(name);
+                Recursion::Stop(self) // leaf node anyway
+            }
+            Expr::Literal(_)
+            | Expr::Alias(_, _)
+            | Expr::ScalarVariable(_)
+            | Expr::Not(_)
+            | Expr::IsNotNull(_)
+            | Expr::IsNull(_)
+            | Expr::Negative(_)
+            | Expr::Cast { .. }
+            | Expr::TryCast { .. }
+            | Expr::BinaryExpr { .. }
+            | Expr::Between { .. }
+            | Expr::InList { .. }
+            | Expr::Case { .. } => Recursion::Continue(self),
+
+            Expr::ScalarFunction { fun, .. } => self.visit_volatility(fun.volatility()),
+            Expr::ScalarUDF { fun, .. } => {
+                self.visit_volatility(fun.signature.volatility)
+            }
+
+            // TODO other expressions are not handled yet:
+            // - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
+            // - Can `Wildcard` be considered as a `Literal`?
+            // - ScalarVariable could be `applicable`, but that would require access to the context
+            Expr::AggregateUDF { .. }
+            | Expr::AggregateFunction { .. }
+            | Expr::Sort { .. }
+            | Expr::WindowFunction { .. }
+            | Expr::Wildcard => {
+                *self.is_applicable = false;
+                Recursion::Stop(self)
+            }
+        };
+        Ok(rec)
+    }
+}
+
 /// Check whether the given expression can be resolved using only the columns `col_names`.
 /// This means that if this function returns true:
 /// - the table provider can filter the table partition values with this expression
 /// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
 ///   was performed
 pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
-    match expr {
-        // leaf
-        Expr::Literal(_) => true,
-        // TODO how to handle qualified / unqualified names?
-        Expr::Column(logical_plan::Column { ref name, .. }) => col_names.contains(name),
-        // unary
-        Expr::Alias(child, _)
-        | Expr::Not(child)
-        | Expr::IsNotNull(child)
-        | Expr::IsNull(child)
-        | Expr::Negative(child)
-        | Expr::Cast { expr: child, .. }
-        | Expr::TryCast { expr: child, .. } => expr_applicable_for_cols(col_names, child),
-        // binary
-        Expr::BinaryExpr {
-            ref left,
-            ref right,
-            ..
-        } => {
-            expr_applicable_for_cols(col_names, left)
-                && expr_applicable_for_cols(col_names, right)
-        }
-        // ternary
-        Expr::Between {
-            expr: item,
-            low,
-            high,
-            ..
-        } => {
-            expr_applicable_for_cols(col_names, item)
-                && expr_applicable_for_cols(col_names, low)
-                && expr_applicable_for_cols(col_names, high)
-        }
-        // variadic
-        Expr::ScalarFunction { fun, args } => match fun.volatility() {
-            Volatility::Immutable => args
-                .iter()
-                .all(|arg| expr_applicable_for_cols(col_names, arg)),
-            // TODO: Stable functions could be `applicable`, but that would require access to the context
-            Volatility::Stable => false,
-            Volatility::Volatile => false,
-        },
-        Expr::ScalarUDF { fun, args } => match fun.signature.volatility {
-            Volatility::Immutable => args
-                .iter()
-                .all(|arg| expr_applicable_for_cols(col_names, arg)),
-            // TODO: Stable functions could be `applicable`, but that would require access to the context
-            Volatility::Stable => false,
-            Volatility::Volatile => false,
-        },
-        Expr::InList {
-            expr: item, list, ..
-        } => {
-            expr_applicable_for_cols(col_names, item)
-                && list.iter().all(|e| expr_applicable_for_cols(col_names, e))
-        }
-        Expr::Case {
-            expr,
-            when_then_expr,
-            else_expr,
-        } => {
-            let expr_constant = expr
-                .as_ref()
-                .map(|e| expr_applicable_for_cols(col_names, e))
-                .unwrap_or(true);
-            let else_constant = else_expr
-                .as_ref()
-                .map(|e| expr_applicable_for_cols(col_names, e))
-                .unwrap_or(true);
-            let when_then_constant = when_then_expr.iter().all(|(w, th)| {
-                expr_applicable_for_cols(col_names, w)
-                    && expr_applicable_for_cols(col_names, th)
-            });
-            expr_constant && else_constant && when_then_constant
-        }
-        // TODO other expressions are not handled yet:
-        // - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
-        // - Can `Wildcard` be considered as a `Literal`?
-        // - ScalarVariable could be `applicable`, but that would require access to the context
-        _ => false,
-    }
+    let mut is_applicable = true;
+    expr.accept(ApplicabilityVisitor {
+        col_names,
+        is_applicable: &mut is_applicable,
+    })
+    .unwrap();
+    is_applicable
 }
 
 /// Partition the list of files into `n` groups
@@ -191,8 +181,10 @@ pub async fn pruned_partition_list(
         .collect();
     let stream_path = table_path.to_owned();
     if applicable_filters.is_empty() {
-        // parse the partition values while listing all the files
-        // TODO we might avoid parsing the partition values if they are not used in any projection
+        // Parse the partition values while listing all the files
+        // Note: We might avoid parsing the partition values if they are not used in any projection,
+        // but the cost of parsing will likely be far dominated by the time to fetch the listing from
+        // the object store.
         let table_partition_cols_stream = table_partition_cols.to_vec();
         Ok(Box::pin(
             store
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index cd30c6f7e05b..d460e9830fe5 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -166,7 +166,12 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }
 
-/// A helper that projects partition columns into the file record batches
+/// A helper that projects partition columns into the file record batches.
+///
+/// One interesting trick is the usage of a cache for the key buffers of the partition column
+/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
+/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
+/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
 struct PartitionColumnProjector {
     /// An Arrow buffer initialized to zeros that represents the key array of all partition
     /// columns (partition columns are materialized by dictionary arrays with only one
@@ -202,7 +207,7 @@ impl PartitionColumnProjector {
         }
     }
 
-    // Transform the batch read from the fileby inserting the partitioning columns
+    // Transform the batch read from the file by inserting the partitioning columns
    // to the right positions as deduced from `projected_schema`
    // - file_batch: batch read from the file, with internal projection applied
    // - partition_values: the list of partition values, one for each partition column
@@ -379,6 +384,8 @@ mod tests {
         let (proj_schema, _) = conf.project();
         // created a projector for that projected schema
         let mut proj = PartitionColumnProjector::new(proj_schema, &partition_cols);
+
+        // project first batch
         let projected_batch = proj
             .project(
                 // file_batch is ok here because we kept all the file cols in the projection
@@ -390,7 +397,6 @@ mod tests {
                 ],
             )
             .expect("Projection of partition columns into record batch failed");
-
         let expected = vec![
             "+---+----+----+------+-----+",
             "| a | b  | c  | year | day |",
@@ -401,6 +407,64 @@ mod tests {
             "+---+----+----+------+-----+",
         ];
         crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // project another batch that is larger than the previous one
+        let file_batch = build_table_i32(
+            ("a", &vec![5, 6, 7, 8, 9]),
+            ("b", &vec![-10, -9, -8, -7, -6]),
+            ("c", &vec![12, 13, 14, 15, 16]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("27".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch failed");
+        let expected = vec![
+            "+---+-----+----+------+-----+",
+            "| a | b   | c  | year | day |",
+            "+---+-----+----+------+-----+",
+            "| 5 | -10 | 12 | 2021 | 27  |",
+            "| 6 | -9  | 13 | 2021 | 27  |",
+            "| 7 | -8  | 14 | 2021 | 27  |",
+            "| 8 | -7  | 15 | 2021 | 27  |",
+            "| 9 | -6  | 16 | 2021 | 27  |",
+            "+---+-----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // project another batch that is smaller than the previous one
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 3]),
+            ("b", &vec![2, 3, 4]),
+            ("c", &vec![4, 5, 6]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("28".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch failed");
+        let expected = vec![
+            "+---+---+---+------+-----+",
+            "| a | b | c | year | day |",
+            "+---+---+---+------+-----+",
+            "| 0 | 2 | 4 | 2021 | 28  |",
+            "| 1 | 3 | 5 | 2021 | 28  |",
+            "| 3 | 4 | 6 | 2021 | 28  |",
+            "+---+---+---+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
     // sets default for configs that play no role in projections
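Note for reviewers (a sketch, not part of the patch): the visitor-based `expr_applicable_for_cols` keeps the same contract as the recursive version it replaces. A minimal illustration of that contract, assuming the `col` and `lit` builders from `datafusion::logical_plan`, that the function is reachable from the caller's scope, and hypothetical partition columns `year`/`month` alongside a file column `price`:

    use datafusion::logical_plan::{col, lit};

    let partition_cols = vec!["year".to_string(), "month".to_string()];

    // `year > 2020` references only partition columns -> applicable, so the
    // filter could be pushed down as `TableProviderFilterPushDown::Exact`
    assert!(expr_applicable_for_cols(
        &partition_cols,
        &col("year").gt(lit(2020)),
    ));

    // `year > 2020 AND price > 100` also references the file column `price`
    // -> not resolvable from the partition columns alone
    assert!(!expr_applicable_for_cols(
        &partition_cols,
        &col("year").gt(lit(2020)).and(col("price").gt(lit(100))),
    ));

A volatile or stable function stops the walk the same way an unknown column does, since its value cannot be fixed at planning time.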
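Also for reviewers: a sketch of the "all-zero key buffer" reuse described in the new `PartitionColumnProjector` doc comment. This is a simplified stand-alone helper, hypothetical rather than the patch's exact code (the real implementation is not shown in these hunks); it assumes `u16` dictionary keys and the `arrow::buffer::Buffer` API:

    use arrow::buffer::Buffer;

    /// Hand out a zeroed key buffer for `len` u16 keys, reusing the cached
    /// allocation when it is already large enough (slicing is zero-copy) and
    /// growing it only when a batch is larger than any seen so far.
    fn zeroed_key_buffer(cache: &mut Option<Buffer>, len: usize) -> Buffer {
        let byte_len = len * std::mem::size_of::<u16>();
        match cache {
            // cached buffer is big enough: slice it without copying
            Some(buf) if buf.len() >= byte_len => buf.slice(buf.len() - byte_len),
            // first use, or a larger batch than any before: (re)allocate
            _ => {
                let buf = Buffer::from(vec![0u8; byte_len]);
                *cache = Some(buf.clone());
                buf
            }
        }
    }

Since every key is 0, each row of the resulting dictionary array points at the single partition value, which is what keeps the memory cost of the partition columns proportional to the batch size rather than to the total record count.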