Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-3512: Add filters pushdown for read datasource #3513

Merged
merged 5 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/functions/src/aggregates/aggregate_avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub fn try_create_aggregate_avg_function(

{
Err(ErrorCode::BadDataValueType(format!(
"AggregateSumFunction does not support type '{:?}'",
"AggregateAvgFunction does not support type '{:?}'",
data_type
)))
})
Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_display_indent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ impl<'a> PlanNodeIndentFormatDisplay<'a> {
comma = true;
}

if !p.filters.is_empty() {
if comma {
write!(f, ", ")?;
}
write!(f, "filters: {:?}", p.filters)?;
comma = true;
}

if p.limit.is_some() {
if comma {
write!(f, ", ")?;
Expand Down
10 changes: 8 additions & 2 deletions query/src/sql/statements/query/query_collect_push_downs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::sql::statements::QueryASTIR;

/// Accumulator that gathers what a query asks to have pushed down to its
/// table sources while the query IR is being visited.
pub struct QueryCollectPushDowns {
// Names of the columns collected during the visit.
// NOTE(review): presumably used to build the pushed-down projection; the
// code that fills and consumes this set is not visible here — confirm.
require_columns: HashSet<String>,
// Filter expressions to push down. Replaced wholesale by `visit_filter`
// and cloned into `Extras::filters` when push-downs are set on the schema.
require_filters: Vec<Expression>,
}

/// Collects the parts of the query that need to be pushed down.
Expand All @@ -40,12 +41,18 @@ impl QueryASTIRVisitor<QueryCollectPushDowns> for QueryCollectPushDowns {

Ok(())
}

/// Records the query's filter predicate as the sole push-down filter, then
/// continues the visit into the predicate's sub-expressions.
///
/// Any filters previously stored in `data.require_filters` are discarded.
/// Returns whatever `visit_recursive_expr` returns for the predicate.
fn visit_filter(predicate: &mut Expression, data: &mut QueryCollectPushDowns) -> Result<()> {
    // Snapshot the predicate before the recursive walk, which receives it
    // mutably.
    let pushed_filter = predicate.clone();
    data.require_filters = vec![pushed_filter];

    Self::visit_recursive_expr(predicate, data)
}
}

impl QueryCollectPushDowns {
pub fn collect_extras(ir: &mut QueryASTIR, schema: &mut JoinedSchema) -> Result<()> {
let mut push_downs_data = Self {
require_columns: HashSet::new(),
require_filters: vec![],
};
QueryCollectPushDowns::visit(ir, &mut push_downs_data)?;
push_downs_data.collect_push_downs(schema)
Expand All @@ -58,8 +65,7 @@ impl QueryCollectPushDowns {

schema.set_table_push_downs(index, Extras {
projection: Some(projection),
// TODO:
filters: vec![],
filters: self.require_filters.clone(),
limit: None,
order_by: vec![],
});
Expand Down
16 changes: 8 additions & 8 deletions query/tests/it/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ async fn test_explain_interpreter() -> Result<()> {
assert_eq!(block.column(0).len(), 4);

let expected = vec![
"+-------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| explain |",
"+-------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| Projection: number:UInt64 |",
"| Having: ((number + 1) = 4) |",
"| Filter: ((number + 1) = 4) |",
"| ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]] |",
"+-------------------------------------------------------------------------------------------------------------------------------------------------------+",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| explain |",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| Projection: number:UInt64 |",
"| Having: ((number + 1) = 4) |",
"| Filter: ((number + 1) = 4) |",
"| ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number + 1) = 4)]] |",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
];
common_datablocks::assert_blocks_eq(expected, result.as_slice());
} else {
Expand Down
6 changes: 3 additions & 3 deletions query/tests/it/optimizers/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn test_literal_false_filter() -> Result<()> {
let expect = "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]";
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [((1 + 2) = 2)]]";

assert_eq!(actual, expect);
Ok(())
Expand All @@ -77,7 +77,7 @@ fn test_skip_read_data_source() -> Result<()> {
expect:"\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [((1 + 2) = 2)]]",
},
Test {
name: "Limit with zero should skip the scan",
Expand All @@ -86,7 +86,7 @@ fn test_skip_read_data_source() -> Result<()> {
Limit: 0\
\n Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [true]]",
},
Test {
name: "Having with 'having 1+1=3' should skip the scan",
Expand Down
16 changes: 8 additions & 8 deletions query/tests/it/optimizers/optimizer_constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,63 +110,63 @@ fn test_constant_folding_optimizer() -> Result<()> {
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(true AND (number > 1))]]",
},
Test {
name: "Filter cond and true",
query: "SELECT number from numbers(10) where number > 1 AND true",
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) AND true)]]",
},
Test {
name: "Filter false and cond",
query: "SELECT number from numbers(10) where false AND number > 1",
expect: "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(false AND (number > 1))]]",
},
Test {
name: "Filter cond and false",
query: "SELECT number from numbers(10) where number > 1 AND false",
expect: "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) AND false)]]",
},
Test {
name: "Filter false or cond",
query: "SELECT number from numbers(10) where false OR number > 1",
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(false OR (number > 1))]]",
},
Test {
name: "Filter cond or false",
query: "SELECT number from numbers(10) where number > 1 OR false",
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) OR false)]]",
},
Test {
name: "Filter true or cond",
query: "SELECT number from numbers(10) where true OR number > 1",
expect: "\
Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(true OR (number > 1))]]",
},
Test {
name: "Filter cond or true",
query: "SELECT number from numbers(10) where number > 1 OR true",
expect: "\
Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) OR true)]]",
},
];

