Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve TableScan with filters pushdown unparsing (joins) #13132

Merged
merged 5 commits into from
Oct 29, 2024

Conversation

sgrebnov
Copy link
Member

@sgrebnov sgrebnov commented Oct 26, 2024

Which issue does this PR close?

With filter pushdown optimization enabled, TableScan filters and LogicalPlan::Filter without a dedicated Projection are incorrectly applied after the join statement, leading to incorrect results. For example

Original query (TPC-H Q13)

select
            c_custkey,
            count(o_orderkey)
        from
            customer left join orders on
                        c_custkey = o_custkey
                    and o_comment not like '%special%requests%'
        group by
            c_custkey

is unparsed as below. You can see that original filter from Join is unparsed after join as where. The difference is that original query will contain customer records where there is no matching order and the second version does not (filter is applied after join).

select
    "customer"."c_custkey",
    count("orders"."o_orderkey")
  from
    "customer"
  left join "orders" on
    ("customer"."c_custkey" = "orders"."o_custkey")
  where
    "orders"."o_comment" not like '%special%requests%'
  group by
    "customer"."c_custkey"
  order by "customer"."c_custkey" asc

What changes are included in this PR?

The PR improves unparsing by determining if the join is a simple table scan (not a subquery that will be translated to a full subquery (SELECT .. FROM ..)) so that filters must be applied directly to the join. In the case of a full subquery, filters will be applied within the subquery, and this works correctly. This fixes TPC-H Q12 (filters on the left join were missing/not applied at all) and TPC-H Q13 (the filter was applied after the join instead of during/before, as shown above).

SELECT "customer"."c_custkey", count("orders"."o_orderkey") FROM "customer" LEFT JOIN "orders" ON (("customer"."c_custkey" = "orders"."o_custkey") AND "orders"."o_comment" NOT LIKE '%special%requests%') GROUP BY "customer"."c_custkey" 


