chore: refactoring AWS code out of the core crate #1995
crates/deltalake-aws/src/logstore.rs
Outdated
        source: Box::new(err),
    },
    LockClientError::ProvisionedThroughputExceeded => todo!(
        "deltalake-aws does not yet handle DynamoDB providioned throughput errors"
Typo?
crates/deltalake-aws/src/logstore.rs
Outdated
TransactionError::LogStoreError {
    msg: format!(
        "lock table '{}' not found",
        self.lock_client.get_lock_table_name()
Can't you reuse table_name here?
pub struct S3ObjectStoreFactory {}

impl S3ObjectStoreFactory {
    fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
What's the reason we are not using the S3Builder here and then from_env()? https://docs.rs/object_store/latest/object_store/aws/struct.AmazonS3Builder.html#method.from_env
The specifics of why this code does things this way are a little outside of my realm of understanding 😆
I was mostly moving code around, with some refactoring here and there. This code is largely doing the same thing it did when it lived in deltalake-core. That said, this takes StorageOptions rather than S3StorageOptions because the result of this is intended to be passed into ObjectStoreFactory.parse_url_opts, which is a more generic trait and just operates on the StorageOptions struct.
pub struct S3ObjectStoreFactory {}

impl S3ObjectStoreFactory {
    fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
Why doesn't this take the S3StorageOptions struct?
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
hdfs = ["datafusion-objectstore-hdfs"]
Why is this dependency removed?
Lots of the functionality tied to the hdfs feature is totally untested, it's ignored code, so I removed the feature for now. The hdfs code needs to be moved into its own crate and probably its own repo tbh
impl LogStoreFactory for DefaultLogStoreFactory {}

/// Registry of [LogStoreFactory] instances
pub type FactoryRegistry = Arc<DashMap<Url, Arc<dyn LogStoreFactory>>>;
Just for my own understanding, what does DashMap solve here instead of a normal HashMap? I read the docs but I don't grasp what its purpose is here :P
DashMap handles synchronization internally (it shards the map behind its own locks), so it can be shared across threads in the process without an outer lock. A HashMap would need to be put into a Mutex<T> first in order to do the same.
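For comparison, here is a minimal sketch of the Mutex-wrapped HashMap alternative mentioned in this reply, using only the standard library. The names `SharedRegistry` and `register_concurrently` are hypothetical and the values are plain strings rather than factories; this is not code from the PR.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// A registry shared across threads. A plain `HashMap` must be wrapped in a
/// `Mutex` (or `RwLock`) before several threads can mutate it.
type SharedRegistry = Arc<Mutex<HashMap<String, String>>>;

/// Hypothetical helper: registers one entry per scheme, each from its own
/// thread, and returns the number of entries afterwards.
fn register_concurrently(schemes: &[&str]) -> usize {
    let registry: SharedRegistry = Arc::new(Mutex::new(HashMap::new()));

    let handles: Vec<_> = schemes
        .iter()
        .map(|scheme| {
            let registry = Arc::clone(&registry);
            let scheme = scheme.to_string();
            thread::spawn(move || {
                let value = format!("factory for {scheme}");
                // Every access has to take the lock explicitly.
                let mut map = registry.lock().unwrap();
                map.insert(scheme, value);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let len = registry.lock().unwrap().len();
    len
}
```

With a concurrent map type the explicit `lock()` calls disappear from the call sites, which is the convenience being described here.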
}

Ok(DeltaTableBuilder::from_uri(table_uri))
let url = ensure_table_uri(&table_uri).expect("The specified table_uri is not valid");
This seems like an unnecessary check if you do from_uri already
The module-level ensure_table_uri does some behaviors that needed to be preserved for file URLs in particular. DeltaTableBuilder::from_uri and from_valid_uri both have slightly different functionality, which I do think we might want to get rid of before 1.0
@@ -483,6 +411,8 @@ mod tests {
    assert!(uri.is_ok());
    let _uri = ensure_table_uri("./nonexistent");
    assert!(uri.is_ok());
    let _uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
    assert!(uri.is_ok());
It's not checking _uri here
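One way to avoid this kind of slip is to assert on each result where it is produced rather than binding it to an unused `_uri`. The sketch below assumes a hypothetical stand-in, `ensure_table_uri_stub`, which only rejects empty input and does not reproduce the behavior of the real `ensure_table_uri`.

```rust
/// Hypothetical stand-in for `ensure_table_uri`: it only rejects empty input,
/// which is NOT what the real function does.
fn ensure_table_uri_stub(input: &str) -> Result<String, String> {
    if input.is_empty() {
        Err("empty table URI".to_string())
    } else {
        Ok(input.to_string())
    }
}

/// Checks every input and returns the ones that failed, so no result ends up
/// bound to an unused variable and silently skipped.
fn failing_inputs<'a>(inputs: &[&'a str]) -> Vec<&'a str> {
    inputs
        .iter()
        .copied()
        .filter(|input| ensure_table_uri_stub(input).is_err())
        .collect()
}
```

A test can then assert that `failing_inputs` returns an empty vector for the list of URIs that are expected to be accepted.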
    OnelakeAbfs,
}

impl StorageIntegration {
Are these things not used anymore for the other cloud stores?
For the other cloud stores they should be implementing the trait. I don't actually know why our integration tests were implemented the way that they were, basically with everything in kind of hard-coded lists with feature flags.
The model used in deltalake-aws/tests/common.rs is what I would expect to see more of for the other cloud provider traits
@rtyler - took the liberty to disable auto-merge since I think we should squash that one. The individual commits don't seem to be valuable on their own. More importantly though, I started to do a proper review. Off the bat I was wondering if we need a dedicated test crate to place all the data, or if we can just keep that in the core crate, and then test integrations. In any case, the way things are going, it makes sense to just move the example table to the root of the repository, rather than in a crate. Eventually we would likely add the DAT tables, or even replace it with them.
Excited to see we are working to untangle all this!!
Left some initial comments, mainly around how we set up the registries.
Haven't fully worked through the factories / registries yet, but at first glance it seems like we may be doing a bit too much work inside there. My thinking would be that users need to register a fully functioning store for a given ObjectStoreUrl (just scheme and host), where we may or may not want to define our own ObjectStoreUrl.
Maybe it looks like that already, just dumping some thoughts :).
debug!("build_storage() with {}", &self.options.table_uri);
let location = Url::parse(&self.options.table_uri).map_err(|_| {
    DeltaTableError::NotATable(format!(
        "Could not turn {} into a URL",
        self.options.table_uri
    ))
Seems like in practice we should never hit this if the builder was created via a route that ensures URIs are fine. Since we do have to deal with relative paths and the like, how about we make the field on DeltaLoadOptions a URL, and avoid throwing here?
    return logstore_with(store, location, options);
}
Err(DeltaTableError::InvalidTableLocation(
    location.clone().into(),
do we need this clone, as this is the last usage?
let scheme = Url::parse(&format!("{}://", location.scheme()))
    .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?;

if let Some(factory) = logstores().get(&scheme) {
I think we need to consider both scheme and host in the registries. Otherwise we cannot register stores for different buckets with different credentials.
The file:/// stores will always have an empty host, except in some arcane Windows cases that I have not yet encountered in the wild...
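A rough sketch of what keying the registry on scheme and host could look like, using only the standard library. `StoreKey`, `store_key`, and `lookup` are hypothetical names, the string splitting is a simplified stand-in for real URL parsing, and the registered values are plain strings rather than stores.

```rust
use std::collections::HashMap;

/// Hypothetical registry key: scheme plus host, e.g. ("s3", "bucket-a").
type StoreKey = (String, String);

/// Simplified stand-in for URL parsing; a real implementation would use a
/// proper URL parser instead of string splitting.
fn store_key(location: &str) -> Option<StoreKey> {
    let (scheme, rest) = location.split_once("://")?;
    // The host is everything up to the first '/'; it is empty for file:/// URLs.
    let host = rest.split('/').next().unwrap_or("");
    Some((scheme.to_string(), host.to_string()))
}

/// Looks up the entry registered for the scheme and host of `location`.
fn lookup<'a>(registry: &'a HashMap<StoreKey, String>, location: &str) -> Option<&'a String> {
    registry.get(&store_key(location)?)
}
```

Under this scheme, `s3://bucket-a/...` and `s3://bucket-b/...` resolve to separate entries, so each bucket can carry its own credentials, while every `file:///` location maps to the key `("file", "")`.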
pub type FactoryRegistry = Arc<DashMap<Url, Arc<dyn ObjectStoreFactory>>>;

/// TODO
pub fn factories() -> FactoryRegistry {
Do we need to create global state for this? I.e., could we just place this on DeltaTable?
My main concern is the integration with datafusion and other query engines, where such state is usually handled on a session. I also plan to leverage the datafusion internals etc. for our SCAN et al. operations. I think in these cases having a global registry might limit how we can integrate.
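To make the concern concrete, here is a minimal sketch of a registry owned by a session-like value instead of a process-wide global. `Session` and `Store` are hypothetical types invented for illustration; they are not the datafusion or deltalake APIs.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for an object store handle.
#[derive(Debug, PartialEq)]
struct Store {
    label: String,
}

/// Hypothetical session that owns its registry rather than consulting
/// process-wide state.
#[derive(Default)]
struct Session {
    stores: HashMap<String, Arc<Store>>,
}

impl Session {
    /// Registers a store under `key` for this session only.
    fn register(&mut self, key: &str, label: &str) {
        let store = Arc::new(Store { label: label.to_string() });
        self.stores.insert(key.to_string(), store);
    }

    /// Resolves a store registered on this session, if any.
    fn resolve(&self, key: &str) -> Option<Arc<Store>> {
        self.stores.get(key).cloned()
    }
}
```

Two sessions can then register different stores under the same key without interfering with each other, which a single global registry cannot express.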
There are a number of changes here to untangle the coupling inside of deltalake-core to allow deltalake-aws to be separated properly
This test fails on main but passes in this branch because the URL handling logic introduced here properly encodes file URLs. No need for object_store updates here. Fixes delta-io#1806
👍
This change was quite time-consuming and as such changes a lot of files to ensure that unit and integration tests are cleaned up properly.
What should be done in a follow-up pull request is the creation of the deltalake-gcs and deltalake-azure crates to contain their objectstore registration code and integration tests. I consider that out of the scope of this change.
This is a prerequisite for my work on #1601