Expand Down
22 changes: 11 additions & 11 deletions query/tests/it/optimizers/optimizer_expression_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,79 +30,79 @@ fn test_expression_transform_optimizer() -> Result<()> {
expect: "\
Projection: number:UInt64\
\n Filter: ((number <= 1) or (number > 3))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT ((number > 1) AND (number <= 3)))]]",
},
Test {
name: "Complex expression",
query: "select number from numbers_mt(10) where not(number>=5 or number<3 and toBoolean(number))",
expect: "\
Projection: number:UInt64\
\n Filter: ((number < 5) and ((number >= 3) or (NOT toBoolean(number))))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT ((number >= 5) OR ((number < 3) AND toBoolean(number))))]]",
},
Test {
name: "Like and isNotNull expression",
query: "select * from system.databases where not (isNotNull(name) and name LIKE '%sys%')",
expect: "\
Projection: name:String\
\n Filter: (isnull(name) or (name not like %sys%))\
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [(NOT (isNotNull(name) AND (name LIKE %sys%)))]]",
},
Test {
name: "Not like and isNull expression",
query: "select * from system.databases where not (name is null or name not like 'a%')",
expect: "\
Projection: name:String\
\n Filter: (isnotnull(name) and (name like a%))\
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [(NOT (isnull(name) OR (name NOT LIKE a%)))]]",
},
Test {
name: "Equal expression",
query: "select number from numbers_mt(10) where not(number=1) and number<5",
expect: "\
Projection: number:UInt64\
\n Filter: ((number <> 1) and (number < 5))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((NOT (number = 1)) AND (number < 5))]]",
},
Test {
name: "Not equal expression",
query: "select number from numbers_mt(10) where not(number!=1) or number<5",
expect: "\
Projection: number:UInt64\
\n Filter: ((number = 1) or (number < 5))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((NOT (number <> 1)) OR (number < 5))]]",
},
Test {
name: "Not expression",
query: "select number from numbers_mt(10) where not(NOT toBoolean(number))",
expect: "\
Projection: number:UInt64\
\n Filter: toBoolean(number)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT (NOT toBoolean(number)))]]",
},
Test {
name: "Boolean transform",
query: "select number from numbers_mt(10) where number",
expect: "\
Projection: number:UInt64\
\n Filter: (number != 0)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [number]]",
},
Test {
name: "Boolean and truth transform",
query: "select number from numbers_mt(10) where not number",
expect: "\
Projection: number:UInt64\
\n Filter: (number = 0)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT number)]]",
},
Test {
name: "Literal boolean transform",
query: "select number from numbers_mt(10) where false",
expect: "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [false]]",
},
Test {
name: "Limit zero transform",
Expand All @@ -111,7 +111,7 @@ fn test_expression_transform_optimizer() -> Result<()> {
Limit: 0\
\n Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [true]]",
},
];

Expand Down
Loading