Skip to content

Commit

Permalink
Merge pull request #3513 from zhyass/feature_index
Browse files Browse the repository at this point in the history
ISSUE-3512: Add filters pushdown for read datasource
  • Loading branch information
BohuTANG authored Dec 17, 2021
2 parents 7cb8185 + e3dc408 commit 34dbe41
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 45 deletions.
2 changes: 1 addition & 1 deletion common/functions/src/aggregates/aggregate_avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ pub fn try_create_aggregate_avg_function(

{
Err(ErrorCode::BadDataValueType(format!(
"AggregateSumFunction does not support type '{:?}'",
"AggregateAvgFunction does not support type '{:?}'",
data_type
)))
})
Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_display_indent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ impl<'a> PlanNodeIndentFormatDisplay<'a> {
comma = true;
}

if !p.filters.is_empty() {
if comma {
write!(f, ", ")?;
}
write!(f, "filters: {:?}", p.filters)?;
comma = true;
}

if p.limit.is_some() {
if comma {
write!(f, ", ")?;
Expand Down
10 changes: 8 additions & 2 deletions query/src/sql/statements/query/query_collect_push_downs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::sql::statements::QueryASTIR;

pub struct QueryCollectPushDowns {
require_columns: HashSet<String>,
require_filters: Vec<Expression>,
}

/// Collect the query need to push downs parts .
Expand All @@ -40,12 +41,18 @@ impl QueryASTIRVisitor<QueryCollectPushDowns> for QueryCollectPushDowns {

Ok(())
}

fn visit_filter(predicate: &mut Expression, data: &mut QueryCollectPushDowns) -> Result<()> {
data.require_filters = vec![predicate.clone()];
Self::visit_recursive_expr(predicate, data)
}
}

impl QueryCollectPushDowns {
pub fn collect_extras(ir: &mut QueryASTIR, schema: &mut JoinedSchema) -> Result<()> {
let mut push_downs_data = Self {
require_columns: HashSet::new(),
require_filters: vec![],
};
QueryCollectPushDowns::visit(ir, &mut push_downs_data)?;
push_downs_data.collect_push_downs(schema)
Expand All @@ -58,8 +65,7 @@ impl QueryCollectPushDowns {

schema.set_table_push_downs(index, Extras {
projection: Some(projection),
// TODO:
filters: vec![],
filters: self.require_filters.clone(),
limit: None,
order_by: vec![],
});
Expand Down
16 changes: 8 additions & 8 deletions query/tests/it/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ async fn test_explain_interpreter() -> Result<()> {
assert_eq!(block.column(0).len(), 4);

let expected = vec![
"+-------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| explain |",
"+-------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| Projection: number:UInt64 |",
"| Having: ((number + 1) = 4) |",
"| Filter: ((number + 1) = 4) |",
"| ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]] |",
"+-------------------------------------------------------------------------------------------------------------------------------------------------------+",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| explain |",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| Projection: number:UInt64 |",
"| Having: ((number + 1) = 4) |",
"| Filter: ((number + 1) = 4) |",
"| ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number + 1) = 4)]] |",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
];
common_datablocks::assert_blocks_eq(expected, result.as_slice());
} else {
Expand Down
6 changes: 3 additions & 3 deletions query/tests/it/optimizers/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn test_literal_false_filter() -> Result<()> {
let expect = "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]";
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [((1 + 2) = 2)]]";

assert_eq!(actual, expect);
Ok(())
Expand All @@ -77,7 +77,7 @@ fn test_skip_read_data_source() -> Result<()> {
expect:"\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [((1 + 2) = 2)]]",
},
Test {
name: "Limit with zero should skip the scan",
Expand All @@ -86,7 +86,7 @@ fn test_skip_read_data_source() -> Result<()> {
Limit: 0\
\n Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [true]]",
},
Test {
name: "Having with 'having 1+1=3' should skip the scan",
Expand Down
16 changes: 8 additions & 8 deletions query/tests/it/optimizers/optimizer_constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,63 +110,63 @@ fn test_constant_folding_optimizer() -> Result<()> {
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(true AND (number > 1))]]",
},
Test {
name: "Filter cond and true",
query: "SELECT number from numbers(10) where number > 1 AND true",
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) AND true)]]",
},
Test {
name: "Filter false and cond",
query: "SELECT number from numbers(10) where false AND number > 1",
expect: "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(false AND (number > 1))]]",
},
Test {
name: "Filter cond and false",
query: "SELECT number from numbers(10) where number > 1 AND false",
expect: "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) AND false)]]",
},
Test {
name: "Filter false or cond",
query: "SELECT number from numbers(10) where false OR number > 1",
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(false OR (number > 1))]]",
},
Test {
name: "Filter cond or false",
query: "SELECT number from numbers(10) where number > 1 OR false",
expect: "\
Projection: number:UInt64\
\n Filter: (number > 1)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) OR false)]]",
},
Test {
name: "Filter true or cond",
query: "SELECT number from numbers(10) where true OR number > 1",
expect: "\
Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(true OR (number > 1))]]",
},
Test {
name: "Filter cond or true",
query: "SELECT number from numbers(10) where number > 1 OR true",
expect: "\
Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((number > 1) OR true)]]",
},
];

