Pyarrow filter not pushed to scan_ds if datatype is a string #6395
Comments
@stinodego that would explain why partitioning must be set in scan_delta.
Could we help here somehow? I think the issue is not in Python, but in the Rust backend. But we don't really know where to start debugging.
I have to look at this more closely - I admit it fell off my radar. Will come back to this later this weekend. |
That is a different issue.
Any subsequent filters are pushed to the pyarrow dataset. The way to handle this right now is to specify filters manually in step 2. I wouldn't know a good way to automate this.

On topic: about the original issue from this post, I can confirm that string predicates are not pushed down correctly. Thanks for the clear example. I'm really not very familiar with this scanning code - @ritchie46 could you take a closer look?
I need to look at this more closely, but I think if the filters are properly pushed into scan_ds, the partition is also filtered and file skipping is activated. At least this is described here: reading partitioned data.

Regarding the scan_ds issue, I can also confirm that none of the predicates are pushed through when a string is used. In the example, both filters are not included: predicate = None
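(For illustration, a minimal sketch of the file skipping described in the pyarrow docs; the path and partition column here are hypothetical:)

```python
import pyarrow.dataset as ds

# Hypothetical hive-partitioned layout, e.g. my_table/year=2020/part-0.parquet
dataset = ds.dataset("my_table", format="parquet", partitioning="hive")

# If the predicate reaches pyarrow, fragments whose partition value
# doesn't match are skipped entirely instead of being read.
table = dataset.to_table(filter=ds.field("year") == 2020)
```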
Something to consider: delta-io/delta-rs#1128
@stinodego your above explanation is correct, but won't this affect anything that relies on scan_ds? Let me know if you need some help regarding this, because I saw this as a potential bottleneck while adding the functionality. I considered using the
I think there are two problems with the current implementation.

@chitralverma by the way, I saw that delta-rs 0.7.0 was released; that would allow the new pickling support to be used.
For lazy dfs,

Actually, I confirmed it with some of the delta guys for another use case on how this works. If I use the deltalake lib to just list the parquet files for a specified version, the delta log is looked up and the file skipping actually happens. The main question is this: does the polars parquet read use parquet statistics for optimizing IO? @ritchie46 maybe you can clarify this one.
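(A minimal sketch of that file listing, assuming the deltalake Python package; the table path and version are hypothetical:)

```python
from deltalake import DeltaTable

# Load the table at a specific version; the delta log is consulted here,
# so the returned list already reflects delta's file skipping.
dt = DeltaTable("path/to/delta_table", version=3)  # hypothetical path/version
parquet_uris = dt.file_uris()  # absolute URIs of the table's parquet files
```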
I did some tests on my side on the dataset we are struggling with:
Function Name: scan
Function Name: concat

The parquet concat method is significantly faster in my example. @chitralverma thanks a lot! We can already use this internally for our use case.
@dominikpeter try setting rechunk to False in pl.concat and see if it gets better.
rechunk=False
rechunk=True

@chitralverma it slightly improved the performance in my example.
Ok, I expected that.

What I am looking for is an argument against this approach. I don't think we are missing out on any capabilities, but I am also not 100% sure. BTW, sorry for using your time to test these things, but if you have a delta table in an object storage (any one), can you try the scan_delta vs scan_parquet test on that? My guess is that in that case the results are not going to be very different.
I don't think there is a real downside. This approach is also described here: https://delta-io.github.io/delta-rs/python/usage.html for dask dataframes. I think this could live inside Rust and benefit from the things you mentioned. I would prefer this approach.

No problem, I have to thank here :-). I will test it tomorrow and give feedback.
I was not able to make it work with an object store (azure storage account).
delta-rs gives me the correct parquet files with the correct URIs back, but polars results in an error.

We discussed it a little bit internally. For the moment, we can push down the filters to the pyarrow dataset or use the concat approach. This will work for our use case. I guess it makes sense to wait for ADBC before putting too much effort into this. One downside of the concat parquet approach will be future releases of delta-rs, when they want to support column mapping, computed columns and so on. I still believe it would make sense to fix the scan_ds predicate pushdown, though.
The file-not-found issue is because the abfss scheme is not supported by fsspec; see the supported schemes in the azure example here. But this can be changed now that the pickling issue is fixed on their side; however, then in polars we will have to pin the deltalake dependency to 0.7+.

ADBC will require a conversion of the polars plan to SQL for pushdown, which does not exist at the moment in polars.
I did some extensive tests with the application we are running. I compared the scan_delta and the parquet concat approaches. It was a really mixed result. I think for the moment I would keep it as it is. Maybe bump to 0.7 to take advantage of the new pickling. What we end up doing is probably something like this:
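(The original snippet was not captured; a minimal sketch of the concat approach discussed above, with a hypothetical table path and filter column:)

```python
import polars as pl
from deltalake import DeltaTable

# Let delta-rs resolve the parquet files behind the table,
# then scan them lazily with polars and concatenate.
dt = DeltaTable("path/to/delta_table")  # hypothetical path
lazy_frames = [pl.scan_parquet(uri) for uri in dt.file_uris()]
lf = pl.concat(lazy_frames, rechunk=False)

# Filters applied on the LazyFrame can use per-file parquet statistics.
result = lf.filter(pl.col("animal") == "Flamingo").collect()  # hypothetical column
```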
|
The problem seems to be that we don't push the predicate down to the pyarrow dataset.
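(For reference, a minimal sketch of what the pushed-down scan looks like on the pyarrow side; the dataset path is hypothetical and the values mirror the example below:)

```python
import pyarrow.dataset as ds

dataset = ds.dataset("my_dataset", format="parquet")  # hypothetical path

# When pushdown works, polars hands both the projection and the
# predicate to pyarrow instead of filtering after the read.
scanner = dataset.scanner(
    columns=["animal"],
    filter=ds.field("animal") == "Flamingo",
)
table = scanner.to_table()
```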
Polars version checks
I have checked that this issue has not already been reported.
I have confirmed this bug exists on the latest version of Polars.
Issue description
I see quite some performance differences when filtering a pyarrow dataset with the scanner method compared to polars' own filtering.
After some testing, I found a strange behavior that could explain the performance difference. Could it be that predicates will not be pushed down to the pyarrow dataset if the datatype is a string?
In the trace below, the predicate is None for the string filter, while the predicates of the other examples, which filter on an integer, are pushed down:
predicate = None
with_columns = ['animal']
predicate = (pa.dataset.field('n_legs') == 2)
with_columns = ['animal']
predicate = (pa.dataset.field('year') == 2020)
with_columns = ['animal']
Therefore, on a bigger dataset, ds.scanner(filter=ds.field("animal") == "Flamingo") is way faster than filtering through polars.
Or am I missing something?
Reproducible example
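(The original snippet was not captured; a minimal sketch that reproduces the trace above, assuming the pyarrow docs example table and polars' scan_ds:)

```python
import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds

# Small table modeled on the pyarrow docs example.
table = pa.table(
    {
        "year": [2020, 2022, 2021, 2022],
        "n_legs": [2, 2, 4, 100],
        "animal": ["Flamingo", "Parrot", "Dog", "Centipede"],
    }
)
ds.write_dataset(table, "my_dataset", format="parquet")
dataset = ds.dataset("my_dataset", format="parquet")

# String predicate: ends up as predicate = None in the scan.
pl.scan_ds(dataset).filter(pl.col("animal") == "Flamingo").select("animal").collect()

# Integer predicates: pushed down to the pyarrow scanner.
pl.scan_ds(dataset).filter(pl.col("n_legs") == 2).select("animal").collect()
pl.scan_ds(dataset).filter(pl.col("year") == 2020).select("animal").collect()
```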
Expected behavior
Also push down the string predicate:
predicate = (pa.dataset.field('animal') == 'Flamingo')
with_columns = ['animal']
predicate = (pa.dataset.field('n_legs') == 2)
with_columns = ['animal']
predicate = (pa.dataset.field('year') == 2020)
with_columns = ['animal']
Installed versions
---Version info---
Polars: 0.15.16
Index type: UInt32
Platform: macOS-13.1-arm64-arm-64bit
Python: 3.11.1 (v3.11.1:a7a450f84a, Dec 6 2022, 15:24:06) [Clang 13.0.0 (clang-1300.0.29.30)]
---Optional dependencies---
pyarrow: 10.0.1
pandas: 1.5.2
numpy: 1.24.1
fsspec: <not installed>
connectorx: <not installed>
xlsx2csv: <not installed>
deltalake: <not installed>
matplotlib: <not installed>