
Extreme memory usage when reading Parquet files written by Polars in PySpark #4393

Closed
Hoeze opened this issue Aug 13, 2022 · 19 comments · Fixed by #4406 or #5777
Labels
bug Something isn't working

Comments

@Hoeze

Hoeze commented Aug 13, 2022

What language are you using?

Python

Have you tried latest version of polars?

yes

What version of polars are you using?

'0.13.57'

What operating system are you using polars on?

Rocky Linux 8

What language version are you using

python 3.8

Describe your bug.

When reading a Parquet file written by Polars in PySpark, I observe very high peak memory usage.
Printing the first 10 rows of the file is also surprisingly slow.

What is the actual behavior?

Slow reading and high memory usage when the file was written with Polars' native Parquet writer.

What is the expected behavior?

Reading should be as fast and as memory-efficient as when the file was written with PyArrow.

What are the steps to reproduce the behavior?

I am writing a large dataframe with 19464707 rows to parquet:

(
    aggregated_df
    .collect()
    .write_parquet("test.parquet", compression="snappy", statistics=True)
)

Then I try reading it with PySpark:

test_df = spark.read.parquet('test.parquet')

test_df.limit(10).toPandas()

This gives me a peak memory usage of 20GB and takes quite a while.

Next, I tried writing it with a limited row group size:
aggregated_df.collect().write_parquet("test.parquet", compression="snappy", statistics=True, row_group_size=64*1024*1024)
This results in a panic:

thread '<unnamed>' panicked at 'attempt to divide by zero', local_dependencies/polars-core/src/utils/mod.rs:160:22

---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Input In [28], in <module>
      1 (
----> 2     aggregated_df
      3     .collect()
      4     .write_parquet(snakemake.output["veff_pq"], compression="snappy", statistics=True, row_group_size=64*1024*1024)
      5 )

File /opt/anaconda/envs/<env>/lib/python3.9/site-packages/polars/internals/frame.py:1465, in DataFrame.write_parquet(self, file, compression, compression_level, statistics, row_group_size, use_pyarrow, **kwargs)
   1457     pa.parquet.write_table(
   1458         table=tbl,
   1459         where=file,
   (...)
   1462         **kwargs,
   1463     )
   1464 else:
-> 1465     self._df.to_parquet(
   1466         file, compression, compression_level, statistics, row_group_size
   1467     )

PanicException: attempt to divide by zero

As a workaround, one can set use_pyarrow=True.
Writing a file like this works as intended:
aggregated_df.collect().write_parquet("test.parquet", compression="snappy", statistics=True, row_group_size=64*1024*1024, use_pyarrow=True)
Such a file can be read immediately by PySpark with low memory usage and a very fast .limit(10).

@ritchie46
Member

A fix is coming up in #4406. Note that the size unit is the number of rows, so I think you want something smaller than 1024 * 1024 * 64 ≈ 67M.

Regarding the large memory usage: smaller row groups should fix that, I assume. Polars defaults to a single chunk, as it can often read that faster.

@Hoeze
Author

Hoeze commented Aug 14, 2022

@ritchie46 Do you have any recommendation for this value?
I guess smaller values would be also advantageous for data skipping, right?
What is PyArrow using by default?

@Hoeze
Author

Hoeze commented Aug 14, 2022

@ritchie46 I do not think the row group size is the issue.
I just checked the stats of the files written by PyArrow:

num of row groups: 1
row groups:

row group 0:
--------------------------------------------------------------------------------
total byte size: 23380034
num of rows: 12997812

num of columns: 90

As you can see, I still have all data in a single row group.

@ritchie46 ritchie46 reopened this Aug 14, 2022
@ritchie46
Member

Can you show the output of pyarrow's parquet metadata for one written by polars and one written by pyarrow? You can set row_group_size=None.

@Hoeze
Author

Hoeze commented Aug 14, 2022

Now this gets interesting:
the PyArrow dataset is 4.4 MB, the Polars dataset 760 MB!

  • PyArrow:
    num of row groups: 1
    row groups:
    
    row group 0:
    --------------------------------------------------------------------------------
    total byte size: 31171256
    num of rows: 19464707
    
  • Polars:
    num of row groups: 1
    row groups:
    
    row group 0:
    --------------------------------------------------------------------------------
    total byte size: 8057671249
    num of rows: 19464707
    

For the record, the files were written like that:

(
    aggregated_df
    .sort(["chrom", "start", "end", "ref", "alt", "gene", "subtissue"])
    .collect()
    .write_parquet(f"{os.environ.get('TMP')}/pyarrow.parquet", compression="snappy", statistics=True, use_pyarrow=True)
)

(
    aggregated_df
    .sort(["chrom", "start", "end", "ref", "alt", "gene", "subtissue"])
    .collect()
    .write_parquet(f"{os.environ.get('TMP')}/polars.parquet", compression="snappy", statistics=True, use_pyarrow=False)
)

@Hoeze
Author

Hoeze commented Aug 14, 2022

There are quite large differences between the schemas.
E.g. many columns are written as int64 by PyArrow while polars writes them as int32:

pyarrow.parquet.schema
polars.parquet.schema

@jorgecarleitao
Collaborator

jorgecarleitao commented Aug 14, 2022

Thanks a lot for sharing these. I suspect that the root cause is that by default pyarrow=True uses Dictionary encoding, while pyarrow=False does not. Example:

Native:

column 7:
--------------------------------------------------------------------------------
column type: INT32
column path: "features.transcript_ablation.max"
encodings: PLAIN RLE
total compressed size (in bytes): 8518250
total uncompressed size (in bytes): 82725062

Pyarrow

