Rename FileSinkExec to DataSinkExec #10065

Merged · 4 commits · Apr 13, 2024
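For downstream code this is purely a rename: as the `insert.rs` diff below shows, the old name is kept as a deprecated type alias, so existing builds keep compiling with a warning. A minimal migration sketch for the import path used throughout this PR:

```rust
// Before (now deprecated, still compiles with a warning):
// use datafusion_physical_plan::insert::{DataSink, FileSinkExec};

// After: same type, new name.
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
```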
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -43,7 +43,7 @@ use arrow_schema::{ArrowError, Schema, SchemaRef};
use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
@@ -129,7 +129,7 @@ impl FileFormat for ArrowFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(ArrowFileSink::new(conf));

Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
sink,
sink_schema,
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -31,7 +31,7 @@ use crate::datasource::physical_plan::{
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

@@ -267,7 +267,7 @@ impl FileFormat for CsvFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(conf, writer_options));

Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
sink,
sink_schema,
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -31,7 +31,7 @@ use crate::datasource::physical_plan::FileGroupDisplay;
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, SendableRecordBatchStream, Statistics,
};
@@ -177,7 +177,7 @@ impl FileFormat for JsonFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, writer_options));

Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
sink,
sink_schema,
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -37,7 +37,7 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
Statistics,
@@ -279,7 +279,7 @@ impl FileFormat for ParquetFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));

Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
sink,
sink_schema,
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/memory.rs
@@ -26,7 +26,7 @@ use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{
@@ -279,7 +279,7 @@ impl TableProvider for MemTable {
return not_impl_err!("Overwrite not implemented for MemoryTable yet");
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
sink,
self.schema.clone(),
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/provider.rs
@@ -269,10 +269,10 @@ pub trait TableProvider: Sync + Send {
///
/// # See Also
///
/// See [`FileSinkExec`] for the common pattern of inserting a
/// See [`DataSinkExec`] for the common pattern of inserting a
/// streams of `RecordBatch`es as files to an ObjectStore.
///
/// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec
/// [`DataSinkExec`]: crate::physical_plan::insert::DataSinkExec
async fn insert_into(
&self,
_state: &SessionState,
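The doc comment above points to the usual shape of an `insert_into` implementation. Below is a hedged sketch of that construction, assuming the four-argument `DataSinkExec::new(input, sink, schema, sort_order)` constructor visible in the hunks above; the helper name `plan_insert` is illustrative, not part of DataFusion.

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::ExecutionPlan;

/// Illustrative helper: wire an input plan to a sink and return the node
/// that drives the write, as the `insert_into` implementations in this PR do.
fn plan_insert(
    input: Arc<dyn ExecutionPlan>,
    sink: Arc<dyn DataSink>,
    schema: SchemaRef,
) -> Result<Arc<dyn ExecutionPlan>> {
    // `None`: no required input ordering for this sketch.
    Ok(Arc::new(DataSinkExec::new(input, sink, schema, None)))
}
```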
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/stream.rs
@@ -34,7 +34,7 @@ use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
@@ -277,7 +277,7 @@ impl TableProvider for StreamTable {
None => None,
};

Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(StreamWrite(self.0.clone())),
self.0.schema.clone(),
21 changes: 12 additions & 9 deletions datafusion/physical-plan/src/insert.rs
@@ -74,10 +74,13 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
) -> Result<u64>;
}

#[deprecated(since = "38.0.0", note = "Use [`DataSinkExec`] instead")]
pub type FileSinkExec = DataSinkExec;

/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
pub struct FileSinkExec {
pub struct DataSinkExec {
/// Input plan that produces the record batches to be written.
input: Arc<dyn ExecutionPlan>,
/// Sink to which to write
@@ -91,13 +94,13 @@ pub struct FileSinkExec {
cache: PlanProperties,
}

impl fmt::Debug for FileSinkExec {
impl fmt::Debug for DataSinkExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "FileSinkExec schema: {:?}", self.count_schema)
write!(f, "DataSinkExec schema: {:?}", self.count_schema)
}
}

impl FileSinkExec {
impl DataSinkExec {
/// Create a plan to write to `sink`
pub fn new(
input: Arc<dyn ExecutionPlan>,
@@ -190,24 +193,24 @@ impl FileSinkExec {
}
}

impl DisplayAs for FileSinkExec {
impl DisplayAs for DataSinkExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "FileSinkExec: sink=")?;
write!(f, "DataSinkExec: sink=")?;
self.sink.fmt_as(t, f)
}
}
}
}

impl ExecutionPlan for FileSinkExec {
impl ExecutionPlan for DataSinkExec {
fn name(&self) -> &'static str {
"FileSinkExec"
"DataSinkExec"
}

/// Return a reference to Any that can be used for downcasting
@@ -269,7 +272,7 @@ impl ExecutionPlan for FileSinkExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if partition != 0 {
return internal_err!("FileSinkExec can only be called on partition 0!");
return internal_err!("DataSinkExec can only be called on partition 0!");
}
let data = self.execute_input_stream(0, context.clone())?;

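The compatibility mechanism in this file is the deprecated type alias shown above: the old name still resolves to the new type, so callers keep compiling but see a warning. A self-contained sketch of that pattern with toy types (not DataFusion's real ones):

```rust
// Toy stand-in for the renamed plan node.
pub struct DataSinkExec {
    name: &'static str,
}

impl DataSinkExec {
    pub fn new(name: &'static str) -> Self {
        Self { name }
    }

    pub fn name(&self) -> &'static str {
        self.name
    }
}

// The old name survives as an alias; using it warns at the call site.
#[deprecated(since = "38.0.0", note = "Use `DataSinkExec` instead")]
pub type FileSinkExec = DataSinkExec;

fn main() {
    // New code: no warning.
    let new_style = DataSinkExec::new("DataSinkExec");

    // Old code: still compiles, but emits a deprecation warning
    // (silenced here only to keep the example output clean).
    #[allow(deprecated)]
    let old_style: FileSinkExec = FileSinkExec::new("FileSinkExec");

    println!("{} / {}", new_style.name(), old_style.name());
}
```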
10 changes: 5 additions & 5 deletions datafusion/proto/src/physical_plan/mod.rs
@@ -58,7 +58,7 @@ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::insert::FileSinkExec;
use datafusion::physical_plan::insert::DataSinkExec;
use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion::physical_plan::joins::{
CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec,
@@ -1033,7 +1033,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.map(|item| PhysicalSortRequirement::from_sort_exprs(&item))
})
.transpose()?;
Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(data_sink),
Arc::new(sink_schema),
@@ -1063,7 +1063,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.map(|item| PhysicalSortRequirement::from_sort_exprs(&item))
})
.transpose()?;
Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(data_sink),
Arc::new(sink_schema),
@@ -1093,7 +1093,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
.map(|item| PhysicalSortRequirement::from_sort_exprs(&item))
})
.transpose()?;
Ok(Arc::new(FileSinkExec::new(
Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(data_sink),
Arc::new(sink_schema),
@@ -1879,7 +1879,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
});
}

if let Some(exec) = plan.downcast_ref::<FileSinkExec>() {
if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
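The proto serializer recognizes a plan node by downcasting through `as_any()`, which is why the rename must be mirrored here and in the roundtrip tests below. A minimal sketch of that dispatch pattern with a toy trait (a stand-in, not DataFusion's `ExecutionPlan`):

```rust
use std::any::Any;
use std::sync::Arc;

// Toy stand-in for the ExecutionPlan trait: every node can expose itself
// as `&dyn Any` so callers can recover the concrete type.
trait PlanNode {
    fn as_any(&self) -> &dyn Any;
}

struct DataSinkExec;

impl PlanNode for DataSinkExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// The serializer branches on the concrete node type; after the rename,
// this downcast must target DataSinkExec instead of FileSinkExec.
fn serialize(plan: &Arc<dyn PlanNode>) -> Option<&'static str> {
    if plan.as_any().downcast_ref::<DataSinkExec>().is_some() {
        return Some("DataSinkExec");
    }
    None
}

fn main() {
    let plan: Arc<dyn PlanNode> = Arc::new(DataSinkExec);
    println!("{:?}", serialize(&plan));
}
```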
10 changes: 5 additions & 5 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -48,7 +48,7 @@ use datafusion::physical_plan::expressions::{
NotExpr, NthValue, PhysicalSortExpr, StringAgg, Sum,
};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::insert::FileSinkExec;
use datafusion::physical_plan::insert::DataSinkExec;
use datafusion::physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
};
@@ -861,7 +861,7 @@ fn roundtrip_json_sink() -> Result<()> {
}),
)];

roundtrip_test(Arc::new(FileSinkExec::new(
roundtrip_test(Arc::new(DataSinkExec::new(
input,
data_sink,
schema.clone(),
@@ -896,7 +896,7 @@ fn roundtrip_csv_sink() -> Result<()> {
}),
)];

let roundtrip_plan = roundtrip_test_and_return(Arc::new(FileSinkExec::new(
let roundtrip_plan = roundtrip_test_and_return(Arc::new(DataSinkExec::new(
input,
data_sink,
schema.clone(),
@@ -906,7 +906,7 @@

let roundtrip_plan = roundtrip_plan
.as_any()
.downcast_ref::<FileSinkExec>()
.downcast_ref::<DataSinkExec>()
.unwrap();
let csv_sink = roundtrip_plan
.sink()
@@ -948,7 +948,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
}),
)];

roundtrip_test(Arc::new(FileSinkExec::new(
roundtrip_test(Arc::new(DataSinkExec::new(
input,
data_sink,
schema.clone(),
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/copy.slt
@@ -149,7 +149,7 @@
CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: (format.compression zstd(10))
--TableScan: source_table projection=[col1, col2]
physical_plan
FileSinkExec: sink=ParquetSink(file_groups=[])
DataSinkExec: sink=ParquetSink(file_groups=[])
--MemoryExec: partitions=1, partition_sizes=[1]

# Error case
Expand All @@ -163,7 +163,7 @@ logical_plan
CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: ()
--TableScan: source_table projection=[col1, col2]
physical_plan
FileSinkExec: sink=ParquetSink(file_groups=[])
DataSinkExec: sink=ParquetSink(file_groups=[])
--MemoryExec: partitions=1, partition_sizes=[1]

# Copy more files to directory via query
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/explain.slt
@@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table]
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
physical_plan
FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. }
DataSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. }
--SortExec: expr=[c1@0 ASC NULLS LAST]
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true

8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/insert.slt
@@ -64,7 +64,7 @@ Dml: op=[Insert Into] table=[table_without_values]
--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
FileSinkExec: sink=MemoryTable (partitions=1)
DataSinkExec: sink=MemoryTable (partitions=1)
--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, c1@0 as c1]
@@ -125,7 +125,7 @@ Dml: op=[Insert Into] table=[table_without_values]
----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
FileSinkExec: sink=MemoryTable (partitions=1)
DataSinkExec: sink=MemoryTable (partitions=1)
--CoalescePartitionsExec
----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted]
@@ -175,7 +175,7 @@ Dml: op=[Insert Into] table=[table_without_values]
--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
FileSinkExec: sink=MemoryTable (partitions=8)
DataSinkExec: sink=MemoryTable (partitions=8)
--ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
@@ -217,7 +217,7 @@ Dml: op=[Insert Into] table=[table_without_values]
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------TableScan: aggregate_test_100 projection=[c1]
physical_plan
FileSinkExec: sink=MemoryTable (partitions=1)
DataSinkExec: sink=MemoryTable (partitions=1)
--SortExec: expr=[c1@0 ASC NULLS LAST]
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
