Skip to content

Commit

Permalink
Require Debug for TableProvider, TableProviderFactory and `Part…
Browse files Browse the repository at this point in the history
…itionStream` (#12557)

* Require `Debug` for `TableProvider`, `TableProviderFactory` and `PartitionStream`

* add another debug

* fix doc test

* Update to use non-deprecated API
  • Loading branch information
alamb authored Sep 25, 2024
1 parent 4a3c09a commit a98ffdd
Show file tree
Hide file tree
Showing 22 changed files with 48 additions and 9 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ pub fn display_all_functions() -> Result<()> {
}

/// PARQUET_META table function
#[derive(Debug)]
struct ParquetMetadataTable {
schema: SchemaRef,
batch: RecordBatch,
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn main() -> Result<()> {
/// Usage: `read_csv(filename, [limit])`
///
/// [`read_csv`]: https://duckdb.org/docs/data/csv/overview.html
#[derive(Debug)]
struct LocalCsvTable {
schema: SchemaRef,
limit: Option<usize>,
Expand Down
8 changes: 5 additions & 3 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::any::Any;
use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
Expand All @@ -31,7 +32,7 @@ use datafusion_physical_plan::ExecutionPlan;

/// Source table
#[async_trait]
pub trait TableProvider: Sync + Send {
pub trait TableProvider: Debug + Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
Expand Down Expand Up @@ -193,6 +194,7 @@ pub trait TableProvider: Sync + Send {
/// # use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
/// # use datafusion_physical_plan::ExecutionPlan;
/// // Define a struct that implements the TableProvider trait
/// #[derive(Debug)]
/// struct TestDataSource {}
///
/// #[async_trait]
Expand All @@ -212,7 +214,7 @@ pub trait TableProvider: Sync + Send {
/// // This example only supports a between expr with a single column named "c1".
/// Expr::Between(between_expr) => {
/// between_expr.expr
/// .try_into_col()
/// .try_as_col()
/// .map(|column| {
/// if column.name == "c1" {
/// TableProviderFilterPushDown::Exact
Expand Down Expand Up @@ -283,7 +285,7 @@ pub trait TableProvider: Sync + Send {
/// For example, this can be used to create a table "on the fly"
/// from a directory of files only when that name is referenced.
#[async_trait]
pub trait TableProviderFactory: Sync + Send {
pub trait TableProviderFactory: Debug + Sync + Send {
/// Create a TableProvider with the given url
async fn create(
&self,
Expand Down
22 changes: 18 additions & 4 deletions datafusion/core/src/catalog_common/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::{any::Any, sync::Arc};

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use std::fmt::{Debug, Formatter};
use std::{any::Any, sync::Arc};

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
use crate::datasource::streaming::StreamingTable;
Expand Down Expand Up @@ -75,6 +75,15 @@ struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogProviderList>,
}

/// Opaque [`Debug`] output: only the type name is written, followed by the
/// non-exhaustive marker (`InformationSchemaConfig { .. }`); no field is
/// rendered.
impl Debug for InformationSchemaConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // TODO: ideally the catalog list would be printed here as well, but
        // that requires `CatalogProviderList` to implement `Debug` first.
        let mut builder = f.debug_struct("InformationSchemaConfig");
        builder.finish_non_exhaustive()
    }
}

impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(
Expand Down Expand Up @@ -246,6 +255,7 @@ impl SchemaProvider for InformationSchemaProvider {
}
}

#[derive(Debug)]
struct InformationSchemaTables {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -337,6 +347,7 @@ impl InformationSchemaTablesBuilder {
}
}

#[derive(Debug)]
struct InformationSchemaViews {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -424,6 +435,7 @@ impl InformationSchemaViewBuilder {
}
}

#[derive(Debug)]
struct InformationSchemaColumns {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -640,6 +652,7 @@ impl InformationSchemaColumnsBuilder {
}
}

#[derive(Debug)]
struct InformationSchemata {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down Expand Up @@ -741,6 +754,7 @@ impl PartitionStream for InformationSchemata {
}
}

#[derive(Debug)]
struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,7 @@ impl DataFrame {
}
}

/// Private wrapper whose only state is the [`LogicalPlan`] in `plan`.
///
/// NOTE(review): the name suggests this is the `TableProvider` adapter for a
/// `DataFrame`; the corresponding `impl` is outside this view — confirm.
#[derive(Debug)]
struct DataFrameTableProvider {
plan: LogicalPlan,
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/cte_worktable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::datasource::{TableProvider, TableType};
/// The temporary working table where the previous iteration of a recursive query is stored
/// Naming is based on PostgreSQL's implementation.
/// See here for more details: www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
#[derive(Debug)]
pub struct CteWorkTable {
/// The name of the CTE work table
// WIP, see https://github.com/apache/datafusion/issues/462
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};

/// An empty plan that is useful for testing and generating plans
/// without mapping them to actual data.
#[derive(Debug)]
pub struct EmptyTable {
schema: SchemaRef,
partitions: usize,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ impl ListingOptions {
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
/// File fields only
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl StreamConfig {
///
/// [Hadoop]: https://hadoop.apache.org/
/// [`ListingTable`]: crate::datasource::listing::ListingTable
#[derive(Debug)]
pub struct StreamTable(Arc<StreamConfig>);

impl StreamTable {
Expand Down Expand Up @@ -370,6 +371,7 @@ impl TableProvider for StreamTable {
}
}

/// Private newtype around a shared [`StreamConfig`] that implements
/// [`PartitionStream`].
#[derive(Debug)]
struct StreamRead(Arc<StreamConfig>);

impl PartitionStream for StreamRead {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use datafusion_expr::{Expr, TableType};
use log::debug;

/// A [`TableProvider`] that streams a set of [`PartitionStream`]
#[derive(Debug)]
pub struct StreamingTable {
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion_optimizer::Analyzer;
use crate::datasource::{TableProvider, TableType};

/// An implementation of `TableProvider` that uses another logical plan.
#[derive(Debug)]
pub struct ViewTable {
/// LogicalPlan of the view
logical_plan: LogicalPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,7 @@ mod tests {

#[test]
fn test_streaming_table_after_projection() -> Result<()> {
#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ pub fn csv_exec_sorted(
}

// Stream partition for test purposes; its only state is the schema it is
// constructed with.
// NOTE(review): presumably implements `PartitionStream`; the `impl` is not
// visible here — confirm.
#[derive(Debug)]
pub(crate) struct TestStreamPartition {
pub schema: SchemaRef,
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub fn populate_csv_partitions(
}

/// [`TableProviderFactory`] implementation for tests.
///
/// The struct carries no state; `Default` and `Debug` are derived.
#[derive(Default, Debug)]
pub struct TestTableFactory {}

#[async_trait]
Expand All @@ -191,6 +192,7 @@ impl TableProviderFactory for TestTableFactory {
}

/// TableProvider for testing purposes
#[derive(Debug)]
pub struct TestTableProvider {
/// URL of table files or folder
pub url: String,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ macro_rules! TEST_CUSTOM_RECORD_BATCH {

//--- Custom source dataframe tests ---//

/// Stateless unit struct used by the custom-source dataframe tests.
///
/// NOTE(review): presumably the `TableProvider` under test; its `impl` is
/// outside this view — confirm.
#[derive(Debug)]
struct CustomTableProvider;

#[derive(Debug, Clone)]
struct CustomExecutionPlan {
projection: Option<Vec<usize>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ExecutionPlan for CustomPlan {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
struct CustomProvider {
zero_batch: RecordBatch,
one_batch: RecordBatch,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ fn batches_byte_size(batches: &[RecordBatch]) -> usize {
batches.iter().map(|b| b.get_array_memory_size()).sum()
}

#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
batches: Vec<RecordBatch>,
Expand All @@ -798,6 +799,7 @@ impl PartitionStream for DummyStreamPartition {
}

/// Wrapper over a TableProvider that can provide ordering information
#[derive(Debug)]
struct SortedTableProvider {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{get_plan_string, ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

/// Test-only partition type whose sole state is a schema.
///
/// NOTE(review): presumably implements the `PartitionStream` trait imported
/// above; the `impl` is not visible here — confirm.
#[derive(Debug)]
struct DummyStreamPartition {
schema: SchemaRef,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ async fn test_deregister_udtf() -> Result<()> {
Ok(())
}

#[derive(Debug)]
struct SimpleCsvTable {
schema: SchemaRef,
exprs: Vec<Expr>,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use super::{DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties};
Expand All @@ -42,7 +43,7 @@ use log::debug;
/// Combined with [`StreamingTableExec`], you can use this trait to implement
/// [`ExecutionPlan`] for a custom source with less boiler plate than
/// implementing `ExecutionPlan` directly for many use cases.
pub trait PartitionStream: Send + Sync {
pub trait PartitionStream: Debug + Send + Sync {
/// Returns the schema of this partition
fn schema(&self) -> &SchemaRef;

Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub fn mem_exec(partitions: usize) -> MemoryExec {
}

// construct a stream partition for test purposes
#[derive(Debug)]
pub struct TestPartitionStream {
pub schema: SchemaRef,
pub batches: Vec<RecordBatch>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ pub async fn register_partition_table(test_ctx: &mut TestContext) {

// registers a LOCAL TEMPORARY table.
pub async fn register_temp_table(ctx: &SessionContext) {
#[derive(Debug)]
struct TestTable(TableType);

#[async_trait]
Expand Down

0 comments on commit a98ffdd

Please sign in to comment.