Expand Down
22 changes: 11 additions & 11 deletions query/tests/it/optimizers/optimizer_expression_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,79 +30,79 @@ fn test_expression_transform_optimizer() -> Result<()> {
expect: "\
Projection: number:UInt64\
\n Filter: ((number <= 1) or (number > 3))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT ((number > 1) AND (number <= 3)))]]",
},
Test {
name: "Complex expression",
query: "select number from numbers_mt(10) where not(number>=5 or number<3 and toBoolean(number))",
expect: "\
Projection: number:UInt64\
\n Filter: ((number < 5) and ((number >= 3) or (NOT toBoolean(number))))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT ((number >= 5) OR ((number < 3) AND toBoolean(number))))]]",
},
Test {
name: "Like and isNotNull expression",
query: "select * from system.databases where not (isNotNull(name) and name LIKE '%sys%')",
expect: "\
Projection: name:String\
\n Filter: (isnull(name) or (name not like %sys%))\
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [(NOT (isNotNull(name) AND (name LIKE %sys%)))]]",
},
Test {
name: "Not like and isNull expression",
query: "select * from system.databases where not (name is null or name not like 'a%')",
expect: "\
Projection: name:String\
\n Filter: (isnotnull(name) and (name like a%))\
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [1], scan schema: [name:String], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [(NOT (isnull(name) OR (name NOT LIKE a%)))]]",
},
Test {
name: "Equal expression",
query: "select number from numbers_mt(10) where not(number=1) and number<5",
expect: "\
Projection: number:UInt64\
\n Filter: ((number <> 1) and (number < 5))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((NOT (number = 1)) AND (number < 5))]]",
},
Test {
name: "Not equal expression",
query: "select number from numbers_mt(10) where not(number!=1) or number<5",
expect: "\
Projection: number:UInt64\
\n Filter: ((number = 1) or (number < 5))\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [((NOT (number <> 1)) OR (number < 5))]]",
},
Test {
name: "Not expression",
query: "select number from numbers_mt(10) where not(NOT toBoolean(number))",
expect: "\
Projection: number:UInt64\
\n Filter: toBoolean(number)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT (NOT toBoolean(number)))]]",
},
Test {
name: "Boolean transform",
query: "select number from numbers_mt(10) where number",
expect: "\
Projection: number:UInt64\
\n Filter: (number != 0)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [number]]",
},
Test {
name: "Boolean and truth transform",
query: "select number from numbers_mt(10) where not number",
expect: "\
Projection: number:UInt64\
\n Filter: (number = 0)\
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [8], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80], push_downs: [projections: [0], filters: [(NOT number)]]",
},
Test {
name: "Literal boolean transform",
query: "select number from numbers_mt(10) where false",
expect: "\
Projection: number:UInt64\
\n Filter: false\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [false]]",
},
Test {
name: "Limit zero transform",
Expand All @@ -111,7 +111,7 @@ fn test_expression_transform_optimizer() -> Result<()> {
Limit: 0\
\n Projection: number:UInt64\
\n Filter: true\
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0]]",
\n ReadDataSource: scan partitions: [0], scan schema: [number:UInt64], statistics: [read_rows: 0, read_bytes: 0], push_downs: [projections: [0], filters: [true]]",
},
];

Expand Down
Loading

0 comments on commit 34dbe41

Please sign in to comment.