From 1e09dc244989f8c880be1c7f4974f81a2485db92 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Sun, 14 Apr 2024 00:52:31 -0700 Subject: [PATCH] add partition filter --- datafusion/common/src/scalar/mod.rs | 12 +- .../core/src/datasource/listing/helpers.rs | 219 +++++++++++++++++- .../core/tests/expr_api/simplification.rs | 2 +- .../optimizer/tests/optimizer_integration.rs | 4 +- .../test_files/filter_without_sort_exec.slt | 4 +- .../optimizer_group_by_constant.slt | 2 +- .../sqllogictest/test_files/tpch/q1.slt.part | 6 +- .../sqllogictest/test_files/tpch/q10.slt.part | 6 +- .../sqllogictest/test_files/tpch/q12.slt.part | 6 +- 9 files changed, 239 insertions(+), 22 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index cfed69fad8dde..d2c6513eef957 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -45,8 +45,8 @@ use arrow::{ compute::kernels::cast::{cast_with_options, CastOptions}, datatypes::{ i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType, - Date32Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type, - IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, + Date32Type, Date64Type, Field, Float32Type, Int16Type, Int32Type, Int64Type, + Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION, @@ -3179,8 +3179,12 @@ impl fmt::Display for ScalarValue { ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?, ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?, ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?, - ScalarValue::Date32(e) => format_option!(f, e)?, - ScalarValue::Date64(e) => format_option!(f, e)?, + ScalarValue::Date32(e) => { + format_option!(f, e.map(|v| Date32Type::to_naive_date(v).to_string()))? + } + ScalarValue::Date64(e) => { + format_option!(f, e.map(|v| Date64Type::to_naive_date(v).to_string()))? + } ScalarValue::Time32Second(e) => format_option!(f, e)?, ScalarValue::Time32Millisecond(e) => format_option!(f, e)?, ScalarValue::Time64Microsecond(e) => format_option!(f, e)?, diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 5b87090096652..d64bf7357b637 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -17,11 +17,13 @@ //! Helper functions for the table implementation +use std::collections::HashMap; use std::sync::Arc; use super::PartitionedFile; use crate::datasource::listing::ListingTableUrl; use crate::execution::context::SessionState; +use crate::logical_expr::{BinaryExpr, Operator}; use crate::{error::Result, scalar::ScalarValue}; use arrow::{ @@ -169,9 +171,17 @@ async fn list_partitions( store: &dyn ObjectStore, table_path: &ListingTableUrl, max_depth: usize, + partition_prefix: Option, ) -> Result> { let partition = Partition { - path: table_path.prefix().clone(), + path: match partition_prefix { + Some(prefix) => Path::from_iter( + Path::from(table_path.prefix().as_ref()) + .parts() + .chain(Path::from(prefix.as_ref()).parts()), + ), + None => table_path.prefix().clone(), + }, depth: 0, files: None, }; @@ -305,6 +315,81 @@ async fn prune_partitions( Ok(filtered) } +#[derive(Debug)] +enum PartitionValue { + Single(String), + Multi, +} + +fn populate_partition_values<'a>( + partition_values: &mut HashMap<&'a str, PartitionValue>, + filter: &'a Expr, +) { + if let Expr::BinaryExpr(BinaryExpr { + ref left, + op, + ref right, + }) = filter + { + match op { + Operator::Eq => match (left.as_ref(), right.as_ref()) { + (Expr::Column(Column { ref name, .. }), Expr::Literal(val)) + | (Expr::Literal(val), Expr::Column(Column { ref name, .. })) => { + if partition_values + .insert(name, PartitionValue::Single(val.to_string())) + .is_some() + { + partition_values.insert(name, PartitionValue::Multi); + } + } + _ => {} + }, + Operator::And => { + populate_partition_values(partition_values, left); + populate_partition_values(partition_values, right); + } + _ => {} + } + } +} + +fn evaluate_partition_prefix<'a>( + partition_cols: &'a [(String, DataType)], + filters: &'a [Expr], +) -> Option { + let mut partition_values = HashMap::new(); + + if filters.len() > 1 { + return None; + } + + for filter in filters { + populate_partition_values(&mut partition_values, filter); + } + + if partition_values.is_empty() { + return None; + } + + let mut parts = vec![]; + for (p, _) in partition_cols { + match partition_values.get(p.as_str()) { + Some(PartitionValue::Single(val)) => { + parts.push(format!("{p}={val}")); + } + _ => { + break; + } + } + } + + if parts.is_empty() { + None + } else { + Some(Path::from_iter(parts)) + } +} + /// Discover the partitions on the given path and prune out files /// that belong to irrelevant partitions using `filters` expressions. /// `filters` might contain expressions that can be resolved only at the @@ -327,7 +412,10 @@ pub async fn pruned_partition_list<'a>( )); } - let partitions = list_partitions(store, table_path, partition_cols.len()).await?; + let partition_prefix = evaluate_partition_prefix(partition_cols, filters); + let partitions = + list_partitions(store, table_path, partition_cols.len(), partition_prefix) + .await?; debug!("Listed {} partitions", partitions.len()); let pruned = @@ -416,7 +504,9 @@ where mod tests { use std::ops::Not; - use crate::logical_expr::{case, col, lit}; + use futures::StreamExt; + + use crate::logical_expr::{case, col, lit, Expr}; use crate::test::object_store::make_test_store_and_state; use super::*; @@ -675,4 +765,127 @@ mod tests { // this helper function assert!(expr_applicable_for_cols(&[], &lit(true))); } + + #[test] + fn test_evaluate_partition_prefix() { + let partitions = &[ + ("a".to_string(), DataType::Utf8), + ("b".to_string(), DataType::Int16), + ("c".to_string(), DataType::Boolean), + ]; + + assert_eq!( + evaluate_partition_prefix(partitions, &[Expr::eq(col("a"), lit("foo"))],), + Some(Path::from("a=foo")), + ); + + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::and( + Expr::eq(col("a"), lit("foo")), + Expr::eq(col("b"), lit("bar")), + )], + ), + Some(Path::from("a=foo/b=bar")), + ); + + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::and( + Expr::eq(col("a"), lit("foo")), + Expr::and( + Expr::eq(col("b"), lit("1")), + Expr::eq(col("c"), lit("true")), + ), + )], + ), + Some(Path::from("a=foo/b=1/c=true")), + ); + + // no prefix when filter is empty + assert_eq!(evaluate_partition_prefix(partitions, &[]), None); + + // b=foo results in no prefix because a is not restricted + assert_eq!( + evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))],), + None, + ); + + // a=foo and c=baz only results in preifx a=foo because b is not restricted + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::and( + Expr::eq(col("a"), lit("foo")), + Expr::eq(col("c"), lit("baz")), + )], + ), + Some(Path::from("a=foo")), + ); + + // a=foo or b=bar results in no prefix + assert_eq!( + evaluate_partition_prefix( + partitions, + &[ + Expr::eq(col("a"), lit("foo")), + Expr::eq(col("b"), lit("bar")), + ], + ), + None, + ); + + // partition with multiple values results in no prefix + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::and( + Expr::eq(col("a"), lit("foo")), + Expr::eq(col("a"), lit("bar")), + )], + ), + None, + ); + + // no prefix because partition a is not restricted to a single literal + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::or( + Expr::eq(col("a"), lit("foo")), + Expr::eq(col("a"), lit("bar")), + )], + ), + None, + ); + } + + #[test] + fn test_evaluate_date_partition_prefix() { + let partitions = &[("a".to_string(), DataType::Date32)]; + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::eq( + col("a"), + Expr::Literal(ScalarValue::Date32(Some(3))) + )], + ), + Some(Path::from("a=1970-01-04")), + ); + + let partitions = &[("a".to_string(), DataType::Date64)]; + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::eq( + col("a"), + Expr::Literal(ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000))) + )], + ), + Some(Path::from("a=1970-01-05")), + ); + } } diff --git a/datafusion/core/tests/expr_api/simplification.rs b/datafusion/core/tests/expr_api/simplification.rs index bb41929834267..9d714df331c30 100644 --- a/datafusion/core/tests/expr_api/simplification.rs +++ b/datafusion/core/tests/expr_api/simplification.rs @@ -289,7 +289,7 @@ fn select_date_plus_interval() -> Result<()> { // Note that constant folder runs and folds the entire // expression down to a single constant (true) - let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408") + let expected = r#"Projection: Date32("2021-01-09") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408") TableScan: test"#; let actual = get_optimized_plan_formatted(plan, &time); diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index b2fbc26ebaf1b..ae3feafbb753e 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -187,7 +187,7 @@ fn between_date32_plus_interval() -> Result<()> { let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\ \n Projection: \ - \n Filter: test.col_date32 >= Date32(\"10303\") AND test.col_date32 <= Date32(\"10393\")\ + \n Filter: test.col_date32 >= Date32(\"1998-03-18\") AND test.col_date32 <= Date32(\"1998-06-16\")\ \n TableScan: test projection=[col_date32]"; assert_eq!(expected, format!("{plan:?}")); Ok(()) @@ -201,7 +201,7 @@ fn between_date64_plus_interval() -> Result<()> { let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\ \n Projection: \ - \n Filter: test.col_date64 >= Date64(\"890179200000\") AND test.col_date64 <= Date64(\"897955200000\")\ + \n Filter: test.col_date64 >= Date64(\"1998-03-18\") AND test.col_date64 <= Date64(\"1998-06-16\")\ \n TableScan: test projection=[col_date64]"; assert_eq!(expected, format!("{plan:?}")); Ok(()) diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt index d5d3d87b5747b..3b4deae3326f3 100644 --- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt +++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt @@ -143,11 +143,11 @@ ORDER BY "ticker", "time"; ---- logical_plan 01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST -02)--Filter: data.date = Date32("13150") +02)--Filter: data.date = Date32("2006-01-02") 03)----TableScan: data projection=[date, ticker, time] physical_plan 01)SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 -03)----FilterExec: date@0 = 13150 +03)----FilterExec: date@0 = 2006-01-02 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt index d54e82de07e43..f578b08482ac7 100644 --- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt +++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt @@ -72,7 +72,7 @@ FROM test_table t GROUP BY 1, 2 ---- logical_plan -01)Projection: Date32("19481") AS dt, Boolean(true) AS today_filter, COUNT(Int64(1)) +01)Projection: Date32("2023-05-04") AS dt, Boolean(true) AS today_filter, COUNT(Int64(1)) 02)--Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[] diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part b/datafusion/sqllogictest/test_files/tpch/q1.slt.part index f2c14f2628ee3..90d7e7d905cf0 100644 --- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part @@ -44,8 +44,8 @@ logical_plan 02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(*) AS count_order 03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM({lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}} AS lineitem.l_extendedprice * Decimal128(Some(1),20,0) - lineitem.l_discount * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(Int64(1)) AS COUNT(*)]] 04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus -05)--------Filter: lineitem.l_shipdate <= Date32("10471") -06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("10471")] +05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02") +06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")] physical_plan 01)SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] 02)--SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true] @@ -56,7 +56,7 @@ physical_plan 07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(*)] 08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as {lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)|{Decimal128(Some(1),20,0) - lineitem.l_discount|{lineitem.l_discount}|{Decimal128(Some(1),20,0)}}|{lineitem.l_extendedprice}}, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus] 09)----------------CoalesceBatchesExec: target_batch_size=8192 -10)------------------FilterExec: l_shipdate@6 <= 10471 +10)------------------FilterExec: l_shipdate@6 <= 1998-09-02 11)--------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], has_header=false query TTRRRRRRRI diff --git a/datafusion/sqllogictest/test_files/tpch/q10.slt.part b/datafusion/sqllogictest/test_files/tpch/q10.slt.part index b732610de095f..c33cdef8c0095 100644 --- a/datafusion/sqllogictest/test_files/tpch/q10.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q10.slt.part @@ -63,8 +63,8 @@ logical_plan 10)------------------Inner Join: customer.c_custkey = orders.o_custkey 11)--------------------TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment] 12)--------------------Projection: orders.o_orderkey, orders.o_custkey -13)----------------------Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766") -14)------------------------TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate], partial_filters=[orders.o_orderdate >= Date32("8674"), orders.o_orderdate < Date32("8766")] +13)----------------------Filter: orders.o_orderdate >= Date32("1993-10-01") AND orders.o_orderdate < Date32("1994-01-01") +14)------------------------TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate], partial_filters=[orders.o_orderdate >= Date32("1993-10-01"), orders.o_orderdate < Date32("1994-01-01")] 15)----------------Projection: lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount 16)------------------Filter: lineitem.l_returnflag = Utf8("R") 17)--------------------TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag], partial_filters=[lineitem.l_returnflag = Utf8("R")] @@ -96,7 +96,7 @@ physical_plan 24)--------------------------------------RepartitionExec: partitioning=Hash([o_custkey@1], 4), input_partitions=4 25)----------------------------------------ProjectionExec: expr=[o_orderkey@0 as o_orderkey, o_custkey@1 as o_custkey] 26)------------------------------------------CoalesceBatchesExec: target_batch_size=8192 -27)--------------------------------------------FilterExec: o_orderdate@2 >= 8674 AND o_orderdate@2 < 8766 +27)--------------------------------------------FilterExec: o_orderdate@2 >= 1993-10-01 AND o_orderdate@2 < 1994-01-01 28)----------------------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_orderkey, o_custkey, o_orderdate], has_header=false 29)----------------------------CoalesceBatchesExec: target_batch_size=8192 30)------------------------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), input_partitions=4 diff --git a/datafusion/sqllogictest/test_files/tpch/q12.slt.part b/datafusion/sqllogictest/test_files/tpch/q12.slt.part index 5f1b344f784d5..6d1213b3507d0 100644 --- a/datafusion/sqllogictest/test_files/tpch/q12.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q12.slt.part @@ -55,8 +55,8 @@ logical_plan 04)------Projection: lineitem.l_shipmode, orders.o_orderpriority 05)--------Inner Join: lineitem.l_orderkey = orders.o_orderkey 06)----------Projection: lineitem.l_orderkey, lineitem.l_shipmode -07)------------Filter: (lineitem.l_shipmode = Utf8("MAIL") OR lineitem.l_shipmode = Utf8("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= Date32("8766") AND lineitem.l_receiptdate < Date32("9131") -08)--------------TableScan: lineitem projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8("MAIL") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("8766"), lineitem.l_receiptdate < Date32("9131")] +07)------------Filter: (lineitem.l_shipmode = Utf8("MAIL") OR lineitem.l_shipmode = Utf8("SHIP")) AND lineitem.l_receiptdate > lineitem.l_commitdate AND lineitem.l_shipdate < lineitem.l_commitdate AND lineitem.l_receiptdate >= Date32("1994-01-01") AND lineitem.l_receiptdate < Date32("1995-01-01") +08)--------------TableScan: lineitem projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8("MAIL") OR lineitem.l_shipmode = Utf8("SHIP"), lineitem.l_receiptdate > lineitem.l_commitdate, lineitem.l_shipdate < lineitem.l_commitdate, lineitem.l_receiptdate >= Date32("1994-01-01"), lineitem.l_receiptdate < Date32("1995-01-01")] 09)----------TableScan: orders projection=[o_orderkey, o_orderpriority] physical_plan 01)SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST] @@ -72,7 +72,7 @@ physical_plan 11)--------------------RepartitionExec: partitioning=Hash([l_orderkey@0], 4), input_partitions=4 12)----------------------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_shipmode@4 as l_shipmode] 13)------------------------CoalesceBatchesExec: target_batch_size=8192 -14)--------------------------FilterExec: (l_shipmode@4 = MAIL OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131 +14)--------------------------FilterExec: (l_shipmode@4 = MAIL OR l_shipmode@4 = SHIP) AND l_receiptdate@3 > l_commitdate@2 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 1994-01-01 AND l_receiptdate@3 < 1995-01-01 15)----------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_orderkey, l_shipdate, l_commitdate, l_receiptdate, l_shipmode], has_header=false 16)------------------CoalesceBatchesExec: target_batch_size=8192 17)--------------------RepartitionExec: partitioning=Hash([o_orderkey@0], 4), input_partitions=4