-
Notifications
You must be signed in to change notification settings - Fork 421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement pruning on partition columns #1179
Conversation
Looks like the windows build is flaking on the main branch. Not sure of the cause. If possible would be nice to rerun just that step. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for writing this @Blajda.
Would you be willing to add some more tests? I think it's worth making sure this is very robust :)
rust/src/delta_datafusion.rs
Outdated
@@ -510,6 +529,59 @@ impl ExecutionPlan for DeltaScan { | |||
} | |||
} | |||
|
|||
fn get_null_of_arrow_type(t: &ArrowDataType) -> ScalarValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably goes beyond the scope of this PR, but it would be nice to consolidate these implementations with the one here:
delta-rs/rust/src/table_state_arrow.rs
Lines 547 to 550 in 7e8abaa
fn json_value_to_array_general<'a>( | |
datatype: &arrow::datatypes::DataType, | |
values: impl Iterator<Item = &'a serde_json::Value>, | |
) -> Result<ArrayRef, DeltaTableError> { |
In some places we seem to be going from serde_json::Value
to ScalarValue
to ArrayRef
, which seems a little inefficient.
let metrics = get_scan_metrics(&table, &state, &[e]).await?; | ||
assert!(metrics.num_scanned_files() == 1); | ||
|
||
// Check pruning for null partitions. Since there are no record count statistics pruning cannot be done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems odd. Why can't it eliminate the null partition here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Datafusion, null indicates that statistics for a particular column do not exist. Also null counts are only used for the functions is_null
and is_not_null
. Further work is required to accommodate the edge case where all records for a columns are actually null. I imagine this can be accomplished by checking the null count statistic with a record count statistic.
I'd take a further look and raise an issue on their end.
rust/tests/datafusion_test.rs
Outdated
|
||
let e = col("c3").eq(lit(0)); | ||
let metrics = get_scan_metrics(&table, &state, &[e]).await?; | ||
assert!(metrics.num_scanned_files() == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we also test a conjunction and disjunction?
@@ -228,8 +228,31 @@ async fn test_datafusion_stats() -> Result<()> { | |||
Ok(()) | |||
} | |||
|
|||
async fn get_scan_metrics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not strictly required in this PR, but we have a very basic implementation for a metrics collector, that I plan to promote to a first class delta-rs member soon. The aim would be to harmonise our approach to metrics collection across operations. Just in case you also want to use that here :)
delta-rs/rust/tests/datafusion_test.rs
Lines 39 to 59 in 8dcf46e
pub struct ExecutionMetricsCollector { | |
scanned_files: HashSet<Label>, | |
} | |
impl ExecutionMetricsCollector { | |
fn num_scanned_files(&self) -> usize { | |
self.scanned_files.len() | |
} | |
} | |
impl ExecutionPlanVisitor for ExecutionMetricsCollector { | |
type Error = DataFusionError; | |
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> { | |
if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() { | |
let files = get_scanned_files(exec); | |
self.scanned_files.extend(files); | |
} | |
Ok(true) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'm already using this implementation with a minor change if there are zero records. I found it really useful!
b019a06
to
6bd7a54
Compare
@wjones127 There seems to be incomplete stats for I think Currently the following types cannot be used as partition column
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this great work, especially the tests. Happy to see that some of the bigger holes in our datafusion story are being fixed 😀.
rust/src/delta_datafusion.rs
Outdated
| ArrowDataType::Interval(_) | ||
| ArrowDataType::RunEndEncoded(_, _) | ||
| ArrowDataType::Map(_, _) => { | ||
panic!("{}", format!("Implement data type for Delta Lake {}", t)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually try to be a panic-free crate, would it be possible to surface this error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unfortunately mostly used at an interface boundary with Datafusion that doesn't return a result. I removed the panic and restored the original behavior of returning a null scalar when it cannot be handled.
If there are no additional fixes required I would like to wait for #1180 to be merged first so I can check if the partition tests are resolved. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rust/tests/datafusion_test.rs
Outdated
// (Column name, value from file 1, value from file 2, value from file 3, non existant value) | ||
let tests = [ | ||
("utf8", lit("1"), lit("5"), lit("8"), lit("3")), | ||
( | ||
"int64", | ||
lit(1 as i64), | ||
lit(5 as i64), | ||
lit(8 as i64), | ||
lit(3 as i64), | ||
), | ||
( | ||
"int32", | ||
lit(1 as i32), | ||
lit(5 as i32), | ||
lit(8 as i32), | ||
lit(3 as i32), | ||
), | ||
( | ||
"int16", | ||
lit(1 as i16), | ||
lit(5 as i16), | ||
lit(8 as i16), | ||
lit(3 as i16), | ||
), | ||
( | ||
"int8", | ||
lit(1 as i8), | ||
lit(5 as i8), | ||
lit(8 as i8), | ||
lit(3 as i8), | ||
), | ||
( | ||
"float64", | ||
lit(1 as f64), | ||
lit(5 as f64), | ||
lit(8 as f64), | ||
lit(3 as f64), | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps something like, so we aren't referencing everything by position:
// (Column name, value from file 1, value from file 2, value from file 3, non existant value) | |
let tests = [ | |
("utf8", lit("1"), lit("5"), lit("8"), lit("3")), | |
( | |
"int64", | |
lit(1 as i64), | |
lit(5 as i64), | |
lit(8 as i64), | |
lit(3 as i64), | |
), | |
( | |
"int32", | |
lit(1 as i32), | |
lit(5 as i32), | |
lit(8 as i32), | |
lit(3 as i32), | |
), | |
( | |
"int16", | |
lit(1 as i16), | |
lit(5 as i16), | |
lit(8 as i16), | |
lit(3 as i16), | |
), | |
( | |
"int8", | |
lit(1 as i8), | |
lit(5 as i8), | |
lit(8 as i8), | |
lit(3 as i8), | |
), | |
( | |
"float64", | |
lit(1 as f64), | |
lit(5 as f64), | |
lit(8 as f64), | |
lit(3 as f64), | |
), | |
struct TestCase { | |
column: &'static str, | |
file1_value: Expr, | |
file2_value: Expr, | |
file3_value: Expr, | |
non_existent_value: Expr, | |
} | |
impl TestCase { | |
fn new(column: &str, expression_builder: Fn(i64) -> Expr) | |
{ | |
TestCase { | |
column, | |
file1_value: expression_builder(1), | |
file2_value: expression_builder(5), | |
file3_value: expression_builder(8), | |
non_existent_value: expression_builder(3), | |
} | |
} | |
} | |
let tests = [ | |
TestCase::new("utf8", |value| lit(value.to_string())), | |
TestCase::new("int64", |value| lit(value)), | |
TestCase::new("int32", |value| lit(value as i32)), | |
TestCase::new("int16", |value| lit(value as i16)), | |
TestCase::new("int8", |value| lit(value as i8)), | |
TestCase::new("float64", |value| lit(value as f64)), | |
TestCase::new("float32", |value| lit(value as f32)), | |
// TODO: I think decimal statistics are being written to the log incorrectly. The underlying i128 is written | |
// not the proper string representation as specified by the percision and scale | |
TestCase::new("float32", |value| lit(Decimal128(Some(100 * value), 10, 2))), | |
TestCase::new("timestamp", |value| lit(ScalarValue::TimestampMicrosecond(Some(value * 1_000_000), None))), | |
// TODO: The writer does not write complete statistiics for date columns | |
TestCase::new("date", |value| lit(ScalarValue::Date32(Some(value)))), | |
// TODO: The writer does not write complete statistics for binary columns | |
TestCase::new("binary", |value| lit(to_binary(value.to_string()))), | |
]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to replace the lines below too. The GH UI doesn't make it easy to select all of them :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wjones127 I've update the cases based on your outline. I agree it makes it a lot easier to read. Thanks for the suggestion :)
06426d4
to
2758345
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks excellent. Thanks for helping identify all those cases we need to fix. 😄
# Description Exposes partition columns in Datafusion's `PruningStatistics` which will reduce the number of files scanned when the table is queried. This also resolves another partition issues where involving `null` partitions. Previously `ScalarValue::Null` was used which would cause an error when the actual datatype was obtained from the physical parquet files. # Related Issue(s) - closes delta-io#1175
Description
Exposes partition columns in Datafusion's
PruningStatistics
which will reduce the number of files scanned when the table is queried.This also resolves another partition issues where involving
null
partitions. PreviouslyScalarValue::Null
was used which would cause an error when the actual datatype was obtained from the physical parquet files.Related Issue(s)