sql> explain select
            c_custkey,
            count(o_orderkey)
        from
            customer left join orders on
                        c_custkey = o_custkey
                    and o_comment not like '%special%requests%'
        group by
            c_custkey;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | BytesProcessedNode                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |   Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |  Projection: customer.c_custkey, count(orders.o_orderkey)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |   Aggregate: groupBy=[[customer.c_custkey]], aggr=[[count(orders.o_orderkey)]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |     Left Join:  Filter: customer.c_custkey = orders.o_custkey                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |       TableScan: customer                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |       TableScan: orders, full_filters=[orders.o_comment NOT LIKE Utf8("%special%requests%")]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| physical_plan | BytesProcessedExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |   SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |     VirtualExecutionPlan name=postgres compute_context=host=Tcp("localhost"),port=6432,db=tpch_sf1,user=postgres, sql=SELECT customer.c_custkey, count(orders.o_orderkey) FROM customer LEFT JOIN orders ON ((customer.c_custkey = orders.o_custkey) AND orders.o_comment NOT LIKE '%special%requests%') GROUP BY customer.c_custkey rewritten_sql=SELECT "customer"."c_custkey", count("orders"."o_orderkey") FROM "customer" LEFT JOIN "orders" ON (("customer"."c_custkey" = "orders"."o_custkey") AND "orders"."o_comment" NOT LIKE '%special%requests%') GROUP BY "customer"."c_custkey" |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Note: The alternative approach considered was always unparsing join sub-nodes by adding a wildcard projection if needed. However, it requires introducing additional alias for a subquery (and updating corresponding upper nodes to match alias added) as not all engines support adding subquery with the same name as original table, for example JOIN (SELECT * FROM "orders" WHERE "orders"."o_comment" not like '%special%requests%') as orders .. - as orders needs to be replaced with new alias for most engines

Are these changes tested?

Added unit test, tested as part of TPC-H and TPC-DS queries unparsing by https://github.com/spiceai/spiceai (running benchmarks with some filters pushdown optimizations enabled)

Are there any user-facing changes?

Fixes unparsing issues related to incorrectly generated 'JOIN' filters when running TPC-H and TPC-DS queries with filters pushdown optimization enabled

@github-actions github-actions bot added the sql SQL Planner label Oct 26, 2024
@sgrebnov sgrebnov force-pushed the sgrebnov/improve-join-upstream branch from 9070372 to e560ece Compare October 28, 2024 15:36
Copy link
Contributor

@goldmedal goldmedal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @sgrebnov. This PR is good to me but I have some findings regarding the placement of the predicate.


let sql = plan_to_sql(&join_plan_no_filter)?;

let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND ("left"."name" LIKE 'some_name' AND (age > 10))"#;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed you put the pushdown condition in the join condition instead of the WHERE. In my opinion, the SQL plan will be different if we sometimes put the condition in a different place.

I did some tests for different join type and different place (join condition or filter) in DataFusion

    let join_type = vec![
        "inner join", "left join", "right join", "full join"
    ];

    for join in join_type {
        println!("-----------------{join}-------------------");
        println!("###### predicate in filter ######");
        let sql = format!("select o_orderkey from orders {join} customer on o_custkey = c_custkey where c_name = 'Customer#000000001'");
        println!("SQL: {}", sql);
        match ctx.sql(&sql).await?.into_optimized_plan() {
            Ok(plan) => {println!("{plan}")},
            Err(e) => eprintln!("Error: {}", e),
        }
        println!("###### predicate in join condition ######");
        let sql = format!("select o_orderkey from orders {join} customer on o_custkey = c_custkey and c_name = 'Customer#000000001'");
        println!("SQL: {}", sql);
        match ctx.sql(&sql).await?.into_optimized_plan() {
            Ok(plan) => {println!("{plan}")},
            Err(e) => eprintln!("Error: {}", e),
        }
    }

The result is

-----------------inner join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders inner join customer on o_custkey = c_custkey where c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Inner Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
###### predicate in join condition ######
SQL: select o_orderkey from orders inner join customer on o_custkey = c_custkey and c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Inner Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
-----------------left join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders left join customer on o_custkey = c_custkey where c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Inner Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
###### predicate in join condition ######
SQL: select o_orderkey from orders left join customer on o_custkey = c_custkey and c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Left Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
-----------------right join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders right join customer on o_custkey = c_custkey where c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Right Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
###### predicate in join condition ######
SQL: select o_orderkey from orders right join customer on o_custkey = c_custkey and c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Right Join: orders.o_custkey = customer.c_custkey Filter: customer.c_name = Utf8("Customer#000000001")
    TableScan: orders projection=[o_orderkey, o_custkey]
    TableScan: customer projection=[c_custkey, c_name]
-----------------full join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders full join customer on o_custkey = c_custkey where c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Right Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
###### predicate in join condition ######
SQL: select o_orderkey from orders full join customer on o_custkey = c_custkey and c_name = 'Customer#000000001'
Projection: orders.o_orderkey
  Full Join: orders.o_custkey = customer.c_custkey Filter: customer.c_name = Utf8("Customer#000000001")
    TableScan: orders projection=[o_orderkey, o_custkey]
    TableScan: customer projection=[c_custkey, c_name]

We can find the plan is the same in inner join and left join. The filter pushdown works fine. However, in right join and full join cases, if we put the predicate in the join condition, the filter pushdown doesn't work.
In the DataFusion case, the filter pushdown always works when putting the filter in WHERE.

I'm not pretty sure if it's a common rule (putting the predicate in WHERE is better) for the other database. However, in the DataFusino case, we're better to put them in WHERE.

By the way, this PR is ok for me now. I think it can be improved by a follow-up PR if we care about the performance of the generated SQL.

cc @alamb @phillipleblanc

Copy link
Member Author

@sgrebnov sgrebnov Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goldmedal - thank you for deep review. I suspect that filters were not fully pushed down for the right join and full join cases by DF during optimization for samples above as two test queries are not exactly the same as how records are filtered:

`on o_custkey = c_custkey where c_name = 'Customer#000000001'  <-- filter is applied after join, fully filter out non matching records
`on o_custkey = c_custkey and c_name = 'Customer#000000001'` <-- filtering is done during join, will join/include NULL

It seems in all examples above the original WHERE was moved inside Join by optimizer (all cases), so optimized plan should be unparsed as below for right join, for example

select o_orderkey from orders right join (select c_custkey from customer where c_name = 'Customer#000000001') on o_custkey = c_custkey

and we will translate it to

select o_orderkey from orders right join customers on o_custkey = c_custkey and c_name = 'Customer#000000001'

@goldmedal - Is my understanding correct that tha main concern is that the first option is preferred as it could be executed more efficient by target engine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goldmedal - Is my understanding correct that tha main concern is that the first option is preferred as it could be executed more efficient by target engine?

Yes, if we can push down the predicate to the table scan, it usually means it will perform better.

I tried the subquery pattern:

-----------------inner join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders inner join (select c_custkey  from customer where c_name = 'Customer#000000001') on o_custkey = c_custkey
Projection: orders.o_orderkey
  Inner Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
-----------------left join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders left join (select c_custkey  from customer where c_name = 'Customer#000000001') on o_custkey = c_custkey
Projection: orders.o_orderkey
  Left Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
-----------------right join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders right join (select c_custkey  from customer where c_name = 'Customer#000000001') on o_custkey = c_custkey
Projection: orders.o_orderkey
  Right Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]
-----------------full join-------------------
###### predicate in filter ######
SQL: select o_orderkey from orders full join (select c_custkey  from customer where c_name = 'Customer#000000001') on o_custkey = c_custkey
Projection: orders.o_orderkey
  Full Join: orders.o_custkey = customer.c_custkey
    TableScan: orders projection=[o_orderkey, o_custkey]
    Projection: customer.c_custkey
      Filter: customer.c_name = Utf8("Customer#000000001")
        TableScan: customer projection=[c_custkey, c_name], partial_filters=[customer.c_name = Utf8("Customer#000000001")]

Every predicate is pushed down to the table scan. It's better 👍
I haven't checked the planner of other databases but I think they're similar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sgrebnov Do you want to improve it in this PR? or we can do it in the follow-up PR (maybe file an issue). WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goldmedal - I would prefer the incremental approach with a follow-up PR. Thank you!

@goldmedal goldmedal merged commit 0b45b9a into apache:main Oct 29, 2024
24 checks passed
@goldmedal
Copy link
Contributor

Thanks, @sgrebnov 👍 I filed #13156 to trace the improvement work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants