Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): add OBJECT_STORE_CONCURRENCY_LIMIT setting for ObjectStoreFactory #2458

Merged
merged 6 commits into from
May 1, 2024

Conversation

vigimite
Copy link
Contributor

@vigimite vigimite commented Apr 27, 2024

Description

This PR adds a configuration to control concurrent access to the underlying object store. It also includes a visibility change to the S3LogStoreFactory to align it with all other provider implementations.

Related Issue(s)

Documentation

https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Apr 27, 2024
@vigimite
Copy link
Contributor Author

If this should be turned into a generalized solution, let me know how you would want me to implement this as there would be multiple ways to do this :)

@ion-elgreco
Copy link
Collaborator

If this should be turned into a generalized solution, let me know how you would want me to implement this as there would be multiple ways to do this :)

Generalized for what? Isn't this just an issue for AWS?

@vigimite
Copy link
Contributor Author

This is not necessarily just an AWS issue, I checked and Azure also uses HTTP1 for Blob storage so parallel requests would also open a bunch of file handles in that case, which, if the executor has a smaller ulimit, would have the same outcome.

@ion-elgreco
Copy link
Collaborator

@zZKato in that case, indeed a generic one that works on all providers is preferred

@vigimite
Copy link
Contributor Author

I am currently testing this out in our production code base, once i can verify this works well, I would need some suggestions on where to implement this. We could make a general StorageConfig in the storage module but then it would mean a hierarchical application of the config and every provider would need to implement this behavior. Another option would be to provide a new interface to the ObjectStoreFactory that could handle this case like parse_url_opts_with_limit, the downside there would be that it would coexist with the already existing storage_options map. Open for any suggestions or preferences.

@ion-elgreco
Copy link
Collaborator

@zZKato maybe you make a function that takes a generic T which is the inner object store, and put in deltalake storage, and then every factory calls that at the end

@vigimite
Copy link
Contributor Author

Can confirm it works, had to actually go all the way down to a concurrency limit of 50 to access 11 delta tables but it didn't seem to affect run time that much! I will over the next couple of days refactor this PR for a generic solution.

@vigimite vigimite changed the title feat(python, rust): add AWS_S3_LIMIT_CONCURRENCY setting for S3ObjectStoreFactory feat(python, rust): add OBJECT_STORE_CONCURRENCY_LIMIT setting for ObjectStoreFactory Apr 29, 2024
@vigimite
Copy link
Contributor Author

vigimite commented Apr 29, 2024

@ion-elgreco let me know if you like this approach. I stuck to the already existing pattern of the url_prefix_handler.

Edit: I also fixed some lints, hope thats ok


assert_eq!(
String::from("LimitStore(500, InMemory)"),
format!("{limited}")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the fact that each ObjectStore has a Display trait implementation, I didn't find a better way to test this

@rtyler
Copy link
Member

rtyler commented May 1, 2024

@zZKato thanks for putting the effort in here. I am quite curious to some day learn more about how many tables are being queried at once those Lambdas 😄

The generalized approach as written makes sense to me, however we have really struggled with an abundance of storage configuration parameters and documenting all the flags.

Can you share some reasoning on what the challenge might be to implementing a custom S3ObjectStoreFactory style implementation which wraps the object store in your application, and then overriding the built in file scheme handler with your custom one (see register_handlers()

I can imagine this would be tricky if your Lambdas are all Python-based, but it's not clear from the issue linked whether or not that is the case.

@vigimite
Copy link
Contributor Author

vigimite commented May 1, 2024

@rtyler this lambda is just building a report out of 11 different aggregated metrics which are stored in individual tables so not that 🌶️.

I totally understand that that is an issue, just while writing this PR, it was difficult to try to understand all the layers of options and thinking about how to best implement it. I think that perhaps the interface needs some refactoring. My understanding is that the factories are just a way to simplify the handling of object stores and provide additional error handling when missing configs.

Maybe removing the factories for object stores as a concept and instead working directly with the underlying ObjectStore trait might remove some of that complexity. Something like making a single register_handler function that takes the underlying oject store as a parameter and the delta implementation with the convenience functions would just be internal detail on how to compose the specific ObjectStore impl (basically decoupling those from eachother, somewhat of a dependecy inversion (?)). This would allow rust consumers to compose their own ObjectStore impl.

I am in the process of writing a library to simplify the creation of these kinds of ETL pipelines with AWS lambda + datafusion + deltalake and am running into the same issue of creating a bunch of helpers to simplify the registration of object stores mainly around path handling. You could argue that this is an issue with the object_store API itself.

We're using rust lambdas so no concerns there, however I think that this implementation being part of delta-rs makes sense as this issue can be quite easy to come accross. Deltalake specifically has the potential to open a large amount HTTP/1 connections depending on the query and the size of the delta table. The suggested solution is to just increase the ulimit which isn't always an option (like for aws lambda) and imo not the best approach to deal with this.

We've been running this branch in production for a couple of days and it seems to not have decreased performance at all suprisingly and in fact there have been some nice side effects like the significant reduction of the number of backpressures in the underlying S3 client (e.g. "Encountered transport error backing off for 0.1 seconds, retry 1 of 10: error sending request for url (<s3 url>): connection closed before message completed"). These kinds of logs basically never appear anymore.
I could imagine that, counterintuitively, this could in some cases even increase performance due to not opening as many HTTP/1 connections, causing less system overhead but thats just a hunch and would need further investigation.

Either way is fine for me and I can just adjust the way we handle this :) and thanks for doing the stuff you all do, I really love the delta-rs (and deltalake) project.

@ion-elgreco
Copy link
Collaborator

ion-elgreco commented May 1, 2024

@zZKato thanks for putting the effort in here. I am quite curious to some day learn more about how many tables are being queried at once those Lambdas 😄

The generalized approach as written makes sense to me, however we have really struggled with an abundance of storage configuration parameters and documenting all the flags.

Can you share some reasoning on what the challenge might be to implementing a custom S3ObjectStoreFactory style implementation which wraps the object store in your application, and then overriding the built in file scheme handler with your custom one (see register_handlers()

I can imagine this would be tricky if your Lambdas are all Python-based, but it's not clear from the issue linked whether or not that is the case. If someone doesn't pass this storage_option the original object_Store is just passed through

I think this will be valuable for python users as well, most of the users are on that side as well.

@zZKato can you update the docs pages to include this

@rtyler rtyler enabled auto-merge (squash) May 1, 2024 15:50
@rtyler rtyler merged commit 0c8e5d5 into delta-io:main May 1, 2024
20 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/rust Issues for the Rust crate
Projects
None yet
3 participants