-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce the DynamicFileCatalog
in datafusion-catalog
#11035
Introduce the DynamicFileCatalog
in datafusion-catalog
#11035
Conversation
@@ -305,10 +306,18 @@ impl SessionContext { | |||
|
|||
/// Creates a new `SessionContext` using the provided [`SessionState`] | |||
pub fn new_with_state(state: SessionState) -> Self { | |||
let state_ref = Arc::new(RwLock::new(state.clone())); | |||
state | |||
.schema_for_ref("datafusion.public.xx") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It just a workaround to get DynamicFileSchemaProvider
. I'll refactor it later.
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { | ||
let inner_table = self.inner.table(name).await?; | ||
if inner_table.is_some() { | ||
return Ok(inner_table); | ||
} | ||
let optimized_url = substitute_tilde(name.to_owned()); | ||
let table_url = ListingTableUrl::parse(optimized_url.as_str())?; | ||
let state = &self | ||
.state_store | ||
.get_state() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SchemaProvider::table
won't have SessionState
by default. We should get it from StateStore
.
let df = ctx | ||
.sql( | ||
format!( | ||
r#"SELECT column_1, MIN(column_12), MAX(column_12) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The column is c1
actually. There're some issues about getting CSV header automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I found that this is the default behavior for ListTable
. The dynamic query in datafusion-cli is the same as this. I think we don't need to change it in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This behavior is controlled by the config option in the current session context. We can create the session context like
let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true");
let ctx = SessionContext::new_with_config(cfg);
to enable the header scanning.
@@ -19,7 +19,7 @@ | |||
|
|||
# Make a table with multiple input partitions | |||
statement ok | |||
CREATE TABLE data AS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems there're a file or directory called data
in the working directory. However, I think it's really weird. IMO, we should only match a string literal like select * from '/xxx/aaa/data.csv'
but the normal identifier of a table also be matched.
I'll do more research for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. I did an experimental for DataFusion v39.0.0 like
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ mkdir datas
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ cp aggregate_test_100.csv datas
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ datafusion-cli
DataFusion CLI v39.0.0
> select * from datas;
+----------+----------+----------+----------+-------------+----------------------+----------+----------+------------+----------------------+-------------+---------------------+--------------------------------+
| column_1 | column_2 | column_3 | column_4 | column_5 | column_6 | column_7 | column_8 | column_9 | column_10 | column_11 | column_12 | column_13 |
+----------+----------+----------+----------+-------------+----------------------+----------+----------+------------+----------------------+-------------+---------------------+--------------------------------+
I created a folder datas
and place a csv in this, then start the cli in this path. I can directly query this folder without single-quote string literal.
For a single file, it doesn't work
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ cp aggregate_test_100.csv data
(base) jax@jax-canner:~/git/datafusion/testing/data/csv (master~4) $ datafusion-cli
DataFusion CLI v39.0.0
> select * from data;
Error during planning: table 'datafusion.public.data' not found
I think it should be an issue to match only the string literal as the dynamic file table name.
@@ -64,6 +72,13 @@ SELECT * FROM arrow_partitioned ORDER BY f0; | |||
3 baz true 456 | |||
4 NULL NULL 456 | |||
|
|||
# dynamic select arrow file in the folder | |||
query ITB | |||
SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dynamic query doesn't support the partitioned folder. It could be an enhancement issue for it.
DynamicFileSchemaProvider
in the coreDynamicFileSchemaProvider
in the core
Thanks @goldmedal -- I hope to review this PR this wekeend, likely tomorrow |
I am sorry for the delay -- I plan to review this tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, thank you so much @goldmedal -- this is really cool
I have a few thoughts
Security implications
I worry about its security implications (this will now allow anything built on DataFusion 's SessionContext to read arbitrary local files even if https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SQLOptions.html#method.with_allow_dml is disabled
I think we should therefore not register this provider with the SessionContext by default and instead add some examples showing how to register it by itself
Structure / dependencies
As the code is currently structured (not related to this PR) it seems like this has to go into the core crate. It seems to me it would be better if we could find some way to start breaking up the core (e.g. remove the catalog providers, etc)
I am still thinking about this
Thanks @goldmedal -- I'll try and review this shortly |
7fef748
to
f7b4b8c
Compare
datafusion-cli/Cargo.lock
Outdated
@@ -584,9 +584,9 @@ dependencies = [ | |||
|
|||
[[package]] | |||
name = "aws-sdk-sts" | |||
version = "1.41.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we execute cargo update
, the version will be updated to 1.41.0
which isn't compatible with Rust 1.76, then the msrv check will fail. I'm not sure if there is a better way to limit the version in the cargo toml file to avoid the update
command updates an invalid version 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I have actually worked around this on #12032 (comment)
I will review this PR over the next day or two |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TLDR is I think this PR is really nice @goldmedal -- the code is well structured and commented, and I think this will be a very useful feature for users of DataFusion
What I would like to do is to try and create an example in datafusion-examples
of a program that run queries from arbitrary URLs. SELECT * from 'my-s3://foo.com/data'
I'll try and work on that today or tomorrow -- I suspect in trying to make that example it will become apparent what, if any, gaps remain in the APIs in terms of registering ObjectStore
es etc.
// dynamic query by the file path | ||
ctx.enable_url_table(); | ||
let df = ctx | ||
.sql(format!(r#"SELECT * FROM '{}' LIMIT 10"#, &path).as_str()) | ||
.await?; | ||
|
||
// print the results | ||
df.show().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @alamb. I added a simple s3 example here. I hope it is what you want or that it could inspire you for a new example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No actually this is perfect -- thank you @goldmedal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @goldmedal
I plan to leave this PR open until Monday so anyone else who is interested can take a look at it prior to merge.
I left some suggestions on how to potentially improve the comments / documentation, but I think we (I can do it) as a follow on PR too
#[derive(Default)] | ||
pub struct DynamicListTableFactory { | ||
/// The session store that contains the current session. | ||
session_store: Arc<SessionStore>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW since the SessionStore has an Arc inside it, this extra level of Arc may not be necessary
/// # use datafusion::{error::Result, assert_batches_eq}; | ||
/// # #[tokio::main] | ||
/// # async fn main() -> Result<()> { | ||
/// let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @korowa changed the default so that has_header
is true by default now so this line is unecessary
/// Enable the dynamic file query for the current session. | ||
/// See [DynamicFileCatalog] for more details | ||
/// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Enable the dynamic file query for the current session. | |
/// See [DynamicFileCatalog] for more details | |
/// | |
/// Enable dynamic file querying for the current session. | |
/// | |
/// This allows queries to directly access arbitrary file names via SQL like | |
/// `SELECT * from 'my_file.parquet'` | |
/// so it should only be enabled for systems that such access is not a security risk | |
/// | |
/// See [DynamicFileCatalog] for more details | |
/// |
let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); | ||
let path = path.join("tests/tpch-csv/customer.csv"); | ||
let url = format!("file://{}", path.display()); | ||
let cfg = SessionConfig::new().set_str("datafusion.catalog.has_header", "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
likewise here it isn't necessary to set the has_header
flag anymore I don't think
@@ -43,6 +43,10 @@ SELECT * FROM arrow_simple | |||
3 baz false | |||
4 NULL true | |||
|
|||
# url table is only supported by DynamicFileCatalog |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think adding context here about why this is an important test would make it easier to understand for future readers
# url table is only supported by DynamicFileCatalog | |
# Ensure that local files can not be read by default (a potential security issue) | |
# (url table is only supported when DynamicFileCatalog is enabled) |
~id9~ ~value9~ | ||
|
||
query TTT | ||
DESCRIBE '../core/tests/data/aggregate_simple.csv'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
//! dynamic_file contains [`DynamicFileCatalog`] that creates tables from file paths | ||
//! if the wrapped [`CatalogProviderList`] doesn't have the table provider. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest making module level documentation (which is rendered typically as a single line) point to the main struct/trati
//! dynamic_file contains [`DynamicFileCatalog`] that creates tables from file paths | |
//! if the wrapped [`CatalogProviderList`] doesn't have the table provider. | |
//! [`DynamicFileCatalog`] that creates tables from file paths |
/// Implements the [DynamicFileSchemaProvider] that can create tables provider from the file path. | ||
/// | ||
/// The provider will try to create a table provider from the file path if the table provider | ||
/// isn't exist in the inner schema provider. The required object store must be registered in the session context. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the reference to object_store here -- it is up to the UrlTableFactory to handle resolving urls to TableProviders, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, for example, how DynamicListTableFactory
resolves the URL depends on what the session context registers as the object store.
Indeed, it's a legacy comment (before moving to datafusion-catalog
). We don't need to mention the object store here. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove the part of the object store here. Thanks for mentioning it.
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# | |
# Note: This file runs with a SessionContext that has the `enable_url_table` flag set | |
# |
let session_state = SessionStateBuilder::new() | ||
.with_default_features() | ||
.with_config(cfg) | ||
.build(); | ||
let ctx = SessionContext::new_with_state(session_state).enable_url_table(); | ||
let result = plan_and_collect( | ||
&ctx, | ||
format!("select c_name from '{}' limit 3;", &url).as_str(), | ||
) | ||
.await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would be possible here to add enable_url_table
as a argument to SessionStateBuilder
So it could look like
let session_state = SessionStateBuilder::new()
.with_default_features()
.with_config(cfg)
.enable_url_table()
.build();
let ctx = SessionContext::from(session_state);
🤔
(we can do this as a follow on as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I think it's a good idea. We can have a flag in the builder. If it's true, we can wrap the catalog list implicitly. It may need some additional implementation and test cases. I prefer to do it in the follow-up PR.
👍
Thanks @alamb I try to address the suggestions here. Only about the comment related to |
Which issue does this PR close?
Closes #10986 .
Rationale for this change
Follow the idea of #4838 to implement
DynamicFileCatalog
indatafusion-catalog
. Users can enable this feature for an existingSessionContext
bySessoinContext::enable_url_table
.What changes are included in this PR?
SessionStore
to accessSessionState
for he runtime table building.datafusion-cli
,DynamicFileCatalog
is renamed toDynamicObjectStoreCatalog
because it's responsible for registering the required object store now.Are these changes tested?
yes
Are there any user-facing changes?
SessoinContext
:enable_url_table
home_dir
for the URL table.