feat(rust,python): Enable object store in scan_parquet python #6426

Conversation

winding-lines
Contributor

This PR enables object_store downloads in Python when using scan_parquet. This is opt-in: "object_store": True needs to be passed in the storage_options parameter.

For example:

import polars
from time import time


aws_url = "s3://your-bucket/polars/datasets/foods1.parquet"
start = time()
df = polars.scan_parquet(
    aws_url,
    storage_options={
        "object_store": True,
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)
print(df.head().collect())  # collect() is needed to actually run the scan
print(f"Time: {time() - start}")

@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature python Related to Python Polars rust Related to Rust Polars labels Jan 25, 2023
@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch 3 times, most recently from 8e63d30 to d16c4db Compare January 26, 2023 17:23
@ritchie46
Member

Thanks a lot for the PR @winding-lines. I hope to get to this one tomorrow. 💪

@winding-lines
Contributor Author

Sorry @ritchie46, can you wait until the weekend/next week?

The code is working, but the speedup is not significant: 6 seconds faster on the 42-second test that I am using.

It may well be that the speedup will still not be significant when this is done, and that the only benefit is that we can make the planner more efficient, but I want to push a bit more.

@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch from d16c4db to 58cafde Compare January 27, 2023 13:40
@winding-lines winding-lines changed the title feat(rust,python): Enable feature store in scan_parquet python feat(rust,python): Enable object store in scan_parquet python Jan 27, 2023
@winding-lines
Contributor Author

@ritchie46 here is an update on this PR:

  1. this PR works as described in the Python snippet above. I think it is worth reviewing now; based on your feedback we can then merge it.
  2. the current Python implementation of scan_parquet("s3://../*.parquet") uses fsspec.open(). In this case fsspec just opens the first file, so the subsequent planning phase in polars is also fast. Fast is good, but for most users opening just one file is unexpected behavior.
  3. the proposed Rust implementation in this PR actually lists all the files and then fetches the file_info in the planning phase.
    a. In my first implementation the async code was more self-contained, but this led to serializing the metadata downloads.
    b. In the current implementation I bring together the listing and downloading phases, which cuts about 6 seconds from 42 seconds on a test with 80 parquet files.
  4. This PR enables use cases with low tens of parquet files. It is not usable with hundreds of files or more, which I think we should enable in subsequent PRs.

This Dask thread dask/community#234 (comment) has some good pointers on what we could do next. The general idea is to load the metadata from a centralized system, and for a general library like polars it may make sense to support multiple mechanisms. The first one could be the Hive _common_metadata described in the link.

There is a secondary long-term design question: assuming that one year from now polars supports Hive, Iceberg, Delta Table, and Hudi, where should all of those implementations go? My personal preference is to have them supported natively, but I know that adding dependencies to Polars is also not desirable.

@talawahtech

Great work @winding-lines! Some unsolicited feedback on the function signature: what do you think of making it a little more generic? E.g.

df = polars.scan_parquet(
    aws_url,
    storage_options={
        "storage_handler": "object_store",
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)

or

df = polars.scan_parquet(
    aws_url,
    storage_handler="object_store",
    storage_options={
        "region": "us-west-2",
        "aws_access_key_id": "xxxx",
        "aws_secret_access_key": "yyyy",
    },
)

If the storage_handler (or maybe storage_backend) param is missing, it defaults to fsspec.
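
A minimal sketch of what that dispatch could look like; _resolve_scan_backend and its behavior are hypothetical illustrations, not the actual polars API:

def _resolve_scan_backend(storage_handler=None):
    # Hypothetical helper; polars does not expose this function.
    if storage_handler in (None, "fsspec"):
        return "fsspec"  # current default path
    if storage_handler == "object_store":
        return "object_store"  # native Rust path proposed in this PR
    raise ValueError(f"unknown storage_handler: {storage_handler!r}")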

@kylebarron
Contributor

kylebarron commented Jan 28, 2023

The first one could be the Hive _common_metadata described in the link.

It's not a part of the parquet spec, but systems that write directories of Parquet files often produce both _common_metadata and _metadata files. From the pyarrow docs:

Those files include information about the schema of the full dataset (for _common_metadata) and potentially all row group metadata of all files in the partitioned dataset as well (for _metadata). The actual files are metadata-only Parquet files. Note this is not a Parquet standard, but a convention set in practice by those frameworks.

This is especially helpful because after reading the _metadata file you can do predicate pushdown without any additional fetches, since you have the column statistics for every row group, and you also know the exact byte ranges you want to fetch. That same page in the pyarrow docs explains how to read and write these files.

On the Rust side, arrow2/parquet2 also have support for these _metadata files, see jorgecarleitao/parquet2#146 and jorgecarleitao/arrow2#1063.
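
To make this concrete, a small pyarrow sketch of reading a dataset-level _metadata file; the path is a placeholder and statistics may be absent for some columns:

import pyarrow.parquet as pq

# One small metadata-only file describing the whole partitioned dataset.
meta = pq.read_metadata("dataset/_metadata")
for i in range(meta.num_row_groups):
    col = meta.row_group(i).column(0)
    # Column statistics (when written) allow predicate pushdown up front...
    if col.statistics is not None:
        print(col.file_path, col.statistics.min, col.statistics.max)
    # ...and the chunk offsets/sizes are the exact byte ranges to fetch.
    print(col.data_page_offset, col.total_compressed_size)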

Comment on lines +103 to +129
pub fn scan_parquet_async(
    uri: String,
    n_rows: Option<usize>,
    cache: bool,
    parallel: polars_io::parquet::ParallelStrategy,
    row_count: Option<RowCount>,
    rechunk: bool,
    low_memory: bool,
    cloud_options: Option<CloudOptions>,
) -> BoxFuture<'static, PolarsResult<Self>> {
Contributor


I'm guessing that this would also likely be much faster once jorgecarleitao/parquet2#159 is implemented. Parquet stores its metadata in the footer of the file, and the last bytes of the file describe how many bytes the metadata takes up. Right now arrow2/parquet2 require a HEAD request first to get the content length of the parquet file before they try to load the metadata. But this would be faster by:

  1. Fetching the last N bytes of a Parquet file using a range request, where N is something like 64 KiB, which often includes the entire metadata.
  2. Checking the footer (the 4 bytes just before the trailing PAR1 magic) to get the length of the metadata.
  3. If N wasn't enough bytes, doing one more fetch to get all the metadata.
  4. Then parsing the metadata.

Maybe I'll find some time to try and make a PR for that in parquet2.
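
A minimal Python sketch of that strategy, using plain HTTP range requests just to illustrate the byte math; parquet2 would of course do this internally in Rust:

import struct
import requests

def fetch_parquet_metadata(url, tail_guess=64 * 1024):
    # 1. Fetch the last N bytes with a single suffix range request.
    tail = requests.get(url, headers={"Range": f"bytes=-{tail_guess}"}).content
    # 2. A parquet file ends with [metadata][u32 LE metadata length]["PAR1"].
    assert tail[-4:] == b"PAR1", "not a parquet file"
    meta_len = struct.unpack("<I", tail[-8:-4])[0]
    # 3. If the first guess was too small, one more fetch gets everything.
    if meta_len + 8 > len(tail):
        headers = {"Range": f"bytes=-{meta_len + 8}"}
        tail = requests.get(url, headers=headers).content
    # 4. Return the thrift-encoded footer metadata, ready for parsing.
    return tail[-(meta_len + 8):-8]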

Member


@kylebarron I am working on a native polars parquet reader. Hope we can include this.

@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch from 2df2697 to c0cb61a Compare January 29, 2023 04:18
@winding-lines
Contributor Author

Current status:

  • the scan_parquet code is DONE
  • when collect-ing from Python, the physical_plan scan also needs to download the files; also DONE
  • MISSING: a Tokio runtime needs to be started on the collect path; working on it

@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch 2 times, most recently from dbbf410 to 5af7516 Compare January 29, 2023 13:22
@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch from 5af7516 to b46b979 Compare January 29, 2023 13:27
@winding-lines
Contributor Author

@ritchie46 this is ready for review. I am happy to change the API as per @talawahtech's great feedback, if you agree with it. I am not yet familiar with the Python API you desire for Polars :)

])
.with_predicate_pushdown(true)
Member


All those optimizations are on by default. We don't have to specify them.

Contributor Author


Removed, thanks for letting me know.

polars/polars-io/src/lib.rs (resolved)
polars/polars-io/src/parquet/read_impl.rs (outdated, resolved)
py-polars/Cargo.toml (outdated, resolved)
// List all the files in the bucket.
let files = async_glob(url, cloud_options.as_ref()).await?;
if files.len() > 10 {
    println!(
Member


We should not print to stdout.

Contributor Author


Makes sense. The intent is to give some feedback during what is likely a slow operation.

I changed this to stderr, but if that is not acceptable I am happy to remove it.

.unwrap_or_else(|| read::read_metadata(&mut reader))?;
let reader = ReaderBytes::from(&reader);
let bytes = reader.deref();
let column_store = mmap::ColumnStore::Local(bytes);
Member


I don't think ColumnStore is a good name. Can't the local one store whole row-groups?

Contributor Author

winding-lines commented Feb 3, 2023


@ritchie46 thanks for looking at the PR, and I agree it's not a good name. To specifically answer your question: in local mode we don't actually store anything; it's just a pass-through to the mmap-ed file.

MORE CONTEXT (sorry this is a bit long)

We are in a bit of a tight spot:

  • when running locally we want to use mmap
  • when running remote we want to download only the parts of the file of interest (footer, metadata and then columns - this can be optimized a bit as per one of the comments)
  • we currently want the async code to be as self-contained as possible

The ColumnStore is an abstraction that I introduced in an earlier PR to let us keep most of the code as-is and, especially, to limit the spread of async. To achieve this, the layer provides a level of indirection where the reader keeps using linear addressing, so the code stays as-is.

For the local use case the ColumnStore is a pass-through:

  • it maps the address requested by the reader to the actual address in the file
  • the OS is doing its magic
  • the code that you are looking at here is a no-op.

For the remote use case:

  • we pre-download the regions of the file that we know will be accessed later on.
  • This allows us to go into async mode for the duration of the download and then come back to non-async mode.
  • All the downloaded areas get stored in a hashmap.
  • When the reader executes and accesses an address, we return the downloaded buffer.
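
A Python-flavored sketch of the indirection described above; the real code is Rust, and these class names are illustrative only:

class LocalStore:
    # Pass-through: reader addresses map straight into the mmap-ed file.
    def __init__(self, mmap_buf):
        self._buf = mmap_buf
    def fetch(self, offset, length):
        return self._buf[offset:offset + length]  # the OS does the rest

class RemoteStore:
    # Serves reads from ranges pre-downloaded in a single async phase.
    def __init__(self, downloaded):
        self._ranges = downloaded  # hashmap of {offset: bytes}
    def fetch(self, offset, length):
        return self._ranges[offset][:length]  # no async needed here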

I see a couple of ways forward:

  1. allow the reader to run in async mode and actually do the download in the reader when the region of the file is needed
  2. keep the current async island but come up with a better name for the ColumnStore

#1 is a bit of a departure from the current code architecture, and you seem to prefer threads to async. Over the long run it may be inevitable though, since we need to teach the planner the cost of operations.

If you want to go with #2 I could rename this to ColumnMapper or CloudMapper, or some other name that resonates more with you.

Let me know what you think 🤗

Member


Sorry for the delay (really busy). Great explanation. Could we add this as a comment on top of the ColumnStore? Maybe we call it CloudMapper indeed.

I think we should first go for #2. I see some opportunities for async in the streaming engine, but I don't want to overhaul our design; I'd rather experiment with this gradually.

Contributor Author


Thanks a lot Ritchie! Let's chain this PR on #6830. After we evaluate and possibly land #6830 we can come back here and do a better job with the integration. This code will be much simpler if we have a Tokio runtime available by the time we make it down to the download layer.

@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch 3 times, most recently from 3cb1df1 to ff360a6 Compare February 5, 2023 05:03
@winding-lines winding-lines force-pushed the feature/enable-object-store-in-python branch from ff360a6 to fb5502b Compare March 5, 2023 16:21
@stinodego
Contributor

I'm closing this pull request due to inactivity. Feel free to rebase, reopen, and continue your work!

For reference, this is now possible in Python through fsspec, though a native Rust implementation of cloud integration would still be welcome.
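
For example, a minimal sketch of the fsspec route; bucket and key are placeholders:

import fsspec
import polars as pl

with fsspec.open("s3://your-bucket/polars/datasets/foods1.parquet") as f:
    df = pl.read_parquet(f)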

@stinodego stinodego closed this Aug 10, 2023