write_parquet function in polars-u64-idx does not support large data frames #3120
I think we must split these large dataframes up into different row groups. I already opened a discussion/issue for this upstream: jorgecarleitao/arrow2#932. Maybe we should also default to another compression.
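A minimal user-side sketch of that row-group idea, assuming one is willing to write several smaller files instead of a single huge one (the chunk size, file names, and reduced row count are illustrative, not part of the report):
import polars as pl

# The report uses n=2**32; a smaller n keeps the sketch runnable on ordinary hardware.
df = pl.select(pl.repeat(0, n=10_000_000).alias('col_1'))

chunk_rows = 2_000_000  # chosen so each file's pages stay far below i32::MAX bytes
for i, offset in enumerate(range(0, df.height, chunk_rows)):
    df.slice(offset, chunk_rows).write_parquet(f'part_{i}.parquet')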
@jnthnhss I want to change the default compression algorithm.
I tried out all the options for compression below. With no compression I also get an error. I should mention that it does work with
import polars as pl
df = pl.select(pl.repeat(0,n=2**32).alias('col_1'))
df.write_parquet('tmp.parquet',compression='uncompressed')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")
df.write_parquet('tmp.parquet',compression='snappy')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("underlying snap error: snappy: input buffer (size = 34359738368) is larger than allowed (size = 4294967295)")
df.write_parquet('tmp.parquet',compression='gzip')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")
df.write_parquet('tmp.parquet',compression='lzo')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("Compression Lzo is not supported")
df.write_parquet('tmp.parquet',compression='brotli')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")
df.write_parquet('tmp.parquet',compression='lz4')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")
df.write_parquet('tmp.parquet',compression='zstd')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.8/dist-packages/polars/internals/frame.py", line 1400, in write_parquet
self._df.to_parquet(file, compression, statistics)
exceptions.ArrowErrorException: ExternalFormat("A page can only contain i32::MAX uncompressed bytes. This one contains 34896609285")
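A quick back-of-the-envelope check (plain Python, not polars) on why every codec fails the same way: the raw i64 column alone is roughly 34 GB, many times the ~2.1 GB per-page cap.
I32_MAX = 2**31 - 1          # 2_147_483_647 bytes allowed per page
raw_bytes = 2**32 * 8        # 2**32 i64 values -> 34_359_738_368 bytes
print(raw_bytes)             # matches the 34359738368 in the snappy error above
print(raw_bytes // I32_MAX)  # 16: the column is ~16x the per-page limit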
This should be fixed by #3181. We now split large pages before writing.
Thank you so much. I will give it a try today.
I got the same error implementing this in the Rust API using 0.20.0 yesterday. Is the mentioned patch only available in a pre-release version? Amazing work, by the way. For completeness:
FWIW, Snappy > Lzo as a default. At least Cloudera thinks so, and they helped develop parquet.
Also, is there an option to control the page size?
No, there is not. The fix upstream now halves the page size when it is estimated to hit the limit.
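Roughly what that halving strategy looks like, sketched in Python for illustration (the real logic lives in arrow2 and is written in Rust; the function and names here are invented):
I32_MAX = 2**31 - 1

def rows_per_page(total_rows: int, estimated_total_bytes: int) -> int:
    # Keep halving the rows per page until the estimated page size fits under i32::MAX.
    rows = total_rows
    while rows > 1 and estimated_total_bytes * rows // total_rows > I32_MAX:
        rows //= 2
    return rows

# 2**32 i64 values (~34 GB) end up split into pages of 2**27 rows (~1 GiB) each.
print(rows_per_page(2**32, 2**32 * 8))  # 134217728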
Did you take the PR branch? It is not yet on master.
Apologies, that was a noob error. I have rebuilt using the PR branch this time. I used Snappy with this test on a 43GB TSV file being converted to parquet. I get a new error:
Full code is just based on your example:
Using
Thanks for giving it another try. I will see if I can try to reproduce this in a few days.
Hi @sammysheep, could you give it another try? There might still be a problem, but that would then mean you have a single value that is huge. Could that be the case?
I'll give it a shot and let you know. This file has been converted before via Spark, which I think also uses arrow, albeit a different implementation. The largest single values are about 32k bytes or so.
This time it does something different. It just says
However, I watched the process and it died after it reached the memory limit of the server, which is about 187G. The file size this time was 56G; my mistake was that I did not save the earlier version of the file. I can try to do something closer to the original test on Monday. I converted the same file yesterday on a similar server using local mode and coalescing to one partition. This is the closest to what I am doing with polars.
More on Monday with different sizes.
I installed polars-u64-idx (0.13.26), and ran the above code. No errors. (And reading the resultant parquet file showed no problems.) If there's anything I can do to test/benchmark/whatever, please let me know. Just point me to the files. I have only 0.5TB of RAM, so my machine may not be able to read the absolute largest of files.
Sorry for the delay. I chunked the data by lines equally and it ran successfully*:
The runtime stats show it is using around 150 to 160G memory for each file:
My system didn't like the parquet files
*Some oddities include that the compressed parquet is much larger than the original file (over 2x). I don't know if the behavior I am showing is expected or not, but if not, they likely fit into separate issues. Thanks for everything, and let me know if you have other stuff I can test down the road.
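For reference, a sketch of that chunk-by-lines workflow; the file names, chunk layout, and the assumption that each chunk carries its own header row are guesses rather than the exact commands used:
import glob
import polars as pl

# Assumes the large TSV was first split into equal line-count chunks
# (e.g. with the Unix split utility), each with its own header row.
for i, path in enumerate(sorted(glob.glob('chunks/part_*.tsv'))):
    df = pl.read_csv(path, sep='\t')
    df.write_parquet(f'out/part_{i}.parquet', compression='snappy')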
I will ping @jorgecarleitao in here as he has more understanding of this. What compression algorithm did you use? And do you see a difference with other ones?
I used Snappy because that is the cluster default we use. Happy to try another one. What do you recommend?
I am sorry this slipped through the cracks in my triage. @ritchie46, didn't you fix this with the split of the array into multiple pages?
I believe so. My machine is a mere 16GB, so I am not able to test this. But @cbilot's successful run gives me confidence that it is fixed now.
Hmmm, when I ran that code, I was using
So just to re-confirm the result, today, using
import polars as pl
import numpy as np
df = pl.DataFrame({
    'col1': np.random.normal(0, 1, 10_000_000_000),
}).with_row_count()
df
df.write_parquet("tmp.parquet")
df2 = pl.read_parquet('tmp.parquet')
df2
I can't compare the two files using
Hi @jorgecarleitao! Would you mind taking a look at the example run I posted above? Questions:
@ritchie46 I think it is "fixed" in terms of it running now, but if you have memory like yours (16GB), or too large a file even on a server, then when you try to convert a large file you will get a failure when the OS kills your process due to OOM.
For the alternative I am trying to replace, my hunch is that Spark is chunking the data as it is read and writing to a fixed page/block size that typically won't blow up memory (more than the JVM's usual). In Polars, am I basically slurping everything into memory at once and trying to do the conversion in a couple (2?) pages?
In terms of large parquet being splittable in distributed systems (Cloud, Hadoop), it looks like you'd want to be able to set a smallish (128MB) page/block size in parquet, no? If it is something that is exposed in pyarrow2, it would be great to have available in polars.
Use-case / scenarios:
There is. We can/should expose that in polars.
What do you mean by conversion? Polars is in-memory. So it will always read a file completely into memory (save for memory mapped ipc/feather files).
This is possible for polars datasets that don't have struct data. Polars will write the chunks of your dataframe to parquet. If the dataframe is contiguous, it will write a single chunk.
I will close this issue as it has been resolved. Discussion may continue.
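A small sketch of what that can look like from Python, where the chunk size and the use of pl.concat(..., rechunk=False) to build a non-contiguous frame are illustrative assumptions:
import polars as pl

df = pl.select(pl.repeat(0, n=10_000_000).alias('col_1'))
chunk_rows = 2_000_000

# Concatenating slices without rechunking keeps them as separate chunks,
# so the frame is no longer one contiguous block.
chunked = pl.concat(
    [df.slice(o, chunk_rows) for o in range(0, df.height, chunk_rows)],
    rechunk=False,
)
print(chunked.n_chunks())  # 5
chunked.write_parquet('tmp_chunked.parquet')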
Thanks, looking forward to this!
Conversion from CSV to parquet using Polars. I was wondering where one could set the parquet page size (the functionality you added recently). If it exists, I'd play with the setting to try to get less peak memory allocated (assuming it re-uses a buffer or de-allocates after writing a page). My earlier analysis indicated that trying to convert a 28G CSV to parquet needed 150G+ of memory, which is OOM for most machines.
So if I am understanding you, I have to chunk the data frame to possibly lower the page sizes being written? Note: earlier I meant "arrow2", not "pyarrow2".
By the bye, I really enjoyed your talk at the Data and AI Summit 2022 this year. It was definitely one of the better ones, at least to me. Thanks so much for your wonderful contributions to the community!
What language are you using?
Python
What version of polars are you using?
0.13.21
What operating system are you using polars on?
Ubuntu 20.04.1 LTS
What language version are you using?
Python 3.8.5
Describe your bug.
I'm using the 64-bit version of Polars. However, the write_parquet function does not seem to support large data frames.
What are the steps to reproduce the behavior?
What is the actual behavior?