
feat(rust): introduce async at the top level #6830

Closed

Conversation

@winding-lines (Contributor) commented Feb 13, 2023

@ritchie46 I have been looking for ways to make progress on the object_store integration. This PR is more of a high-level proof of concept; I kept it simple to make it easier to see the proposed change.

I think the fundamental problem is that the Tokio runtime needs to be initialized much closer to the root of the tree of Executors, at least in order to enable async. This will allow all async tasks to be part of one runtime and data to flow in the Execution tree as it is being fetched.

I propose in this PR a possible way forward:

  1. instead of starting threads, use a Tokio runtime; internally it will also start threads. It seems that the main entry point from Python is the Union executor, so this PR starts there.
  2. use the Tokio primitives
  3. define an execute_async function on the Executor trait; we can provide a default implementation of the async method on top of the regular execute method (see the sketch after this list)
  4. Parquet and other executors can override the default implementation and use async code on the async path. This change is not part of this PR; if you like the overall approach I can merge this into my earlier PR #6426, or chain the two PRs together.
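A minimal, self-contained sketch of point 3, assuming an async-trait-style default method; the stand-in types below are illustrative only, not the actual Polars definitions:

```rust
use async_trait::async_trait; // crate: async-trait

pub struct ExecutionState; // stand-in for the internal execution state
pub struct DataFrame;      // stand-in for polars_core's DataFrame
pub type PolarsResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait]
pub trait Executor: Send {
    /// The existing synchronous entry point.
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame>;

    /// Proposed addition: defaults to the blocking path, so only
    /// IO-bound executors (e.g. parquet scans) need to override it.
    async fn execute_async(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        self.execute(state)
    }
}
```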

What do you think about this approach? With something like this we can make more progress integrating cloud storage in Polars.

@github-actions bot added the labels enhancement (New feature or an improvement of an existing feature) and rust (Related to Rust Polars) on Feb 13, 2023
@ritchie46 (Member)

I really don't see this working well. All our executors work on complete DataFrames, not on batches, and the executors themselves parallelize via rayon. This adds a lot of complexity/compiler bloat to the execution flow without much added benefit.

Maybe we could add it in our push-based engine. But I have to think about this for a while.

@winding-lines (Contributor, Author)

I agree that Rayon is a good fit for Polars and provides a lot of performance benefits. I am not suggesting replacing Rayon or changing the way it is used today. I am suggesting replacing the top-level thread pools, like the one in Union. The unit of parallelization would not change; it would still be the DataFrame. What async/Tokio adds is the ability to chain IO and CPU work throughout the tree of executors.

To be more precise, in my use case most of the files are on cloud storage, so first the data needs to be downloaded. As it is downloaded from the network, Rayon can kick in to decode it, project, filter and so on. I see the interplay between IO and CPU as a lot more dynamic: as the downloads proceed and some data arrives early, it makes sense to process that data before waiting for additional downloads, since the CPU may otherwise be idle.

Maybe my use case represents a minority and the added complexity doesn't belong in Polars. For successful, complex projects it is ok to say no to certain features.

Let me know what you think :)

@ritchie46 (Member)

Right... I thought you were aiming to make all the executor APIs async.

I agree that on the Union it does make much more sense, as this is the typical place where we concatenate many files from IO.

Is there a possibility to let tokio use the rayon thread pool?

@winding-lines (Contributor, Author)

Let me play with this and evaluate the performance impact. We should be able to measure it easily by comparing runs using local paths against nearly identical runs using the file: scheme. What tests should I run?

@ritchie46 (Member)

> Let me play with this and evaluate the performance impact. We should be able to measure it easily by comparing runs using local paths against nearly identical runs using the file: scheme. What tests should I run?

Something with many files and much compute? Maybe groupby / agg over many parquet files with the same schema?

@jgmartin (Contributor) commented Feb 17, 2023

I am working with data on the order of many gigabytes of parquet hosted on S3. Part of our process involves sinking partitions to arrow files on disk. Having to spawn_blocking() around every LazyFrame::scan_parquet() does seem to slow things down when we're spawning thousands of threads here. I'd think that if this allowed for an async scan_parquet() that didn't require spawning, that would be a benefit (at least for us, in this crazy case of thousands of files).
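A sketch of the workaround described above; a rough shape only, and the exact polars signatures may differ by version:

```rust
use polars::prelude::*;

// Wrap the blocking scan + collect so it doesn't stall tokio's workers;
// each call still costs a dedicated blocking thread, which is the
// overhead being discussed.
async fn scan_one(path: String) -> PolarsResult<DataFrame> {
    tokio::task::spawn_blocking(move || {
        LazyFrame::scan_parquet(path, ScanArgsParquet::default())?.collect()
    })
    .await
    .expect("blocking task panicked")
}
```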

If there's anything I could do to test with this large dataset (up to a terabyte+ of compressed parquet, partitioned into ~1 GB partitions), please let me know.

@winding-lines (Contributor, Author)

@jgmartin happy to collaborate on this 👍

I am now looking at the proper way to integrate Tokio and Rayon; this looks promising: https://github.com/andybarron/tokio-rayon/blob/main/src/async_thread_pool.rs

Just to set expectations: in my tests I have 80 parquet files on GCP and fetching their metadata is slow, about 0.5 seconds per parquet file. My current thinking is that for thousands of files you will need to centralize the stats for the files, so that you can fetch all the stats in a small number of operations. See the related comment here: #6426 (comment)

@winding-lines (Contributor, Author)

Hm, I have been testing on the orders table from TPC-H. With the multi-threaded union, the mean duration over 10 runs is 1.810 sec. Switching to Tokio async increases it to 2.007 sec. I think we need something a bit more sophisticated here.

In related news, I have been integrating this into my other PR. The Tokio-Rayon integration is documented in the Tokio docs: basically, Tokio wants to run async code on a small number of threads, and the developer must run CPU-heavy code outside of Tokio. Rayon is one of the recommended approaches, and a oneshot channel is the recommended integration.
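A minimal sketch of that recommended pattern, assuming the rayon and tokio crates; decode_row_group is a hypothetical stand-in for the CPU-bound work:

```rust
use tokio::sync::oneshot;

// Stand-in for CPU-heavy work such as parquet decoding.
fn decode_row_group(bytes: Vec<u8>) -> usize {
    bytes.len()
}

// Run the heavy work on the rayon pool and await the result through a
// oneshot channel, keeping tokio's worker threads free for IO.
async fn decode_on_rayon(bytes: Vec<u8>) -> usize {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        let result = decode_row_group(bytes);
        // The receiver may have been dropped if the caller gave up.
        let _ = tx.send(result);
    });
    rx.await.expect("rayon task panicked or sender was dropped")
}
```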

Will push some more on this, but no easy wins yet. Here is the benchmark script I used:

```python
from typing import Final
import polars as pl
from time import monotonic


FILES: Final = "/data/datasets/tpc-h-v3.0.1/tripl-ai-tpch/parquet/orders/*.parquet"


def test_group():
    """Group by a column."""
    df = pl.scan_parquet(FILES)
    result = df.groupby("o_custkey").agg("o_totalprice").collect()
    assert result.shape == (99996, 2)
    result = df.groupby("o_custkey").agg(pl.col("o_totalprice").sum()).collect()
    assert result.shape == (99996, 2)
    current = [int(v.to_list()[0]) for k, v in result.sum().to_dict().items()]
    expected = [7499749087, 226829132781]
    assert current == expected, f"current {current} != {expected} expected"
    result = df.groupby("o_clerk").agg(pl.col("o_shippriority").mean()).collect()
    assert result.shape == (1000, 2)


if __name__ == "__main__":
    times = []
    attempts = 1
    print(f"Running {attempts} times...")
    for _ in range(attempts):
        start = monotonic()
        test_group()
        times.append(monotonic() - start)
    print(f"Mean time: {sum(times) / len(times)}, Max time: {max(times)}")
```

@winding-lines (Contributor, Author) commented Feb 27, 2023

One more weekend invested in learning the ecosystem and the internals of Polars. Polars is a complex, highly efficient multi-threaded application, and the architecture change has eluded me so far.

My current thinking has now changed. Up to this point my attempt has been to:

  1. introduce async at the top level
  2. this ensures that we have a Tokio runtime when needed
  3. just hook in the async functions and block on them in the top-level runtime (see the sketch below)
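A minimal sketch of this shape (an assumed structure, not the PR's actual code): one runtime at the root of the executor tree, with synchronous call sites bridging into async via block_on:

```rust
// Stand-in for a Union-style executor fanning out per-file async work.
fn run_union(inputs: Vec<String>) -> Vec<usize> {
    // The runtime lives at the root of the executor tree.
    let rt = tokio::runtime::Runtime::new().expect("failed to start tokio runtime");
    rt.block_on(async {
        let handles: Vec<_> = inputs
            .into_iter()
            .map(|path| tokio::spawn(async move { path.len() }))
            .collect();
        let mut out = Vec::new();
        for handle in handles {
            out.push(handle.await.expect("task panicked"));
        }
        out
    })
}
```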

The problems with this approach are:

  1. changing the top level to async instead of threads loses about 20% performance
  2. a lot of the complex data structures downstream cannot easily be moved in and out of block_on
  3. it is not really clear how to orchestrate the pipelining for the parquet files:
    a. we want to fetch the data in parallel with the rayon decoding
    b. we need to implement a policy on how to do the download

My thinking has now shifted to the following approach:

  1. leave the Polars code as-is when it comes to the multi-threading approach
  2. separate the async code into a persistent/separate Tokio runtime
  3. communicate between the two runtimes with messages; for parquet we need just two types of messages (sketched below):
    a. file_info
    b. a Vec of DataFrames
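A hypothetical sketch of that message passing; every name and field here is illustrative, not from the PR:

```rust
use polars::prelude::*;
use tokio::sync::mpsc;

// The two message kinds named above; the fields are assumptions.
enum IoMessage {
    FileInfo { url: String, num_rows: usize },
    Batches(Vec<DataFrame>),
}

// A persistent runtime owned by a background thread, so Polars' own
// thread pool never has to enter tokio directly.
fn spawn_io_runtime() -> mpsc::Receiver<IoMessage> {
    let (tx, rx) = mpsc::channel(16);
    std::thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().expect("failed to start tokio runtime");
        rt.block_on(async move {
            // The async download flow would push IoMessage values here.
            let _ = tx
                .send(IoMessage::FileInfo { url: String::new(), num_rows: 0 })
                .await;
        });
    });
    rx
}
```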

In this separate runtime we use the following approach:

  1. a HashMap associates a parquet URL with the ParquetAsyncReader and the CloudMapper
  2. when scan_parquet() is called in the LazyFileReader we initiate an async flow that downloads:
    a. the metadata
    b. row groups, opportunistically
    c. saving them to disk to reduce memory pressure
    d. with a tunable limit on the number of pre-fetches
  3. when get_batches() is called from the pipeline executor we access the pre-fetched data (see the sketch below)
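An illustrative-only sketch of step 3, assuming a global registry that the IO runtime fills as downloads complete; these are not the actual Polars internals:

```rust
use polars::prelude::*;
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

// URL -> row groups the IO runtime has already fetched (possibly re-read
// from its on-disk spill).
fn registry() -> &'static Mutex<HashMap<String, Vec<DataFrame>>> {
    static REGISTRY: OnceLock<Mutex<HashMap<String, Vec<DataFrame>>>> = OnceLock::new();
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

// Called from the pipeline executor: take whatever has been pre-fetched
// for this file, or None if the download is still in flight.
fn get_batches(url: &str) -> Option<Vec<DataFrame>> {
    registry().lock().unwrap().remove(url)
}
```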

FYI @ritchie46, still pushing on this but still no easy wins. I am hopeful that I will be able to push the approach I just described over the finish line.

@stinodego (Contributor)

I'm closing this pull request due to inactivity. Feel free to rebase and reopen and continue your work!

@stinodego closed this Aug 10, 2023