Skip to content

Commit

Permalink
Introduce the DynamicFileCatalog in datafusion-catalog (#11035)
Browse files Browse the repository at this point in the history
* early draft

* fmt

* add example for dynamic file query

* add test and refactor

* clippy and add doc

* cargo fmt

* extract substitute_tilde function

* fix the error handling

* fmt and clippy

* fix test

* fix sqllogictests

* ignore dirs for windows test

* enhance the test for every file format

* disable the test for windows

* make dynamic file query configurable

* revert array_query.slt

* modified the test and add example

* make dirs optional

* enable dynamic file query in cli

* cargo fmt

* modified example

* fix test

* fix merge conflict

* tmp

* tmp

* tmp

* fix the catalog and schema

* move dynamic file catalog to datafusion-catalog

* add copyright

* fix tests

* rename catalog in cli and update lock

* enable home_dir feature

* update lock

* fix compile

* fix clippy

* fmt toml

* fix doc test and add more doc

* fix clippy

* add home_dir feature doc

* roll back the unused changes

* update lock

* fix sqllogictest

* separate dynamic file test to another slt

* add test for querying url table with this feature disabled

* add dynamic_file.slt

* remove home_dir feature

* update cli lock

* fix msrv check

* fix msrv check

* rollback the lock change

* address review comment and enhance the doc

* remove the legacy comment

* add missing doc
  • Loading branch information
goldmedal authored Sep 9, 2024
1 parent f56d6d0 commit 9bc39a0
Show file tree
Hide file tree
Showing 20 changed files with 590 additions and 52 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 35 additions & 41 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use std::sync::{Arc, Weak};
use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};

use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
Expand All @@ -34,14 +33,13 @@ use async_trait::async_trait;
use dirs::home_dir;
use parking_lot::RwLock;