column 7:
--------------------------------------------------------------------------------
column type: INT64
column path: "features.transcript_ablation.max"
encodings: PLAIN_DICTIONARY PLAIN RLE
total compressed size (in bytes): 429
total uncompressed size (in bytes): 417

It is likely that your data is very well suited to dictionary encoding (i.e. this column has on the order of 417/8 ≈ 50 distinct values).

@ritchie46
Member

ritchie46 commented Aug 14, 2022

@jorgecarleitao do you know if pyarrow decides this based on the data? I have been thinking about encodings today, as we keep a sorted flag in Polars and will also keep a parted flag in the future.

For categoricals we also know the number of unique values per column without doing any compute.

Any idea what good heuristics would be?

Another thought: if we write with statistics=True we have a lot more information and should be able to benefit from that as well.

@jorgecarleitao
Collaborator

pyarrow does not use a heuristic afaik - it always writes dictionary-encoded unless specified otherwise.

For example:

import io

import pyarrow as pa
import pyarrow.parquet


a = pa.array(range(0, 10**8))
t = pa.table(data=[a], schema=pa.schema([pa.field("a", a.type, False)]))

bytes = io.BytesIO()
pa.parquet.write_table(t, bytes, write_statistics=True, compression=None)
bytes.seek(0)

meta = pa.parquet.read_metadata(bytes)
print(meta.row_group(0).column(0))

prints

encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN')
 has_dictionary_page: True
 dictionary_page_offset: 4
 total_compressed_size: 537185050

i.e. it creates and writes a dictionary, even though every value is unique.

That would be awesome - if we have some information about the cardinality, we could use it. The API for writing parquet dictionaries in arrow2 is that we convert to a DictionaryArray (and pass the dictionary encoding to the encodings).

@ritchie46
Member

That would be awesome - if we have some information about the cardinality, we could use it.

So for sorted integer data: DeltaBinaryPacked?

I read that the plain dictionary encoding is deprecated. So we should go for the RLE I guess?

@jorgecarleitao
Collaborator

Yes, but DeltaBinaryPacked is not yet available in arrow2 - I am working on improving support for other encodings right now :)

Yes, RLEDictionary :)

@ritchie46
Member

I tried to add this but I cannot seem to encode an Int64 as RleDictionary. Is the fact that the pyarrow file shows 4 encodings a hint? encodings: ('PLAIN_DICTIONARY', 'PLAIN', 'RLE', 'PLAIN') 😅

@mkleinbort-ic

mkleinbort-ic commented Dec 7, 2022

Not sure what is going on here, but just as an FYI I hit this issue today:

# 2 GB
df.write_parquet(f'./data/df_base_enriched.{VERSION}.parquet', use_pyarrow=True, statistics=True, compression='lz4')

# 20 GB and the file starts behaving strange
df.write_parquet(f'./data/df_base_enriched.{VERSION}.parquet', use_pyarrow=False, statistics=True, compression='lz4')

When I say the file starts behaving strange...

  1. It can't be loaded by Dask or pandas' pd.read_parquet (results in an OSError: end of stream)
  2. I can read individual columns using pl.scan_parquet
  3. Polars can't read the full file using pl.read_parquet

I'm using polars 0.15.2 and pyarrow 8.0.0

df is some complex 1,500,000 x 200 dataframe. These are the counts of column types:

<class 'polars.datatypes.Float64'>    114
<class 'polars.datatypes.Utf8'>        20
list[f64]                              10
<class 'polars.datatypes.Boolean'>      7
<class 'polars.datatypes.Int8'>         7
<class 'polars.datatypes.Int64'>        6
datetime[ns]                            6
list[str]                               4
<class 'polars.datatypes.Date'>         3
duration[ns]                            3
<class 'polars.datatypes.Int16'>        3
<class 'polars.datatypes.Int32'>        2

@ritchie46
Member

ritchie46 commented Dec 7, 2022

Can you send the dataframe? It's hard to fix if we cannot reproduce it.

Can you remove the columns that you can read? Those are clutter.

@ritchie46 ritchie46 reopened this Dec 7, 2022
@mkleinbort-ic

It's probably the utf8 columns

Here is a minimal example.

import os
import polars as pl 

df = pl.DataFrame({'textCol': ['Hello World!', 'and', 'Happy', 'Holidays']*int(1e6)})
df.write_parquet('test-T1.parquet', use_pyarrow=True)
df.write_parquet('test-T2.parquet', use_pyarrow=False)

t1 = os.path.getsize('test-T1.parquet') / 1000
t2 = os.path.getsize('test-T2.parquet') / 1000

print(f'{t1=:,.0f}, {t2=:,.0f}')
>>> t1=3, t2=510 #kb

@ritchie46
Member

It's probably the utf8 columns


This still doesn't show the bug right? You reported end of stream?

@mkleinbort-ic

mkleinbort-ic commented Dec 7, 2022

Oh, found it: list[i64] columns 💣

import os
import polars as pl 

df = pl.DataFrame({'listIntCol': [[1,1,1], [1,2,3], [None,2,None]]*int(1e6)})
df.write_parquet('test-T1.parquet', use_pyarrow=True)
df.write_parquet('test-T2.parquet', use_pyarrow=False)

t1 = os.path.getsize('test-T1.parquet') / 1000
t2 = os.path.getsize('test-T2.parquet') / 1000

print(f'{t1=:,.0f} kb, {t2=:,.0f} kb')
>>> t1=1 kb, t2=264,645 kb <<< 💣💣💣💣

@mkleinbort-ic

pd.read_parquet('test-T2.parquet') results in OSError: Unexpected end of stream

@ritchie46
Member

Thanks. I will investigate.
