collect_async is blocking #18718

Open
2 tasks done
winstxnhdw opened this issue Sep 12, 2024 · 12 comments
Labels
bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars

Comments

@winstxnhdw

winstxnhdw commented Sep 12, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

from typing import TypedDict, Awaitable, Iterable
from polars import DataFrame, LazyFrame, max_horizontal, min_horizontal


class Word(TypedDict):
    id: int
    text: str
    start_time: float
    end_time: float

class Speaker(TypedDict):
    speaker: str
    start_time: float
    end_time: float


def get_intersection(transcription: Iterable[Word], diarisation: Iterable[Speaker]) -> Awaitable[DataFrame]:
    intersection_expression = min_horizontal(
        'end_time',
        'end_time_right',
    ) - max_horizontal(
        'start_time',
        'start_time_right',
    )

    return (
        LazyFrame(transcription)
        .join(LazyFrame(diarisation), how='cross')
        .with_columns(intersection_expression.alias('intersection'))
        .collect_async()
    )

Log output

No response

Issue description

In Polars, most CPU-bound work happens in Rust, where the Python GIL is released. Ideally, collect_async should take advantage of this so that Polars can maximise CPU usage. Right now, however, collect_async blocks the main event loop and stops a single-worker server from handling any more requests until the DataFrame is created.

EDIT:
DataFrame creation is what blocks the event loop. We can work around this by running it in a separate thread.
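A minimal sketch of that workaround, assuming the slow part is a synchronous constructor. build_rows is a hypothetical stand-in for LazyFrame(transcription) and time.sleep simulates its cost; asyncio.to_thread runs it on the loop's default thread pool, and the heartbeat task records that the loop keeps running in the meantime.

```python
import asyncio
import time


def build_rows(n: int) -> list[dict[str, int]]:
    # Hypothetical stand-in for a blocking constructor such as LazyFrame(transcription).
    time.sleep(0.2)  # simulates the construction cost
    return [{"id": i} for i in range(n)]


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Appends a timestamp roughly every 10 ms, but only while the loop is free to run it.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main() -> tuple[int, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # Offload the blocking call to a worker thread; the loop keeps running heartbeat.
    rows = await asyncio.to_thread(build_rows, 50)
    stop.set()
    await beat
    return len(rows), len(ticks)


if __name__ == "__main__":
    n_rows, n_ticks = asyncio.run(main())
    print(n_rows, n_ticks)  # many ticks means the loop was not stalled
```

Applied to the issue, this would look something like lf = await asyncio.to_thread(LazyFrame, transcription) before calling lf.collect_async(). Whether it really frees the loop depends on the constructor releasing the GIL: time.sleep does, but a long native call that holds the GIL can still stall the loop thread.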

Expected behavior

collect_async should not block the main event loop. It should behave like a real async function, letting the event loop switch context and process other tasks concurrently while Polars works with the GIL released.

Installed versions

Polars:              1.7.0
Index type:          UInt32
Platform:            Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.40
Python:              3.12.5 (main, Aug  9 2024, 08:20:41) [GCC 14.2.1 20240805]

----Optional dependencies----
adbc_driver_manager  <not installed>
altair               <not installed>
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               2024.9.0
gevent               <not installed>
great_tables         <not installed>
matplotlib           3.9.2
nest_asyncio         <not installed>
numpy                1.26.4
openpyxl             <not installed>
pandas               2.2.2
pyarrow              16.1.0
pydantic             2.9.1
pyiceberg            <not installed>
sqlalchemy           2.0.34
torch                2.3.1+cu121
xlsx2csv             <not installed>
xlsxwriter           <not installed>
@winstxnhdw winstxnhdw added bug Something isn't working needs triage Awaiting prioritization by a maintainer python Related to Python Polars labels Sep 12, 2024
@Object905
Contributor

Well, I happened to implement that 😃

About "until the DataFrame is created": is the bottleneck here the creation of the LazyFrames?
LazyFrames built from Python objects are actually always created eagerly (__init__ builds a regular DataFrame by iterating over all the objects, then calls .lazy()).
collect_async then runs only the join + with_columns part asynchronously.
To support something like this, LazyFrame would have to support "scanning" Python iterables, generators, etc., which would actually be a great feature.

It does use the Polars thread pool here: collect runs in the background, and the GIL is acquired only to resolve the future.
It should work with scan_parquet, scan_csv, and the other scan_* functions, because they are executed on the Rust side during .collect rather than eagerly.

@winstxnhdw
Copy link
Author

winstxnhdw commented Sep 12, 2024

About "until the DataFrame is created": is the bottleneck here the creation of the LazyFrames?

Hmm... Is there a way I can verify this? The LazyFrame only has ~50 rows, but it blocks the event loop for more than 10 seconds, which I think is an unusually long time for creating such a trivial DataFrame.

It does use the Polars thread pool here: collect runs in the background, and the GIL is acquired only to resolve the future.

I see that you are spawning the pool in Rust. I am not sure about PyO3, but in my experience, spawning a separate background thread in a C++ extension still blocks the event loop, because Python has to keep polling the job for completion (which is blocking unless you spawn a thread from Python and wrap the future).

@Object905
Contributor

Well, that's a different story if it's only 50 rows. Could you provide data to reproduce it?

In theory it shouldn't block the event loop, because it only resolves a future by calling this callback here at the end of the execution of .collect on the Rust side, and it does not hold the GIL during the actual collect.
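The callback-based resolution described above can be sketched with the standard library. This is not Polars' code (the real implementation lives in Rust behind PyO3); it only assumes the same shape: a worker thread does the work and then schedules set_result on the loop with loop.call_soon_threadsafe, so nothing polls. expensive_work is a hypothetical placeholder for the collect.

```python
import asyncio
import threading
import time


def expensive_work() -> int:
    # Hypothetical placeholder for a collect() that runs without the GIL.
    time.sleep(0.1)
    return 42


def start_in_background(loop: asyncio.AbstractEventLoop) -> asyncio.Future[int]:
    fut: asyncio.Future[int] = loop.create_future()

    def worker() -> None:
        try:
            result = expensive_work()
        except Exception as exc:  # forward failures to whoever awaits the future
            loop.call_soon_threadsafe(fut.set_exception, exc)
        else:
            # The only interaction with the loop: schedule the resolution, no polling.
            loop.call_soon_threadsafe(fut.set_result, result)

    threading.Thread(target=worker, daemon=True).start()
    return fut


async def count_ticks(stop: asyncio.Event) -> int:
    # Counts how often the loop gets to run this task while the worker is busy.
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> tuple[int, int]:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    counter = asyncio.create_task(count_ticks(stop))
    result = await start_in_background(loop)  # suspends until the callback fires
    stop.set()
    return result, await counter
```

Usage: result, ticks = asyncio.run(main()). A nonzero tick count shows the loop kept serving other tasks while the worker thread ran.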

@winstxnhdw
Author

I will get the data for you tomorrow.

not hold GIL during actual collect

Not holding the GIL is one thing, but the work still needs to run in a separate thread so that it does not block the event loop. If resolving the future involves some kind of polling of the results, the event loop will still be blocked.

@Object905
Contributor

Internally it may poll for the results, but if it polled by blocking the whole loop, that would defeat the whole purpose of a Future.
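To illustrate the distinction, a sketch of polling that does not block the loop: a concurrent.futures.Future from a thread pool is checked between await asyncio.sleep(...) calls, so every check hands control back to the loop. Swapping asyncio.sleep for time.sleep inside poll_until_done would turn it into the blocking kind of polling discussed above. slow_job and the 5 ms interval are hypothetical choices.

```python
import asyncio
import concurrent.futures
import time


def slow_job() -> str:
    # Hypothetical background job standing in for work done outside the GIL.
    time.sleep(0.1)
    return "done"


async def poll_until_done(cf: concurrent.futures.Future[str], interval: float = 0.005) -> str:
    # Cooperative polling: each check is followed by an await, so other tasks run in between.
    while not cf.done():
        await asyncio.sleep(interval)
    return cf.result()


async def other_task(counter: list[int], stop: asyncio.Event) -> None:
    # Simulates another request being served while the job runs.
    while not stop.is_set():
        counter[0] += 1
        await asyncio.sleep(0.01)


async def main() -> tuple[str, int]:
    counter = [0]
    stop = asyncio.Event()
    side = asyncio.create_task(other_task(counter, stop))
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        result = await poll_until_done(pool.submit(slow_job))
    stop.set()
    await side
    return result, counter[0]
```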

@Object905
Contributor

Hmm... Is there a way I can verify this? (LazyFrame creation)

You can try creating the LazyFrames on separate lines and timing only that step (or even just print-debugging), because __init__ is synchronous.
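One way to do that check from inside the server, sketched with the standard library: a probe task measures how late the loop wakes it up while a step runs, so a synchronous step shows up as a large lag and a properly awaited one does not. max_loop_lag is a hypothetical helper (not a Polars or asyncio API), and time.sleep stands in for a synchronous LazyFrame(...) call.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def max_loop_lag(step: Callable[[], Awaitable[object]], interval: float = 0.005) -> float:
    """Run step() while a probe records how late each scheduled wake-up arrives."""
    loop = asyncio.get_running_loop()
    lags: list[float] = []
    stop = asyncio.Event()

    async def probe() -> None:
        while not stop.is_set():
            start = loop.time()
            await asyncio.sleep(interval)
            lags.append(loop.time() - start - interval)  # extra delay beyond the request

    task = asyncio.create_task(probe())
    await asyncio.sleep(0)  # let the probe start before the step runs
    await step()
    stop.set()
    await task
    return max(lags, default=0.0)


async def blocking_step() -> None:
    time.sleep(0.1)  # synchronous: stalls the loop, like an eager constructor


async def awaited_step() -> None:
    await asyncio.sleep(0.1)  # yields to the loop while waiting


async def main() -> tuple[float, float]:
    return await max_loop_lag(blocking_step), await max_loop_lag(awaited_step)
```

Since get_intersection returns an awaitable, something like await max_loop_lag(lambda: get_intersection(tr, di)) would charge both the synchronous LazyFrame construction and the awaited collect to the probe; timing the constructor alone with time.perf_counter() then separates the two.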

@winstxnhdw
Author

You are right: LazyFrame creation is indeed what blocks the event loop. I took your advice and tried scan_ndjson, but it still blocked the event loop.

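from typing import IO, Awaitable
from polars import DataFrame, max_horizontal, min_horizontal, scan_ndjson


def get_intersection(transcription: IO[bytes], diarisation: IO[bytes]) -> Awaitable[DataFrame]: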
    intersection_expression = min_horizontal(
        'end_time',
        'end_time_right',
    ) - max_horizontal(
        'start_time',
        'start_time_right',
    )

    return (
        scan_ndjson(transcription)
        .join(scan_ndjson(diarisation), how='cross')
        .with_columns(intersection_expression.alias('intersection'))
        .collect_async()
    )

@deanm0000
Collaborator

When does the blocking happen? The function returns an awaitable, so it seems we need more info.

Are you doing something simple like this?

df_awaitable = get_intersection(tr, di)
df = await df_awaitable

Or more like this?

asyncio.get_running_loop().run_until_complete(df_awaitable)

Or something else?

@winstxnhdw
Author

winstxnhdw commented Sep 13, 2024

I am doing the former. I expect awaitables to cooperate fully with the event loop. Using collect_async defeats its purpose if I also have to run it in a thread pool; I could just use collect then.

@deanm0000
Collaborator

You still have the issue that you're scanning an IO object, so even though the Polars binary itself would be non-blocking, it gets its data from Python, which is blocking.

Try making the inputs either eager DataFrames or files.

When the input is LazyFrame(transcription: dict), you have to wait for Polars to parse through all the dicts and copy them into Arrow memory.

When the input is scan_ndjson(transcription: IO), every time the scan reads data it needs Python, which (I think) would be blocking.

So maybe try one of these:

import polars as pl
from typing import Awaitable

def get_intersection(transcription: pl.DataFrame, diarisation: pl.DataFrame) -> Awaitable[pl.DataFrame]:
    transcription = transcription.lazy()
    diarisation = diarisation.lazy()

or

from pathlib import Path

def get_intersection(transcription: Path, diarisation: Path) -> Awaitable[pl.DataFrame]:
    transcription = pl.scan_ndjson(transcription)
    diarisation = pl.scan_ndjson(diarisation)

@winstxnhdw
Author

The point is that Polars should be non-blocking end to end. This should be possible, since reading from IO is exactly where async/await shines.

For now, collect_async is redundant in practice: running collect in a thread pool and wrapping the future has the same effect. I suggest we add this to the documentation to reduce the time wasted on this issue.
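A minimal sketch of the thread pool + wrapped future approach. blocking_collect is a hypothetical stand-in for a synchronous LazyFrame.collect(), and the pool size of 4 is arbitrary; asyncio.wrap_future turns the concurrent.futures.Future returned by submit() into something the coroutine can await without polling.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# One shared pool for blocking calls; the size is an arbitrary assumption.
POOL = ThreadPoolExecutor(max_workers=4)


def blocking_collect(n: int) -> list[int]:
    # Hypothetical stand-in for LazyFrame.collect(), which blocks its caller.
    time.sleep(0.05)
    return list(range(n))


async def collect_in_pool(n: int) -> list[int]:
    # submit() runs the call on a worker thread; wrap_future makes the result awaitable.
    return await asyncio.wrap_future(POOL.submit(blocking_collect, n))


async def main() -> list[int]:
    # The blocking calls overlap on worker threads instead of running back to back.
    results = await asyncio.gather(*(collect_in_pool(i) for i in range(4)))
    return [len(r) for r in results]
```

loop.run_in_executor(POOL, blocking_collect, n) is an equivalent spelling, and asyncio.to_thread does the same with the loop's default executor.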

@Object905
Contributor

Object905 commented Sep 21, 2024

I remembered that there is #17939. I have not tried it myself, but it looks promising.
It doesn't support async and is more about releasing the GIL for threading, but in the future it may be possible to extend this approach to asyncio with PyO3/pyo3#1632.
pyo3-asyncio seems plausible too, but its maintenance lags a bit.

Then it would be possible to have something like scan_python, which would scan Python (async) generators just as scan_csv, scan_parquet, etc. scan files.

After some searching: since the problem is the actual DataFrame creation, this seems to be a duplicate of (or related to) #4351.

3 participants