/// Wraps another catalog, automatically creating table providers
/// for local files if needed
pub struct DynamicFileCatalog {
/// Wraps another catalog, automatically registering the required object stores for the file locations
pub struct DynamicObjectStoreCatalog {
inner: Arc<dyn CatalogProviderList>,
state: Weak<RwLock<SessionState>>,
}

impl DynamicFileCatalog {
impl DynamicObjectStoreCatalog {
pub fn new(
inner: Arc<dyn CatalogProviderList>,
state: Weak<RwLock<SessionState>>,
Expand All @@ -50,7 +48,7 @@ impl DynamicFileCatalog {
}
}

impl CatalogProviderList for DynamicFileCatalog {
impl CatalogProviderList for DynamicObjectStoreCatalog {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -69,19 +67,19 @@ impl CatalogProviderList for DynamicFileCatalog {

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let state = self.state.clone();
self.inner
.catalog(name)
.map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _)
self.inner.catalog(name).map(|catalog| {
Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _
})
}
}

/// Wraps another catalog provider
struct DynamicFileCatalogProvider {
struct DynamicObjectStoreCatalogProvider {
inner: Arc<dyn CatalogProvider>,
state: Weak<RwLock<SessionState>>,
}

impl DynamicFileCatalogProvider {
impl DynamicObjectStoreCatalogProvider {
pub fn new(
inner: Arc<dyn CatalogProvider>,
state: Weak<RwLock<SessionState>>,
Expand All @@ -90,7 +88,7 @@ impl DynamicFileCatalogProvider {
}
}

impl CatalogProvider for DynamicFileCatalogProvider {
impl CatalogProvider for DynamicObjectStoreCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -101,9 +99,9 @@ impl CatalogProvider for DynamicFileCatalogProvider {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let state = self.state.clone();
self.inner
.schema(name)
.map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _)
self.inner.schema(name).map(|schema| {
Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
})
}

fn register_schema(
Expand All @@ -115,13 +113,14 @@ impl CatalogProvider for DynamicFileCatalogProvider {
}
}

/// Wraps another schema provider
struct DynamicFileSchemaProvider {
/// Wraps another schema provider. [DynamicObjectStoreSchemaProvider] is responsible for registering the required
/// object stores for the file locations.
struct DynamicObjectStoreSchemaProvider {
inner: Arc<dyn SchemaProvider>,
state: Weak<RwLock<SessionState>>,
}

impl DynamicFileSchemaProvider {
impl DynamicObjectStoreSchemaProvider {
pub fn new(
inner: Arc<dyn SchemaProvider>,
state: Weak<RwLock<SessionState>>,
Expand All @@ -131,7 +130,7 @@ impl DynamicFileSchemaProvider {
}

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
impl SchemaProvider for DynamicObjectStoreSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -149,9 +148,11 @@ impl SchemaProvider for DynamicFileSchemaProvider {
}

async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return Ok(inner_table);
let inner_table = self.inner.table(name).await;
if inner_table.is_ok() {
if let Some(inner_table) = inner_table? {
return Ok(Some(inner_table));
}
}

// if the inner schema provider didn't have a table by
Expand Down Expand Up @@ -201,16 +202,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
state.runtime_env().register_object_store(url, store);
}
}

let config = match ListingTableConfig::new(table_url).infer(&state).await {
Ok(cfg) => cfg,
Err(_) => {
// treat as non-existing
return Ok(None);
}
};

Ok(Some(Arc::new(ListingTable::try_new(config)?)))
self.inner.table(name).await
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand All @@ -221,7 +213,8 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.table_exist(name)
}
}
fn substitute_tilde(cur: String) -> String {

pub fn substitute_tilde(cur: String) -> String {
if let Some(usr_dir_path) = home_dir() {
if let Some(usr_dir) = usr_dir_path.to_str() {
if cur.starts_with('~') && !usr_dir.is_empty() {
Expand All @@ -231,22 +224,22 @@ fn substitute_tilde(cur: String) -> String {
}
cur
}

#[cfg(test)]
mod tests {

use super::*;

use datafusion::catalog::SchemaProvider;
use datafusion::prelude::SessionContext;

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
)));

let provider = &DynamicFileCatalog::new(
let provider = &DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
) as &dyn CatalogProviderList;
Expand All @@ -269,7 +262,7 @@ mod tests {
let (ctx, schema) = setup_context();

// That's a non-registered table, so we expect None here
let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

// It should still create an object store for the location in the SessionState
Expand All @@ -293,7 +286,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

let store = ctx
Expand All @@ -315,7 +308,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await.unwrap();
let table = schema.table(&location).await?;
assert!(table.is_none());

let store = ctx
Expand All @@ -337,6 +330,7 @@ mod tests {

assert!(schema.table(location).await.is_err());
}

#[cfg(not(target_os = "windows"))]
#[test]
fn test_substitute_tilde() {
Expand Down
10 changes: 6 additions & 4 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
use datafusion_cli::{
exec,
Expand Down Expand Up @@ -173,11 +173,13 @@ async fn main_inner() -> Result<()> {

let runtime_env = create_runtime_env(rt_config.clone())?;

// enable dynamic file query
let ctx =
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
.enable_url_table();
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that knows how to open files
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
// install dynamic catalog provider that can register required object stores
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
ctx.state().catalog_list().clone(),
ctx.state_weak_ref(),
)));
Expand Down
6 changes: 6 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ async fn main() -> Result<()> {
.await?;
parquet_df.describe().await.unwrap().show().await?;

let dyn_ctx = ctx.enable_url_table();
let df = dyn_ctx
.sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap()))
.await?;
df.show().await?;

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,14 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// dynamic query by the file path
ctx.enable_url_table();
let df = ctx
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str())
.await?;

// print the results
df.show().await?;

Ok(())
}
1 change: 1 addition & 0 deletions datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
parking_lot = { workspace = true }

[lints]
workspace = true
Loading

0 comments on commit 9bc39a0

Please sign in to comment.