diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 99bfbbad9d10..9d58465191e1 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -43,7 +43,7 @@ use arrow_schema::{ArrowError, Schema, SchemaRef};
 use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
@@ -129,7 +129,7 @@ impl FileFormat for ArrowFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(ArrowFileSink::new(conf));
 
-        Ok(Arc::new(FileSinkExec::new(
+        Ok(Arc::new(DataSinkExec::new(
             input,
             sink,
             sink_schema,
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index a7849258329b..84235cde0f5d 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -31,7 +31,7 @@ use crate::datasource::physical_plan::{
 };
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::insert::{DataSink, FileSinkExec};
+use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 
@@ -267,7 +267,7 @@ impl FileFormat for CsvFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(conf, writer_options));
 
-        Ok(Arc::new(FileSinkExec::new(
+        Ok(Arc::new(DataSinkExec::new(
             input,
             sink,
             sink_schema,
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 0cc38bbb5554..efc0aa4328d8 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,7 +31,7 @@ use crate::datasource::physical_plan::FileGroupDisplay;
 use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::insert::{DataSink, FileSinkExec};
+use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
 };
@@ -177,7 +177,7 @@ impl FileFormat for JsonFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(JsonSink::new(conf, writer_options));
 
-        Ok(Arc::new(FileSinkExec::new(
+        Ok(Arc::new(DataSinkExec::new(
             input,
             sink,
             sink_schema,
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index bcf4e8a2c8e4..1d41f015121d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -37,7 +37,7 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::insert::{DataSink, FileSinkExec};
+use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::{
     Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
     Statistics,
@@ -279,7 +279,7 @@ impl FileFormat for ParquetFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
 
-        Ok(Arc::new(FileSinkExec::new(
+        Ok(Arc::new(DataSinkExec::new(
             input,
             sink,
             sink_schema,
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 608a46144da3..42e05ebeb33f 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -26,7 +26,7 @@ use crate::datasource::{TableProvider, TableType};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::insert::{DataSink, FileSinkExec};
+use crate::physical_plan::insert::{DataSink, DataSinkExec};
 use crate::physical_plan::memory::MemoryExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::{
@@ -279,7 +279,7 @@ impl TableProvider for MemTable {
             return not_impl_err!("Overwrite not implemented for MemoryTable yet");
         }
         let sink = Arc::new(MemSink::new(self.batches.clone()));
-        Ok(Arc::new(FileSinkExec::new(
+        Ok(Arc::new(DataSinkExec::new(
             input,
             sink,
             self.schema.clone(),
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 100011952b3b..7c58aded3108 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -269,10 +269,10 @@ pub trait TableProvider: Sync + Send {
     ///
     /// # See Also
     ///
-    /// See [`FileSinkExec`] for the common pattern of inserting a
+    /// See [`DataSinkExec`] for the common pattern of inserting a
     /// streams of `RecordBatch`es as files to an ObjectStore.
     ///
-    /// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec
+    /// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec
     async fn insert_into(
         &self,
         _state: &SessionState,
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index 079c1a891d14..2059a5ffcfe4 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -34,7 +34,7 @@ use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{CreateExternalTable, Expr, TableType};
-use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::metrics::MetricsSet;
 use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
@@ -277,7 +277,7 @@ impl TableProvider for StreamTable {
             None => None,
         };
 
-        Ok(Arc::new(FileSinkExec::new(
+        Ok(Arc::new(DataSinkExec::new(
             input,
             Arc::new(StreamWrite(self.0.clone())),
             self.0.schema.clone(),
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index f0233264f280..e3f9f2c76d31 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -74,10 +74,13 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
     ) -> Result<u64>;
 }
 
+#[deprecated(since = "38.0.0", note = "Use [`DataSinkExec`] instead")]
+pub type FileSinkExec = DataSinkExec;
+
 /// Execution plan for writing record batches to a [`DataSink`]
 ///
 /// Returns a single row with the number of values written
-pub struct FileSinkExec {
+pub struct DataSinkExec {
     /// Input plan that produces the record batches to be written.
     input: Arc<dyn ExecutionPlan>,
     /// Sink to which to write
@@ -91,13 +94,13 @@ pub struct FileSinkExec {
     cache: PlanProperties,
 }
 
-impl fmt::Debug for FileSinkExec {
+impl fmt::Debug for DataSinkExec {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "FileSinkExec schema: {:?}", self.count_schema)
+        write!(f, "DataSinkExec schema: {:?}", self.count_schema)
     }
 }
 
-impl FileSinkExec {
+impl DataSinkExec {
     /// Create a plan to write to `sink`
     pub fn new(
         input: Arc<dyn ExecutionPlan>,
@@ -190,7 +193,7 @@ impl FileSinkExec {
     }
 }
 
-impl DisplayAs for FileSinkExec {
+impl DisplayAs for DataSinkExec {
     fn fmt_as(
         &self,
         t: DisplayFormatType,
@@ -198,16 +201,16 @@ impl DisplayAs for FileSinkExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "FileSinkExec: sink=")?;
+                write!(f, "DataSinkExec: sink=")?;
                 self.sink.fmt_as(t, f)
             }
         }
     }
 }
 
-impl ExecutionPlan for FileSinkExec {
+impl ExecutionPlan for DataSinkExec {
     fn name(&self) -> &'static str {
-        "FileSinkExec"
+        "DataSinkExec"
     }
 
     /// Return a reference to Any that can be used for downcasting
@@ -269,7 +272,7 @@ impl ExecutionPlan for FileSinkExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         if partition != 0 {
-            return internal_err!("FileSinkExec can only be called on partition 0!");
+            return internal_err!("DataSinkExec can only be called on partition 0!");
         }
         let data = self.execute_input_stream(0, context.clone())?;
 
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 4d5d6cadad17..4d95c847bf99 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -58,7 +58,7 @@ use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::explain::ExplainExec;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::insert::FileSinkExec;
+use datafusion::physical_plan::insert::DataSinkExec;
 use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use datafusion::physical_plan::joins::{
     CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
@@ -1033,7 +1033,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             .map(|item| PhysicalSortRequirement::from_sort_exprs(&item))
                     })
                     .transpose()?;
-                Ok(Arc::new(FileSinkExec::new(
+                Ok(Arc::new(DataSinkExec::new(
                     input,
                     Arc::new(data_sink),
                     Arc::new(sink_schema),
@@ -1063,7 +1063,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             .map(|item| PhysicalSortRequirement::from_sort_exprs(&item))
                     })
                     .transpose()?;
-                Ok(Arc::new(FileSinkExec::new(
+                Ok(Arc::new(DataSinkExec::new(
                     input,
                     Arc::new(data_sink),
                     Arc::new(sink_schema),
@@ -1093,7 +1093,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             .map(|item| PhysicalSortRequirement::from_sort_exprs(&item))
                     })
                     .transpose()?;
-                Ok(Arc::new(FileSinkExec::new(
+                Ok(Arc::new(DataSinkExec::new(
                     input,
                     Arc::new(data_sink),
                     Arc::new(sink_schema),
@@ -1879,7 +1879,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
             });
         }
 
-        if let Some(exec) = plan.downcast_ref::<FileSinkExec>() {
+        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
             let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
                 exec.input().to_owned(),
                 extension_codec,
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index a74b1a38935b..f97cfea765bf 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ use datafusion::physical_plan::expressions::{
     NotExpr, NthValue, PhysicalSortExpr, StringAgg, Sum,
 };
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::insert::FileSinkExec;
+use datafusion::physical_plan::insert::DataSinkExec;
 use datafusion::physical_plan::joins::{
     HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
 };
@@ -861,7 +861,7 @@ fn roundtrip_json_sink() -> Result<()> {
         }),
     )];
 
-    roundtrip_test(Arc::new(FileSinkExec::new(
+    roundtrip_test(Arc::new(DataSinkExec::new(
         input,
         data_sink,
         schema.clone(),
@@ -896,7 +896,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         }),
     )];
 
-    let roundtrip_plan = roundtrip_test_and_return(Arc::new(FileSinkExec::new(
+    let roundtrip_plan = roundtrip_test_and_return(Arc::new(DataSinkExec::new(
         input,
         data_sink,
         schema.clone(),
@@ -906,7 +906,7 @@ fn roundtrip_csv_sink() -> Result<()> {
 
     let roundtrip_plan = roundtrip_plan
         .as_any()
-        .downcast_ref::<FileSinkExec>()
+        .downcast_ref::<DataSinkExec>()
         .unwrap();
     let csv_sink = roundtrip_plan
         .sink()
@@ -948,7 +948,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
         }),
     )];
 
-    roundtrip_test(Arc::new(FileSinkExec::new(
+    roundtrip_test(Arc::new(DataSinkExec::new(
         input,
         data_sink,
         schema.clone(),
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 75f1ccb07aac..0991868cdf5d 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -149,7 +149,7 @@ logical_plan
 CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: (format.compression zstd(10))
 --TableScan: source_table projection=[col1, col2]
 physical_plan
-FileSinkExec: sink=ParquetSink(file_groups=[])
+DataSinkExec: sink=ParquetSink(file_groups=[])
 --MemoryExec: partitions=1, partition_sizes=[1]
 
 # Error case
@@ -163,7 +163,7 @@ logical_plan
 CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: ()
 --TableScan: source_table projection=[col1, col2]
 physical_plan
-FileSinkExec: sink=ParquetSink(file_groups=[])
+DataSinkExec: sink=ParquetSink(file_groups=[])
 --MemoryExec: partitions=1, partition_sizes=[1]
 
 # Copy more files to directory via query
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index b7ad36dace16..a38d254e051f 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table]
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
 physical_plan
-FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. }
+DataSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. }
 --SortExec: expr=[c1@0 ASC NULLS LAST]
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index b3fbb33e68e7..b16a169598e7 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -64,7 +64,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=MemoryTable (partitions=1)
+DataSinkExec: sink=MemoryTable (partitions=1)
 --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1]
@@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=MemoryTable (partitions=1)
+DataSinkExec: sink=MemoryTable (partitions=1)
 --CoalescePartitionsExec
 ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
 ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted]
@@ -175,7 +175,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=MemoryTable (partitions=8)
+DataSinkExec: sink=MemoryTable (partitions=8)
 --ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
@@ -217,7 +217,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------TableScan: aggregate_test_100 projection=[c1]
 physical_plan
-FileSinkExec: sink=MemoryTable (partitions=1)
+DataSinkExec: sink=MemoryTable (partitions=1)
 --SortExec: expr=[c1@0 ASC NULLS LAST]
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 5f100953aff4..0033b070ec1a 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test]
 --Projection: column1 AS a, column2 AS b
 ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))...
 physical_plan
-FileSinkExec: sink=CsvSink(file_groups=[])
+DataSinkExec: sink=CsvSink(file_groups=[])
 --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
 ----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
 ------ValuesExec
@@ -353,7 +353,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=ParquetSink(file_groups=[])
+DataSinkExec: sink=ParquetSink(file_groups=[])
 --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1]
@@ -415,7 +415,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=ParquetSink(file_groups=[])
+DataSinkExec: sink=ParquetSink(file_groups=[])
 --CoalescePartitionsExec
 ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
 ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted]
@@ -458,7 +458,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------TableScan: aggregate_test_100 projection=[c1]
 physical_plan
-FileSinkExec: sink=ParquetSink(file_groups=[])
+DataSinkExec: sink=ParquetSink(file_groups=[])
 --SortExec: expr=[c1@0 ASC NULLS LAST]
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
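
Note for downstream users: because the old name is kept as a deprecated type alias, existing code that constructs `FileSinkExec` keeps compiling and only emits a deprecation warning; migrating is a pure rename. A minimal sketch of such a call site (the helper function and its wiring are illustrative, not part of this patch; the four-argument `new` call matches the call sites above, with `None` assumed for the optional sort order):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::insert::{DataSink, DataSinkExec};
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical downstream helper that wraps a DataSink in an execution plan.
fn write_plan(
    input: Arc<dyn ExecutionPlan>,
    sink: Arc<dyn DataSink>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let schema = input.schema();
    // Before this change: FileSinkExec::new(input, sink, schema, None)
    // (still compiles via the deprecated alias, but warns).
    Ok(Arc::new(DataSinkExec::new(input, sink, schema, None)))
}
```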