DataFrame from read_parquet is slow after repartition (each new partition reads from many old partitions) #1181

Open
lhoestq opened this issue Dec 31, 2024 · 12 comments

lhoestq commented Dec 31, 2024

Describe the issue:

When I run read_parquet on several files, I get a DataFrame without divisions. The issue is: if I increase npartitions with .repartition(), the resulting DataFrame has very slow partitions.

For example, calling head() starts reading from many files instead of one. This is not the case when using dask without dask-expr.

Minimal Complete Verifiable Example:

minimal reproducible example:

import fsspec
import pyarrow as pa
import pyarrow.parquet as pq

fs = fsspec.filesystem("memory")
pq.write_table(pa.table({"i": [0]}), "0.parquet", filesystem=fs)
pq.write_table(pa.table({"i": [1]}), "1.parquet", filesystem=fs)

import dask.dataframe as dd
df = dd.read_parquet("memory://*.parquet")
df.npartitions  # 2
df.divisions  # (None, None, None)

# Now calling .head() should only read the first partition, so it should only read the first file
fs.delete("1.parquet")
df.head()  # works
df.repartition(npartitions=5).head()  # fails with dask-expr, and works without dask-expr
# FileNotFoundError

real world issue

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/HuggingFaceTB/finemath/finemath-3plus/train-*.parquet")  # 128 files
df.npartitions  # 512
df.divisions  # (None,) * 513
df = df.repartition(npartitions=2048)
df.head()  # super slow with dask-expr (downloads data from many many files), fast without dask-expr

Anything else we need to know?:

I need this to work correctly to showcase Dask to the Hugging Face community :)

There are >200k datasets on HF in Parquet format, so making partitions fast again would be a big improvement. For example, a fast .head() is a big plus for developer experience.

Environment:

  • Dask version: 2024.12.1, dask-expr 1.1.21 (also on main with 2024.12.1+8.g4fb8993c and 1.1.21+1.gedb6fd5)
  • Python version: 3.12.2
  • Operating System: MacOS
  • Install method (conda, pip, source): pip
phofl (Collaborator) commented Jan 2, 2025

Hi, thanks for your report.

Dask-Expr combines multiple parquet files into a single partition to ensure reasonably sized partitions. Small partitions are generally bad for performance in Dask.

You can disable this behavior if needed by setting

dask.config.set({"dataframe.parquet.minimum-partition-size": 0})

I would advise against that for real workloads that are more than exploratory, though; tiny partitions are pretty bad for performance and scalability.
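
For reference, a minimal sketch of how that config setting could be applied around the read, reusing the in-memory example from the issue description (whether to use a context manager or a global set is a matter of taste):

import dask
import dask.dataframe as dd

# Disable the combining of small parquet files into larger partitions,
# so each file maps back to its own partition (not recommended for real
# workloads, as noted above).
with dask.config.set({"dataframe.parquet.minimum-partition-size": 0}):
    df = dd.read_parquet("memory://*.parquet")
    print(df.npartitions)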

lhoestq (Author) commented Jan 2, 2025

Ah ok, thanks! Is there some documentation about this? I'm trying to understand this better to share it with the community.

In particular, does this mean that running df.repartition(npartitions=5).head() on small data actually runs 1 task, as if there were 1 partition instead of 5?

phofl (Collaborator) commented Jan 2, 2025

This is documented in the API docs: https://docs.dask.org/en/stable/generated/dask_expr.read_parquet.html#dask_expr.read_parquet

Assuming your parquet files are tiny, df will be a single partition, since we combine all parquet files into one.

df.repartition(npartitions=5).head() will then split that single partition back up into 5 new partitions.
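
One way to see whether that combining kicked in might be to compare the partition count before and after optimization (a sketch with a hypothetical local directory of tiny parquet files):

import dask.dataframe as dd

# Hypothetical directory containing many tiny parquet files
df = dd.read_parquet("./tiny_files/*.parquet")

# Before optimization, npartitions may reflect one partition per file;
# after optimization, small files can be combined into fewer partitions.
print(df.npartitions)
print(df.optimize().npartitions)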

lhoestq (Author) commented Jan 2, 2025

I observe something different. I don't get one single partition but one partition per parquet file:

import fsspec
import pyarrow as pa
import pyarrow.parquet as pq

fs = fsspec.filesystem("memory")
pq.write_table(pa.table({"i": [0] * 10}), "0.parquet", filesystem=fs)
pq.write_table(pa.table({"i": [1] * 10}), "1.parquet", filesystem=fs)

import dask.dataframe as dd
df = dd.read_parquet("memory://*.parquet")
df.npartitions  # 2

each partition corresponds to one parquet file:

df.partitions[0].compute().to_dict(orient="records")
# [{'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}]
df.partitions[1].compute().to_dict(orient="records")
# [{'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}, {'i': 1}]

but for some reason, if I repartition(npartitions=5), the new first partition attempts to read from both files (even though it only keeps data from the first file):

df.repartition(npartitions=5).head().to_dict(orient="records")
# [{'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}]

fs.delete("1.parquet")
df.repartition(npartitions=5).head().to_dict(orient="records")
# FileNotFoundError

And I suspect this is linked to the bad effects I'm getting when I run dask on HF datasets, where read_parquet + repartition returns a dataframe that is super slow because each partition reads from many files instead of one.

phofl (Collaborator) commented Jan 2, 2025

The partition squashing kicks in during optimization. Try

df.optimize().npartitions

That will return 1.

npartitions is generally an implementation detail of the query optimizer, and you shouldn't rely on us not changing the value. It was exposed with the legacy implementation because it was too dumb to handle things automatically, but it's generally an abstraction leak.

lhoestq (Author) commented Jan 2, 2025

hmmm

df.optimize().npartitions
# 2
df.repartition(npartitions=5).optimize().npartitions
# 5

phofl (Collaborator) commented Jan 2, 2025

Yikes, sorry about the misinformation; that only happens with the arrow filesystem. There is an issue with partition pruning not working properly with the threaded scheduler. That is expected and something we are unlikely to fix; you should always create a local cluster to get good performance anyway. Could you try instantiating a client when reading from the Hugging Face dataset, like this:

from dask.distributed import Client
import dask.dataframe as dd
import fsspec
import pyarrow as pa
import pyarrow.parquet as pq

if __name__ == "__main__":
    client = Client()  # local distributed cluster instead of the threaded scheduler
    fs = fsspec.filesystem("memory")  # recreate the in-memory files from the earlier snippets
    pq.write_table(pa.table({"i": [0]}), "0.parquet", filesystem=fs)
    pq.write_table(pa.table({"i": [1]}), "1.parquet", filesystem=fs)

    df = dd.read_parquet("memory://*.parquet")
    df.repartition(npartitions=5).head().to_dict(orient="records")
    # [{'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}, {'i': 0}]

    fs.delete("1.parquet")
    df.repartition(npartitions=5).head().to_dict(orient="records")
    # works

That will properly drop tasks that you don't need and only read from one file.
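
Presumably the same pattern carries over to the real-world case from the issue description (an untested sketch; the path and partition count are the ones quoted in the original report):

from dask.distributed import Client
import dask.dataframe as dd

if __name__ == "__main__":
    # A local distributed cluster lets the scheduler prune tasks properly,
    # so head() only reads the files it actually needs.
    client = Client()
    df = dd.read_parquet("hf://datasets/HuggingFaceTB/finemath/finemath-3plus/train-*.parquet")
    print(df.repartition(npartitions=2048).head())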

lhoestq (Author) commented Jan 2, 2025

Yesssss, that works! It solves the issue with the memory:// small parquet files and also the slowness with real HF datasets :) Thanks a lot for the help.

I'll make sure to mention this in the HF docs in the future. May I also suggest showing users a warning when they are in such a case?

phofl (Collaborator) commented Jan 2, 2025

I'll make sure to mention this in the HF docs in the future. May I also suggest showing users a warning when they are in such a case?

This is something we've discussed here and there. The threaded scheduler is generally not a good idea for actual workloads (testing is fine), but using a proper client requires the distributed package, so there is a bit of a tradeoff involved. Pushing users more aggressively towards a proper cluster is something we want to do, though.

lhoestq (Author) commented Jan 2, 2025

I would argue that the threaded scheduler is not fine for testing because of this behavior 😬

phofl (Collaborator) commented Jan 2, 2025

Talking about unit tests for correctness, not performance

lhoestq (Author) commented Jan 2, 2025

I see! Thanks for all the answers :)
