Introduce the DynamicFileCatalog in datafusion-catalog #11035

Merged on Sep 9, 2024 (60 commits)
Showing changes from 13 commits
Commits
37b5526
early draft
goldmedal Jun 20, 2024
ad1a854
fmt
goldmedal Jun 20, 2024
97ea11c
add example for dynamic file query
goldmedal Jun 21, 2024
2729c49
add test and refactor
goldmedal Jun 21, 2024
6f86577
clippy and add doc
goldmedal Jun 21, 2024
c91cdc6
cargo fmt
goldmedal Jun 21, 2024
3306df6
extract substitute_tilde function
goldmedal Jun 21, 2024
d82b273
fix the error handling
goldmedal Jun 21, 2024
c0491d5
fmt and clippy
goldmedal Jun 21, 2024
a60eeea
fix test
goldmedal Jun 21, 2024
9fa01aa
fix sqllogictests
goldmedal Jun 22, 2024
2ab3639
ignore dirs for windows test
goldmedal Jun 22, 2024
a8ee733
enhance the test for every file format
goldmedal Jun 22, 2024
7faab9f
disable the test for windows
goldmedal Jun 22, 2024
e1f3908
make dynamic file query configurable
goldmedal Jul 1, 2024
cf73ba2
revert array_query.slt
goldmedal Jul 1, 2024
c641e6b
modified the test and add example
goldmedal Jul 1, 2024
0806263
make dirs be optional
goldmedal Jul 1, 2024
f4d24e6
enable dynamic file query in cli
goldmedal Jul 1, 2024
4b71e59
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Jul 1, 2024
9964150
cargo fmt
goldmedal Jul 1, 2024
da1e5d3
modified example
goldmedal Jul 1, 2024
ed670fe
fix test
goldmedal Jul 1, 2024
ea5816e
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 13, 2024
fb8b9e0
fix merge conflict
goldmedal Aug 13, 2024
fa73ae7
tmp
goldmedal Aug 14, 2024
04cc155
tmp
goldmedal Aug 14, 2024
1ede35e
tmp
goldmedal Aug 14, 2024
51b1d41
fix the catalog and schema
goldmedal Aug 15, 2024
75b0b84
move dynamic file catalog to datafusion-catalog
goldmedal Aug 15, 2024
3e8d094
add copyright
goldmedal Aug 15, 2024
4eb8ca5
fix tests
goldmedal Aug 16, 2024
9913405
rename catalog in cli and update lock
goldmedal Aug 16, 2024
5d861b8
enable home_dir feature
goldmedal Aug 16, 2024
ea1c075
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 16, 2024
16be2e7
update lock
goldmedal Aug 16, 2024
db90c28
fix compile
goldmedal Aug 16, 2024
9353123
fix clippy
goldmedal Aug 16, 2024
daa7ed8
fmt toml
goldmedal Aug 16, 2024
e4a2174
fix doc test and add more doc
goldmedal Aug 16, 2024
506d1d6
fix clippy
goldmedal Aug 16, 2024
72ce464
add home_dir feature doc
goldmedal Aug 16, 2024
fb1b6ce
rollback the unused changed
goldmedal Aug 16, 2024
8f0952d
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 18, 2024
f062fec
update lock
goldmedal Aug 18, 2024
b1baa84
fix sqllogictest
goldmedal Aug 18, 2024
76d7fee
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 20, 2024
f0f070b
separate dynamic file test to another slt
goldmedal Aug 20, 2024
6b77b6b
add test for querying url table but disabled this feature
goldmedal Aug 20, 2024
4e51a77
add dynamic_file.slt
goldmedal Aug 20, 2024
fafc9dc
remove home_dir feature
goldmedal Aug 20, 2024
a3a4f4d
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Aug 21, 2024
7dc238f
Merge branch 'main' into feature/10986-dynamic-table-provider
goldmedal Sep 4, 2024
f7b4b8c
update cli lock
goldmedal Sep 4, 2024
25d0ff6
fix msrv check
goldmedal Sep 4, 2024
a78bd3c
fix msrv check
goldmedal Sep 4, 2024
edeff33
rollback the lock change
goldmedal Sep 4, 2024
b1a922c
address review comment and enhance the doc
goldmedal Sep 7, 2024
e5ab14d
remove the legacy comment
goldmedal Sep 7, 2024
87d7503
add missing doc
goldmedal Sep 7, 2024
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock


92 changes: 18 additions & 74 deletions datafusion-cli/src/catalog.rs
@@ -18,22 +18,19 @@
use std::any::Any;
use std::sync::{Arc, Weak};

use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

use async_trait::async_trait;
use datafusion::catalog::dynamic_file_schema::substitute_tilde;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

use async_trait::async_trait;
use dirs::home_dir;
use parking_lot::RwLock;

use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

/// Wraps another catalog, automatically creating table providers
/// for local files if needed
pub struct DynamicFileCatalog {
@@ -115,7 +112,8 @@ impl CatalogProvider for DynamicFileCatalogProvider {
}
}

/// Wraps another schema provider
/// Wraps another schema provider. [DynamicFileSchemaProvider] is responsible for registering the required
/// object stores for the file locations.
struct DynamicFileSchemaProvider {
inner: Arc<dyn SchemaProvider>,
state: Weak<RwLock<SessionState>>,
@@ -149,9 +147,11 @@ impl SchemaProvider for DynamicFileSchemaProvider {
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return Ok(inner_table);
let inner_table = self.inner.table(name).await;
if inner_table.is_ok() {
if let Some(inner_table) = inner_table? {
return Ok(Some(inner_table));
}
}

// if the inner schema provider didn't have a table by
@@ -195,16 +195,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
state.runtime_env().register_object_store(url, store);
}
}

let config = match ListingTableConfig::new(table_url).infer(&state).await {
Ok(cfg) => cfg,
Err(_) => {
// treat as non-existing
return Ok(None);
}
};

Ok(Some(Arc::new(ListingTable::try_new(config)?)))
self.inner.table(name).await
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
@@ -215,24 +206,14 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.table_exist(name)
}
}
fn substitute_tilde(cur: String) -> String {
if let Some(usr_dir_path) = home_dir() {
if let Some(usr_dir) = usr_dir_path.to_str() {
if cur.starts_with('~') && !usr_dir.is_empty() {
return cur.replacen('~', usr_dir, 1);
}
}
}
cur
}
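
The `substitute_tilde` helper above reads the home directory from `dirs::home_dir()`, which is why its test has to set and restore `HOME`/`USERPROFILE`. As a rough sketch only, and not the code in this PR, the same replacement rule can be written with the home directory passed in as an argument, which makes it testable without touching the environment. The name `substitute_tilde_with` and its `home` parameter are hypothetical.

```rust
use std::path::Path;

/// Hypothetical variant of `substitute_tilde`: replaces a leading `~` with
/// `home` and returns every other input unchanged.
/// `home` is supplied by the caller; the helper in the diff obtains it
/// from `dirs::home_dir()` instead.
fn substitute_tilde_with(cur: &str, home: Option<&Path>) -> String {
    match home.and_then(Path::to_str) {
        // Same conditions as the diff: the path starts with `~` and the
        // home directory is a non-empty, valid UTF-8 string.
        Some(home) if !home.is_empty() && cur.starts_with('~') => {
            cur.replacen('~', home, 1)
        }
        _ => cur.to_string(),
    }
}

fn main() {
    let home = Path::new("/home/user");
    // Leading tilde: expanded.
    println!("{}", substitute_tilde_with("~/data/part-0.parquet", Some(home)));
    // Tilde elsewhere in the path: left alone.
    println!("{}", substitute_tilde_with("/tmp/a~b.csv", Some(home)));
    // No home directory known: left alone.
    println!("{}", substitute_tilde_with("~/x.csv", None));
}
```

Like the helper in the diff, this sketch only checks for a leading `~`, so an input such as `~bob/x` has its `~` replaced by the current home directory rather than by `bob`'s.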

#[cfg(test)]
mod tests {
use super::*;

use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;

use super::*;

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let mut ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
@@ -262,7 +243,7 @@ mod tests {
let (ctx, schema) = setup_context();

// That's a non registered table so expecting None here
let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

// It should still create an object store for the location in the SessionState
@@ -286,7 +267,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

let store = ctx
@@ -308,7 +289,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

let store = ctx
@@ -330,41 +311,4 @@ mod tests {

assert!(schema.table(location).await.is_err());
}
#[cfg(not(target_os = "windows"))]
#[test]
fn test_substitute_tilde() {
use std::env;
use std::path::MAIN_SEPARATOR;
let original_home = home_dir();
let test_home_path = if cfg!(windows) {
"C:\\Users\\user"
} else {
"/home/user"
};
env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
test_home_path,
);
let input = "~/Code/datafusion/benchmarks/data/tpch_sf1/part/part-0.parquet";
let expected = format!(
"{}{}Code{}datafusion{}benchmarks{}data{}tpch_sf1{}part{}part-0.parquet",
test_home_path,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR
);
let actual = substitute_tilde(input.to_string());
assert_eq!(actual, expected);
match original_home {
Some(home_path) => env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
home_path.to_str().unwrap(),
),
None => env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }),
}
}
}
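
The `table` lookup in `DynamicFileSchemaProvider` above follows a wrap-and-fall-back pattern: ask the inner provider first, and only treat the name as a file location when no registered table matches. The sketch below illustrates that pattern with the standard library only. The `Lookup` trait, the `Registered` and `DynamicFallback` types, and the extension check are all hypothetical stand-ins; the real code is async, works with `SchemaProvider` and `TableProvider`, and registers object stores before resolving the location.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a schema provider: resolves a name to a table,
/// represented here as a plain description string.
trait Lookup {
    fn table(&self, name: &str) -> Option<String>;
}

/// Holds explicitly registered tables only.
struct Registered(HashMap<String, String>);

impl Lookup for Registered {
    fn table(&self, name: &str) -> Option<String> {
        self.0.get(name).cloned()
    }
}

/// Wraps another provider and falls back to interpreting the name as a
/// file location when the inner provider has no such table.
struct DynamicFallback<L: Lookup> {
    inner: L,
}

impl<L: Lookup> Lookup for DynamicFallback<L> {
    fn table(&self, name: &str) -> Option<String> {
        // Registered tables always win over file paths.
        if let Some(table) = self.inner.table(name) {
            return Some(table);
        }
        // Illustrative rule only: accept names ending in a known extension.
        let known = [".csv", ".parquet", ".json"];
        if known.iter().any(|ext| name.ends_with(ext)) {
            Some(format!("listing table for {name}"))
        } else {
            // Anything else is reported as a non-existent table.
            None
        }
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert("t1".to_string(), "registered t1".to_string());
    let provider = DynamicFallback { inner: Registered(tables) };

    println!("{:?}", provider.table("t1"));
    println!("{:?}", provider.table("data/a.csv"));
    println!("{:?}", provider.table("missing"));
}
```

Checking the inner provider first means a table registered under a name that happens to look like a path still resolves to the registered table.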
38 changes: 26 additions & 12 deletions datafusion-examples/examples/csv_sql.rs
@@ -24,25 +24,39 @@ use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();
let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
let ctx = SessionContext::new_with_config(cfg);

let testdata = datafusion::test_util::arrow_test_data();

let path = &format!("{testdata}/csv/aggregate_test_100.csv");
// register csv file with the execution context
ctx.register_csv(
"aggregate_test_100",
&format!("{testdata}/csv/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;
ctx.register_csv("aggregate_test_100", path, CsvReadOptions::new())
.await?;

// execute the query
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
FROM aggregate_test_100 \
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
r#"SELECT c1, MIN(c12), MAX(c12)
FROM aggregate_test_100
WHERE c11 > 0.1 AND c11 < 0.9
GROUP BY c1"#,
)
.await?;

// print the results
df.show().await?;

// query the file by the path dynamically.
let df = ctx
.sql(
format!(
r#"SELECT c1, MIN(c12), MAX(c12)
FROM '{}'
WHERE c11 > 0.1 AND c11 < 0.9
GROUP BY c1"#,
path
)
.as_str(),
)
.await?;


@@ -63,5 +63,13 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// dynamic query by the file path
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;

// print the results
df.show().await?;

Ok(())
}
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -108,6 +108,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
dirs = "4.0.0"
flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"