Skip to content

Commit

Permalink
[fix] get_by_uri also returns path
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 5, 2021
1 parent fe19d62 commit 8e8fd98
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
15 changes: 9 additions & 6 deletions datafusion/src/datasource/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ impl ListingOptions {
object_store_registry: Arc<ObjectStoreRegistry>,
uri: &str,
) -> Result<SchemaRef> {
let object_store = object_store_registry.get_by_uri(uri)?;
let (object_store, path) = object_store_registry.get_by_uri(uri)?;
let file_stream = object_store
.list_file_with_suffix(uri, &self.file_extension)
.list_file_with_suffix(path, &self.file_extension)
.await?
.map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?;
Expand Down Expand Up @@ -137,9 +137,9 @@ impl TableProvider for ListingTable {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// TODO object_store_registry should be provided as param here
let object_store = self.object_store_registry.get_by_uri(&self.uri)?;
let (object_store, path) = self.object_store_registry.get_by_uri(&self.uri)?;
let (partitioned_file_lists, statistics) = self
.list_files_for_scan(Arc::clone(&object_store), filters, limit)
.list_files_for_scan(Arc::clone(&object_store), path, filters, limit)
.await?;
// create the execution plan
self.options
Expand Down Expand Up @@ -169,13 +169,14 @@ impl ListingTable {
async fn list_files_for_scan<'a>(
&'a self,
object_store: Arc<dyn ObjectStore>,
path: &'a str,
filters: &'a [Expr],
limit: Option<usize>,
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
// list files (with partitions)
let file_list = pruned_partition_list(
object_store.as_ref(),
&self.uri,
path,
filters,
&self.options.file_extension,
&self.options.partitions,
Expand Down Expand Up @@ -387,7 +388,9 @@ mod tests {
opt,
);

let (file_list, _) = table.list_files_for_scan(mock_store, &[], None).await?;
let (file_list, _) = table
.list_files_for_scan(mock_store, "bucket/key-prefix", &[], None)
.await?;

assert_eq!(file_list.len(), output_partitioning);

Expand Down
20 changes: 13 additions & 7 deletions datafusion/src/datasource/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,28 @@ impl ObjectStoreRegistry {
}

/// Get a suitable store for the URI based on it's scheme. For example:
/// URI with scheme file or no schema will return the default LocalFS store,
/// URI with scheme s3 will return the S3 store if it's registered.
pub fn get_by_uri(&self, uri: &str) -> Result<Arc<dyn ObjectStore>> {
if let Some((scheme, _)) = uri.split_once(':') {
/// - URI with scheme `file://` or no schema will return the default LocalFS store
/// - URI with scheme `s3://` will return the S3 store if it's registered
/// Returns a tuple with the store and the path of the file in that store
/// (URI=scheme://path).
pub fn get_by_uri<'a>(
&self,
uri: &'a str,
) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
if let Some((scheme, path)) = uri.split_once("://") {
let stores = self.object_stores.read().unwrap();
stores
let store = stores
.get(&*scheme.to_lowercase())
.map(Clone::clone)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
scheme
))
})
})?;
Ok((store, path))
} else {
Ok(Arc::new(LocalFileSystem))
Ok((Arc::new(LocalFileSystem), uri))
}
}
}

0 comments on commit 8e8fd98

Please sign in to comment.