ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables #9064
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master    #9064      +/-   ##
==========================================
- Coverage   82.57%   81.66%   -0.92%
==========================================
  Files         204      215      +11
  Lines       50327    52093    +1766
==========================================
+ Hits        41560    42540     +980
- Misses       8767     9553     +786
Continue to review full report at Codecov.
I think this is looking great - thank you @yordan-pavlov
I like the high level approach and algorithm and the implementation looks good.
👍 thank you
@@ -209,6 +251,479 @@ impl ParquetExec {
    }
}

#[derive(Debug, Clone)]
/// Predicate builder used for generating of predicate functions, used to filter row group metadata
pub struct PredicateExpressionBuilder {
nit: probably this doesn't need to be a `pub` struct given that it seems to be tied to the parquet scan implementation
I don't feel strongly about this either way, but at the moment it has to be public because it is used as a parameter in pub ParquetExec::new
I don't feel strongly either way either -- no need to change it
thinking some more about this, it could be done by moving the creation of `PredicateExpressionBuilder` from `ParquetExec::try_from_files` into `(ExecutionPlan for ParquetExec)::execute`, but then this work would be repeated for each partition, whereas currently it's only done once; at this point I don't think it's worth it.
Operator::Eq => {
    let min_column_name = expr_builder.add_min_column();
    let max_column_name = expr_builder.add_max_column();
    // column = literal => column = (min, max) => min <= literal && literal <= max
these comments are quite helpful. Thank you
this particular comment should be `// column = literal => (min, max) = literal => min <= literal && literal <= max` :), but yes, it does require some thinking so I thought it would be good to add these comments to help with the process
}

/// Translate logical filter expression into parquet statistics physical filter expression
fn build_predicate_expression(
I suggest copying the (nicely written) summary of your algorithm from this PR's description somewhere into this file.
It is probably good to mention the assumptions of this predicate expression -- which I think are that it will return `true` if a row group may contain rows that match the predicate, and will return `false` if and only if all rows in the row group cannot match the predicate.
The idea of creating arrays of `(col1_min, col1_max, col2_min, col2_max ...)` is clever (and could likely be applied to sources other than parquet files).
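To make that `true`/`false` assumption concrete, here is a minimal, self-contained sketch of the pruning idea (not the code in this PR; the column `c`, the hand-written per-row-group min/max values, and the use of the arrow compute kernels `lt_eq_scalar` / `gt_eq_scalar` / `and` are illustrative assumptions):

```rust
use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::kernels::boolean::and;
use arrow::compute::kernels::comparison::{gt_eq_scalar, lt_eq_scalar};

fn main() -> arrow::error::Result<()> {
    // One entry per row group, taken from parquet statistics metadata.
    let c_min = Int32Array::from(vec![0, 10, 5]);
    let c_max = Int32Array::from(vec![9, 20, 6]);

    // Query filter `c = 7` rewritten as `c_min <= 7 AND 7 <= c_max`.
    let keep: BooleanArray = and(&lt_eq_scalar(&c_min, 7)?, &gt_eq_scalar(&c_max, 7)?)?;

    // keep == [true, false, false]: only the first row group may contain c = 7,
    // the other two can be skipped entirely.
    println!("{:?}", keep);
    Ok(())
}
```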
// (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
expr_builder
    .scalar_expr()
    .gt_eq(rewrite_column_expr(
stylistically, you might be able to hoist out the repeated calls to

    rewrite_column_expr(
        expr_builder.column_expr(),
        expr_builder.column_name(),
        max_column_name.as_str())

and

    rewrite_column_expr(
        expr_builder.column_expr(),
        expr_builder.column_name(),
        min_column_name.as_str())

by evaluating them once before the `match` expression:

    let min_col_expr = rewrite_column_expr(
        expr_builder.column_expr(),
        expr_builder.column_name(),
        min_column_name.as_str());
    let max_col_expr = rewrite_column_expr(
        expr_builder.column_expr(),
        expr_builder.column_name(),
        max_column_name.as_str());

But the way you have it works well too
@alamb I have just pushed a change which removes this repetition and makes that part of the code cleaner; not long to go now - some more tests to add for the execution of the row group predicate in the next couple of days, and this work should be ready to merge
Awesome -- thanks @yordan-pavlov -- I am excited for this one. When it is ready I'll re-review the code and get it merged asap.
Thanks again for introducing this feature. 🎉
    Max,
}

fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef {
I wonder if you could use `NullArray` here instead: https://github.com/apache/arrow/blob//rust/arrow/src/array/null.rs
that's what I thought at first, and then realized that `NullArray` returns a data type of `DataType::Null`, which doesn't work when the statistics record batch is created, as it checks that types from the schema fields and from the arrays are the same; that's why I wrote the `build_null_array` function
makes sense
@alamb I have now changed the code to use `NullArray` but have had to add a new `new_with_type` constructor function (for the reason explained in my previous comment)
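As a small illustration of the mismatch being described (the field name is hypothetical; the example only relies on `RecordBatch::try_new` validating that each array's data type matches its schema field):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c1_min", DataType::Int32, true)]));
    // A plain NullArray reports DataType::Null, not Int32.
    let nulls: ArrayRef = Arc::new(NullArray::new(3));
    // The schema/type validation rejects it, which is why a typed all-null
    // array (via the new new_with_type constructor) is needed here.
    assert!(RecordBatch::try_new(schema, vec![nulls]).is_err());
}
```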
@@ -137,6 +137,22 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
            metadata,
        })
    }

    pub fn filter_row_groups(
this is a fancy way to filter out the row groups -- it is probably worth adding documentation here.
I don't know if there are assumptions in the parquet reader code that the row group metadata matches what was read from the file or not.
I suggest you consider filtering the row groups at the DataFusion level (i.e. skip them in the DataFusion physical operator) rather than at the parquet reader level, and avoid that potential problem completely.
Yeah I think we can either move this to the application layer (i.e., DataFusion), or expose it as a util function from footer.rs.
Good point about documentation - will add some shortly.
As long as the row group metadata is filtered immediately after creating a SerializedFileReader, this approach will work.
That's the simplest way I could think of to allow filtering of row groups using statistics metadata; I am not sure how this could be done within DataFusion itself, because it reads data in batches (of configurable size) which could potentially span multiple row groups. It could be done, but it would probably move a lot of complexity into DataFusion which today is nicely abstracted away in the parquet library. It would also expose a lot more of the internals of the parquet file format, as the user would have to be aware of row groups rather than just requesting batches of data.
Maybe I misunderstand what you are suggesting?
there is another possibility - I have just noticed `FilePageIterator::with_row_groups`, which could be used to filter row groups based on a list of row group indexes; this could replace the `filter_row_groups` method but would require the row group indexes to be passed down all the way to `build_for_primitive_type_inner`, where `FilePageIterator` is created; this could be done through a new field in `ArrayReaderBuilderContext`.
It's a deeper change but would mean that the `filter_row_groups` method is no longer necessary. @sunchao do you think this would be a better way to go about filtering of row groups? I am not sure the complexity is worth it.
What I was thinking is that we can have another constructor for `SerializedFileReader` which takes custom metadata:

    pub fn new_with_metadata(chunk_reader: R, metadata: ParquetMetaData) -> Result<Self> {
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata,
        })
    }

and we move the metadata filtering part to DataFusion, or a util function in footer.rs.

In the long term though, I think we should do something similar to what parquet-mr is doing, that is, having a `ParquetReadOptions`-like struct which allows the user to specify various configs, properties, filters, etc. when reading a parquet file. The struct is extensible as well, to accommodate new features in the future such as filtering with column indexes or bloom filters, so we don't need to have multiple constructors. The constructor can become like this:

    pub fn new(chunk_reader: R, options: ParquetReadOptions) -> Result<Self> {
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            options,
        })
    }
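As a rough sketch of that direction (every name below is hypothetical - no such struct exists in the Rust parquet crate yet), the options struct could start small and grow without adding constructors:

```rust
/// Hypothetical, extensible read options, modeled on parquet-mr's ParquetReadOptions;
/// none of this exists in the Rust parquet crate today.
#[derive(Default)]
pub struct ParquetReadOptions {
    /// Row group indexes to read; `None` means read every row group.
    pub row_groups: Option<Vec<usize>>,
    // Future knobs could be added here without breaking the constructor:
    // column-index filters, bloom filters, etc.
}

impl ParquetReadOptions {
    /// Builder-style setter used by callers that have already pruned row groups.
    pub fn with_row_groups(mut self, row_groups: Vec<usize>) -> Self {
        self.row_groups = Some(row_groups);
        self
    }
}
```

DataFusion could then prune row groups against the statistics predicate and pass the surviving indexes via `with_row_groups`, with the reader skipping everything else internally.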
the second option, with the `ParquetReadOptions` parameter, sounds better (compared to the `new_with_metadata` method) - more extensible, as you have described; however, I think this falls outside the scope of this PR.
One issue I can think of, though, is that the code needs to read the statistics metadata from the parquet file in order to create the statistics record batch, execute the predicate expression on it, and then use the results to filter the parquet row groups; this could still work if the parquet metadata can be read before the `SerializedFileReader` is created using the proposed constructor.
> however I think this falls outside of the scope of this PR

I agree -- this is already a large enough PR (and important enough). If we need to add some non-ideal API to parquet and then upgrade it later, I think that is the better approach.
Yeah I didn't mean we should tackle it here - which is why I said "in the long term" :-)
        None
    } else {
        Some(
            filters
Could you add a comment explaining the logic here? It isn't immediately obvious to me.
Immediately after posting that comment I see how it works now, but I think a comment would still be helpful
I took a quick look at this, and while I am not the resident parquet expert (👀 @nevi-me), I was able to follow the idea and understand what is happening. I think it is a great design (using `min, max`) and implementation so far.
I left some comments on the implementation of building the Arrow arrays, but other than that, really good work here so far! 💯
    Box::new(move |_, i| predicate_values[i])
}
// predicate result is not a BooleanArray
_ => Box::new(|_r, _i| true),
I would error or `panic!` here or before that, or validate that the predicate is a boolean array.
My thinking in designing this has been that pushing the predicate down to parquet is optional, because even if it fails the query will still compute, just more slowly; because of that, the code tries to avoid panicking and instead returns a predicate which returns true - it doesn't filter any row groups and lets them be processed by downstream operators.
It is even possible to have a partial predicate expression, where multiple conditions are joined using a logical AND and some of them can't be translated to physical expressions for some reason; those will be replaced by `true`, but the rest will still be evaluated and could still filter some row groups.
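A condensed sketch of that fall-back behaviour (the `RowGroupPredicate` alias, the helper name and the null handling here are illustrative rather than the exact code in this PR):

```rust
use arrow::array::{Array, ArrayRef, BooleanArray};
use parquet::file::metadata::RowGroupMetaData;

type RowGroupPredicate = Box<dyn Fn(&RowGroupMetaData, usize) -> bool>;

fn predicate_from_result(result: ArrayRef) -> RowGroupPredicate {
    match result.as_any().downcast_ref::<BooleanArray>() {
        Some(values) => {
            // Null results (e.g. missing statistics) are treated as "may match",
            // so the corresponding row group is kept.
            let keep: Vec<bool> = (0..values.len())
                .map(|i| values.is_null(i) || values.value(i))
                .collect();
            Box::new(move |_metadata, i| keep[i])
        }
        // Predicate did not evaluate to a BooleanArray: keep every row group
        // instead of failing the query.
        None => Box::new(|_metadata, _i| true),
    }
}
```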
    })
});

if arrow_type == DataType::Utf8 {
I would use a match, as this is a bit brittle against matching specific datatypes.
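For example (hypothetical helper name), a match keeps the handled variants explicit:

```rust
use arrow::datatypes::DataType;

fn uses_string_statistics(arrow_type: &DataType) -> bool {
    match arrow_type {
        // Only Utf8 gets the special byte-array handling discussed here.
        DataType::Utf8 => true,
        _ => false,
    }
}
```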
this may not be very idiomatic Rust, but it allows the code to handle this single special case separately
    make_array(array_data)
}

fn build_statistics_array(
I would have split this into N functions, one per array type (via generics), and written `build_statistics_array` simply as `match data_type { each implementation }`.
This would follow the convention in other places and reduce the risk of mistakes, particularly in matching data types.
let mut builder = ArrayData::builder(arrow_type)
    .len(statistics_count)
    .add_buffer(data_buffer.into());
if null_count > 0 {
    builder = builder.null_bit_buffer(bitmap_builder.finish());
}
let array_data = builder.build();
let statistics_array = make_array(array_data);
if statistics_array.data_type() == data_type {
    return statistics_array;
}
This is only valid for primitive types. In general, I would recommend using `PrimitiveArray<T>::from_iter`, `BooleanArray::from_iter` and `StringArray::from_iter`. Using `MutableBuffer` at this high level is prone to errors. E.g. if we add a filter for boolean types (e.g. eq and neq), this does not panic but the array is not valid (as the size is measured in bits, not bytes).
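A small sketch of that shape (the `StatisticsValues` enum and function name are hypothetical, only there to keep the example self-contained): one typed arm per Arrow type, with `collect`/`from_iter` taking care of the null bitmap and buffer sizing:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};

/// Hypothetical container for the min or max values of one column across row groups.
enum StatisticsValues {
    Int32(Vec<Option<i32>>),
    Int64(Vec<Option<i64>>),
    Utf8(Vec<Option<String>>),
}

fn statistics_to_array(values: StatisticsValues) -> ArrayRef {
    match values {
        // Missing statistics become nulls in the resulting array.
        StatisticsValues::Int32(v) => Arc::new(v.into_iter().collect::<Int32Array>()),
        StatisticsValues::Int64(v) => Arc::new(v.into_iter().collect::<Int64Array>()),
        StatisticsValues::Utf8(v) => Arc::new(v.into_iter().collect::<StringArray>()),
    }
}
```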
Thank you for your feedback. I was looking for a (mostly) generic approach to building statistics arrays and this is the simplest implementation I could come up with. Using `MutableBuffer` may be prone to errors, but I have added a test to confirm it's working. Your questions make me wonder if this could be done with generics, though.
From what I have seen, parquet statistics are only stored as Int32, Int64, Float, Double or ByteArray (used for strings and other complex types); maybe someone with more experience with parquet can advise on how statistics work for boolean columns.
@jorgecarleitao actually, what I wrote in my previous comment is incorrect - boolean is a valid statistics type, although in most cases I suspect it may not provide very helpful statistics (because it only has two values - true and false); anyway, I will look into a better implementation for the `build_statistics_array` method and support for more types, but probably in a separate PR as this one is already fairly large.
/// Scan the file(s), using the provided projection, and return one BatchIterator per
/// partition.
fn scan(
    &self,
    projection: &Option<Vec<usize>>,
    batch_size: usize,
-   _filters: &[Expr],
+   filters: &[Expr],
Ideally, I feel we should have a proper filter API defined in DataFusion which can be shared among the various data sources. On the other hand, the actual filtering logic should be implemented by the different data sources / formats, probably by converting DataFusion's filter API to the corresponding ones from the latter.
But this is a very good start and we can probably do these as follow-ups (if we don't care much about API changes).
I agree that starting with this PR and then extending it to something more generic is a good approach.
…orks, plus one more test
I went over the changes in this PR since I last reviewed it and I think it is looking good. Thank you so much for all the work @yordan-pavlov
I recommend we merge this PR soon and handle the remaining improvements (e.g. adding the algorithm description into comments and creating parquet reader options) as follow-up PRs -- this one is already large and has a lot of commentary.
    .downcast_ref::<StringArray>()
    .unwrap();
let string_vec = string_array.into_iter().collect::<Vec<_>>();
// here the first max value is None and not the Some("10") value which was actually set
👍
@yordan-pavlov do you think this PR is ready to merge?
@alamb yes I think this is ready to merge and, as you said, already large enough
Sounds good. I'll plan to merge it once master is opened for 4.0 commits (eta tomorrow). Thanks again
I apologize for the delay in merging Rust PRs -- the 3.0 release is being finalized now and we are planning to minimize entropy by postponing merging changes not critical for the release until the process is complete. I hope the process is complete in the next few days. There is more discussion on the mailing list.
Thanks again @yordan-pavlov -- I am totally stoked for this feature
I think this is one of the big features of 4.0 already! Thanks @yordan-pavlov, great work
ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables

While profiling a DataFusion query I found that the code spends a lot of time in reading data from parquet files. Predicate / filter push-down is a commonly used performance optimization, where statistics data stored in parquet files (such as min / max values for columns in a parquet row group) is evaluated against query filters to determine which row groups could contain data requested by a query. In this way, by pushing down query filters all the way to the parquet data source, entire row groups or even parquet files can be skipped, often resulting in significant performance improvements.

I have been working on an implementation for a few weeks and initial results look promising - with predicate push-down, DataFusion is now faster than Apache Spark (`140ms for DataFusion vs 200ms for Spark`) for the same query against the same parquet files. Without predicate push-down into parquet, DataFusion takes about 2 - 3s (depending on concurrency) for the same query, because the data is ordered and most files don't contain data that satisfies the query filters, but are still loaded and processed in vain.

This work is based on the following key ideas:

* predicate push-down is implemented by filtering row group metadata entries to only those which could contain data that could satisfy query filters
* it's best to reuse the existing code for evaluating physical expressions already implemented in DataFusion
* filter expressions pushed down to a parquet table are rewritten to use parquet statistics (instead of the actual column data), for example `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2)` - this is done once for all files in a parquet table
* for each parquet file, a RecordBatch containing all required statistics columns ([`column_min`, `column_max`] in the example above) is produced, and the predicate expression from the previous step is evaluated, producing a binary array which is finally used to filter the row groups in each parquet file

This is still work in progress - more tests left to write; I am publishing this now to gather feedback.

@andygrove let me know what you think

Closes #9064 from yordan-pavlov/parquet_predicate_push_down

Authored-by: Yordan Pavlov <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>