Skip to content

Commit

Permalink
chore: upgrade to DataFusion 37.0.0 & Arrow 51.0.0 (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinjdangelo authored Apr 6, 2024
1 parent 3545919 commit 8456eb9
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 1,375 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ members = [
"examples",
"sources/sql",
"sources/flight-sql",
"sql-writer",
]

[patch.crates-io]
Expand All @@ -22,5 +21,5 @@ readme = "README.md"

[workspace.dependencies]
async-trait = "0.1.77"
datafusion = "34.0.0"
datafusion-substrait = "34.0.0"
datafusion = "37.0.0"
datafusion-substrait = "37.0.0"
2 changes: 1 addition & 1 deletion datafusion-federation/src/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl FederationAnalyzerRule {
})
.collect::<Result<Vec<_>>>()?;

let new_plan = plan.with_new_inputs(&new_inputs)?;
let new_plan = plan.with_new_exprs(plan.expressions(), new_inputs)?;

Ok((Some(new_plan), None))
}
Expand Down
8 changes: 4 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ license.workspace = true
readme.workspace = true

[dev-dependencies]
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
tokio = "1.35.1"
async-trait.workspace = true
datafusion.workspace = true
datafusion-federation.path = "../datafusion-federation"
datafusion-federation-sql.path = "../sources/sql"
datafusion-federation-flight-sql.path = "../sources/flight-sql"
connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7bc", features = [
connectorx = { git = "https://github.com/devinjdangelo/connector-x.git", features = [
"dst_arrow",
"src_sqlite",
"src_sqlite"
] }
tonic = "0.10.2"
tonic = "0.11.0"

[dependencies]
async-std = "1.12.0"
Expand Down
8 changes: 4 additions & 4 deletions examples/examples/sqlite-partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ impl SchemaProvider for MultiSchemaProvider {
self.children.iter().flat_map(|p| p.table_names()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
for child in &self.children {
if let Some(table) = child.table(name).await {
return Some(table);
if let Ok(Some(table)) = child.table(name).await {
return Ok(Some(table));
}
}
None
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
Expand Down
6 changes: 3 additions & 3 deletions sources/flight-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ datafusion-substrait.workspace = true
datafusion-federation.path = "../../datafusion-federation"
datafusion-federation-sql.path = "../sql"
futures = "0.3.30"
tonic = {version="0.10.2", features=["tls"] }
tonic = {version="0.11.0", features=["tls"] }
prost = "0.12.3"
arrow = "49.0.0"
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
arrow = "51.0.0"
arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
log = "0.4.20"
4 changes: 2 additions & 2 deletions sources/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ path = "src/lib.rs"
async-trait.workspace = true
# connectorx = { version = "0.3.2", features = ["src_sqlite"] }
# https://github.com/sfu-db/connector-x/pull/555
connectorx = { git = "https://github.com/sfu-db/connector-x.git", rev = "fa0fc7bc", features = [
connectorx = { git = "https://github.com/devinjdangelo/connector-x.git", features = [
"dst_arrow",
"src_sqlite"
] }
datafusion.workspace = true
datafusion-federation.path = "../../datafusion-federation"
datafusion-sql-writer.path = "../../sql-writer"
# derive_builder = "0.13.0"
futures = "0.3.30"
tokio = "1.35.1"
4 changes: 1 addition & 3 deletions sources/sql/src/connectorx/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl SQLExecutor for CXExecutor {
fn compute_context(&self) -> Option<String> {
Some(self.context.clone())
}
fn execute(&self, sql: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
let conn = self.conn.clone();
let query: CXQuery = sql.into();

Expand All @@ -69,8 +69,6 @@ impl SQLExecutor for CXExecutor {
))));
};

let schema = schema_to_lowercase(dst.arrow_schema());

Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

Expand Down
36 changes: 23 additions & 13 deletions sources/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ use datafusion::{
execution::{context::SessionState, TaskContext},
logical_expr::{Extension, LogicalPlan},
optimizer::analyzer::{Analyzer, AnalyzerRule},
physical_expr::PhysicalSortExpr,
physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream},
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream,
},
sql::unparser::plan_to_sql,
};
use datafusion_federation::{FederatedPlanNode, FederationPlanner, FederationProvider};

mod schema;
use datafusion_sql_writer::from_df_plan;
pub use schema::*;

pub mod connectorx;
Expand Down Expand Up @@ -112,11 +115,22 @@ impl FederationPlanner for SQLFederationPlanner {
struct VirtualExecutionPlan {
plan: LogicalPlan,
executor: Arc<dyn SQLExecutor>,
props: PlanProperties,
}

impl VirtualExecutionPlan {
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>) -> Self {
Self { plan, executor }
let schema: Schema = plan.schema().as_ref().into();
let props = PlanProperties::new(
EquivalenceProperties::new(Arc::new(schema)),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);
Self {
plan,
executor,
props,
}
}

fn schema(&self) -> SchemaRef {
Expand All @@ -140,14 +154,6 @@ impl ExecutionPlan for VirtualExecutionPlan {
self.schema()
}

fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand All @@ -164,9 +170,13 @@ impl ExecutionPlan for VirtualExecutionPlan {
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let ast = from_df_plan(&self.plan, self.executor.dialect())?;
let ast = plan_to_sql(&self.plan)?;
let query = format!("{ast}");

self.executor.execute(query.as_str(), self.schema())
}

fn properties(&self) -> &PlanProperties {
&self.props
}
}
14 changes: 7 additions & 7 deletions sources/sql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ impl SchemaProvider for SQLSchemaProvider {
self.tables.iter().map(|s| s.table_name.clone()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
if let Some(source) = self
.tables
.iter()
.find(|s| s.table_name.eq_ignore_ascii_case(name))
{
let adaptor = FederatedTableProviderAdaptor::new(source.clone());
return Some(Arc::new(adaptor));
return Ok(Some(Arc::new(adaptor)));
}
None
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
Expand Down Expand Up @@ -93,13 +93,13 @@ impl SchemaProvider for MultiSchemaProvider {
self.children.iter().flat_map(|p| p.table_names()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
for child in &self.children {
if let Some(table) = child.table(name).await {
return Some(table);
if let Ok(Some(table)) = child.table(name).await {
return Ok(Some(table));
}
}
None
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
Expand Down
17 changes: 0 additions & 17 deletions sql-writer/Cargo.toml

This file was deleted.

35 changes: 0 additions & 35 deletions sql-writer/examples/expr.rs

This file was deleted.

41 changes: 0 additions & 41 deletions sql-writer/examples/plan.rs

This file was deleted.

Loading

0 comments on commit 8456eb9

Please sign in to comment.