[fix] adding extra partitioning col for csv avro json

parquet exec still TODO

rdettai committed Oct 23, 2021
1 parent 3913979 commit 4ef66aa

Showing 32 changed files with 608 additions and 454 deletions.
5 changes: 4 additions & 1 deletion ballista/rust/core/proto/ballista.proto

@@ -294,7 +294,7 @@ message ListingTableScanNode {
   ProjectionColumns projection = 4;
   Schema schema = 5;
   repeated LogicalExprNode filters = 6;
-  repeated string partitions = 7;
+  repeated string table_partition_dims = 7;
   bool collect_stat = 8;
   uint32 target_partitions = 9;
   oneof FileFormatType {
@@ -613,6 +613,7 @@ message ParquetScanExecNode {
   repeated uint32 projection = 6;
   ScanLimit limit = 7;
   Statistics statistics = 8;
+  repeated string table_partition_dims = 9;
 }

 message CsvScanExecNode {
@@ -624,6 +625,7 @@ message CsvScanExecNode {
   repeated uint32 projection = 6;
   ScanLimit limit = 7;
   Statistics statistics = 8;
+  repeated string table_partition_dims = 9;
 }

 message AvroScanExecNode {
@@ -633,6 +635,7 @@ message AvroScanExecNode {
   repeated uint32 projection = 6;
   ScanLimit limit = 7;
   Statistics statistics = 8;
+  repeated string table_partition_dims = 9;
 }

 enum PartitionMode {
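The renamed proto field flows into Rust through the prost-generated types. As a minimal sketch (not part of this commit; the module path and the "date" column are illustrative assumptions), populating the new field on a generated message might look like:

use ballista_core::serde::protobuf;

// Hypothetical: a CsvScanExecNode carrying one hive-style partition
// dimension. All other fields are left at prost's defaults for brevity;
// a real plan would also populate file groups, schema, projection, etc.
fn csv_scan_with_partition_dim() -> protobuf::CsvScanExecNode {
    protobuf::CsvScanExecNode {
        // e.g. /table/date=2021-10-23/part-0.csv -> ["date"]
        table_partition_dims: vec!["date".to_string()],
        ..Default::default()
    }
}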
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs

@@ -166,7 +166,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                 let options = ListingOptions {
                     file_extension: scan.file_extension.clone(),
                     format: file_format,
-                    partitions: scan.partitions.clone(),
+                    table_partition_dims: scan.table_partition_dims.clone(),
                     collect_stat: scan.collect_stat,
                     target_partitions: scan.target_partitions as usize,
                 };
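For context, here is a hedged sketch of constructing the renamed ListingOptions on the DataFusion side; the CSV format, the "date" column, and the exact module paths are assumptions for illustration, not taken from this diff:

use std::sync::Arc;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::ListingOptions;

// Options for a listing table over hive-partitioned CSV files; the
// `table_partition_dims` entries name the directory-encoded columns,
// e.g. /data/date=2021-10-23/part-0.csv -> ["date"].
fn listing_options_sketch() -> ListingOptions {
    ListingOptions {
        file_extension: ".csv".to_owned(),
        format: Arc::new(CsvFormat::default()),
        table_partition_dims: vec!["date".to_owned()],
        collect_stat: true,
        target_partitions: 8,
    }
}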
5 changes: 4 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs

@@ -735,7 +735,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                             .options()
                             .file_extension
                             .clone(),
-                        partitions: listing_table.options().partitions.clone(),
+                        table_partition_dims: listing_table
+                            .options()
+                            .table_partition_dims
+                            .clone(),
                         path: listing_table.table_path().to_owned(),
                         schema: Some(schema),
                         projection,
67 changes: 39 additions & 28 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs

@@ -35,6 +35,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
+use datafusion::datasource::file_format::PhysicalPlanConfig;
 use datafusion::datasource::object_store::local::LocalFileSystem;
 use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
 use datafusion::datasource::PartitionedFile;
@@ -124,18 +125,22 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let statistics = convert_required!(scan.statistics)?;

                 Ok(Arc::new(CsvExec::new(
-                    Arc::new(LocalFileSystem {}),
-                    scan.file_groups
-                        .iter()
-                        .map(|f| f.try_into())
-                        .collect::<Result<Vec<_>, _>>()?,
-                    statistics,
-                    schema,
+                    PhysicalPlanConfig {
+                        object_store: Arc::new(LocalFileSystem {}),
+                        file_schema: schema,
+                        file_groups: scan
+                            .file_groups
+                            .iter()
+                            .map(|f| f.try_into())
+                            .collect::<Result<Vec<_>, _>>()?,
+                        statistics,
+                        projection: Some(projection),
+                        batch_size: scan.batch_size as usize,
+                        limit: scan.limit.as_ref().map(|sl| sl.limit as usize),
+                        table_partition_dims: vec![],
+                    },
                     scan.has_header,
                     str_to_byte(&scan.delimiter)?,
-                    Some(projection),
-                    scan.batch_size as usize,
-                    scan.limit.as_ref().map(|sl| sl.limit as usize),
                 )))
             }
             PhysicalPlanType::ParquetScan(scan) => {
@@ -144,37 +149,43 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let statistics = convert_required!(scan.statistics)?;

                 Ok(Arc::new(ParquetExec::new(
-                    Arc::new(LocalFileSystem {}),
-                    scan.file_groups
-                        .iter()
-                        .map(|p| p.try_into())
-                        .collect::<Result<Vec<_>, _>>()?,
-                    statistics,
-                    schema,
-                    Some(projection),
+                    PhysicalPlanConfig {
+                        object_store: Arc::new(LocalFileSystem {}),
+                        file_schema: schema,
+                        file_groups: scan
+                            .file_groups
+                            .iter()
+                            .map(|f| f.try_into())
+                            .collect::<Result<Vec<_>, _>>()?,
+                        statistics,
+                        projection: Some(projection),
+                        batch_size: scan.batch_size as usize,
+                        limit: scan.limit.as_ref().map(|sl| sl.limit as usize),
+                        table_partition_dims: scan.table_partition_dims.clone(),
+                    },
                     // TODO predicate should be de-serialized
                     None,
-                    scan.batch_size as usize,
-                    scan.limit.as_ref().map(|sl| sl.limit as usize),
                 )))
             }
             PhysicalPlanType::AvroScan(scan) => {
                 let schema = Arc::new(convert_required!(scan.schema)?);
                 let projection = scan.projection.iter().map(|i| *i as usize).collect();
                 let statistics = convert_required!(scan.statistics)?;

-                Ok(Arc::new(AvroExec::new(
-                    Arc::new(LocalFileSystem {}),
-                    scan.file_groups
+                Ok(Arc::new(AvroExec::new(PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema: schema,
+                    file_groups: scan
+                        .file_groups
                         .iter()
                         .map(|f| f.try_into())
                         .collect::<Result<Vec<_>, _>>()?,
                     statistics,
-                    schema,
-                    Some(projection),
-                    scan.batch_size as usize,
-                    scan.limit.as_ref().map(|sl| sl.limit as usize),
-                )))
+                    projection: Some(projection),
+                    batch_size: scan.batch_size as usize,
+                    limit: scan.limit.as_ref().map(|sl| sl.limit as usize),
+                    table_partition_dims: vec![],
+                })))
             }
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                 let input: Arc<dyn ExecutionPlan> =
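The definition of PhysicalPlanConfig itself is not shown in this commit, but its shape can be inferred from the call sites above. Here is a sketch of the struct as implied by this deserializer; the field types are educated guesses from the expressions assigned to them, not confirmed by the diff:

use std::sync::Arc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::object_store::ObjectStore;
use datafusion::datasource::PartitionedFile;
use datafusion::physical_plan::Statistics;

// Inferred: one bundle of everything a file-scan exec needs, replacing
// the long positional argument lists of CsvExec/ParquetExec/AvroExec::new.
pub struct PhysicalPlanConfigSketch {
    pub object_store: Arc<dyn ObjectStore>,
    pub file_schema: SchemaRef,
    pub file_groups: Vec<Vec<PartitionedFile>>,
    pub statistics: Statistics,
    pub projection: Option<Vec<usize>>,
    pub batch_size: usize,
    pub limit: Option<usize>,
    pub table_partition_dims: Vec<String>,
}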
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs

@@ -272,6 +272,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                         has_header: exec.has_header(),
                         delimiter: byte_to_string(exec.delimiter())?,
                         batch_size: exec.batch_size() as u32,
+                        table_partition_dims: exec.table_partition_dims().to_vec(),
                     },
                 )),
             })
@@ -298,6 +299,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                             .map(|n| *n as u32)
                             .collect(),
                         batch_size: exec.batch_size() as u32,
+                        table_partition_dims: exec.table_partition_dims().to_vec(),
                     },
                 )),
             })
@@ -328,6 +330,7 @@
                             .collect(),
                         schema: Some(exec.file_schema().as_ref().into()),
                         batch_size: exec.batch_size() as u32,
+                        table_partition_dims: exec.table_partition_dims().to_vec(),
                     },
                 )),
             })
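The exec.table_partition_dims() calls above imply a new accessor on each file-scan exec that is not shown in this diff. A self-contained sketch of the likely shape (the struct layout and names are assumptions):

// Stand-in for the scan execs (CsvExec, ParquetExec, AvroExec); only the
// part relevant to the serializer above is sketched.
pub struct FileScanExecSketch {
    table_partition_dims: Vec<String>,
    // ...object store, file groups, schema, projection, etc.
}

impl FileScanExecSketch {
    // Matches the `exec.table_partition_dims().to_vec()` calls above.
    pub fn table_partition_dims(&self) -> &[String] {
        &self.table_partition_dims
    }
}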
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs

@@ -496,7 +496,7 @@ fn get_table(
         file_extension: extension.to_owned(),
         target_partitions,
         collect_stat: true,
-        partitions: vec![],
+        table_partition_dims: vec![],
     };

     Ok(Arc::new(ListingTable::new(
39 changes: 18 additions & 21 deletions datafusion/src/datasource/file_format/avro.rs

@@ -29,6 +29,7 @@ use super::{FileFormat, PhysicalPlanConfig};
 use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
+use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::AvroExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -61,16 +62,9 @@
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = AvroExec::new(
-            conf.object_store,
-            conf.files,
-            conf.statistics,
-            conf.schema,
-            conf.projection,
-            conf.batch_size,
-            conf.limit,
-        );
+        let exec = AvroExec::new(conf);
         Ok(Arc::new(exec))
     }
 }
@@ -346,26 +340,29 @@ mod tests {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/{}", testdata, file_name);
         let format = AvroFormat {};
-        let schema = format
+        let file_schema = format
             .infer_schema(local_object_reader_stream(vec![filename.clone()]))
             .await
             .expect("Schema inference");
         let statistics = format
             .infer_stats(local_object_reader(filename.clone()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![local_unpartitioned_file(filename.to_owned())]];
+        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                schema,
-                files,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: vec![],
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_dims: vec![],
+                },
+                &[],
+            )
             .await?;
         Ok(exec)
     }
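Together with the csv.rs and json.rs changes below, this shows the FileFormat trait method gaining a filters parameter (unused by these formats, hence the leading underscore). A sketch of the updated signature, assuming the trait uses async_trait as the async method suggests; the real trait lives in datafusion::datasource::file_format and also carries infer_schema/infer_stats methods that are unchanged here:

use std::sync::Arc;
use async_trait::async_trait;
use datafusion::datasource::file_format::PhysicalPlanConfig;
use datafusion::error::Result;
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::ExecutionPlan;

// Sketch: filter expressions are now passed alongside the config so that
// formats which can exploit them (e.g. parquet predicate pruning) may do so.
#[async_trait]
pub trait FileFormatSketch: Send + Sync {
    async fn create_physical_plan(
        &self,
        conf: PhysicalPlanConfig,
        filters: &[Expr],
    ) -> Result<Arc<dyn ExecutionPlan>>;
}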
41 changes: 18 additions & 23 deletions datafusion/src/datasource/file_format/csv.rs

@@ -28,6 +28,7 @@ use futures::StreamExt;
 use super::{FileFormat, PhysicalPlanConfig};
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
+use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::CsvExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -123,18 +124,9 @@
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = CsvExec::new(
-            conf.object_store,
-            conf.files,
-            conf.statistics,
-            conf.schema,
-            self.has_header,
-            self.delimiter,
-            conf.projection,
-            conf.batch_size,
-            conf.limit,
-        );
+        let exec = CsvExec::new(conf, self.has_header, self.delimiter);
         Ok(Arc::new(exec))
     }
 }
@@ -260,26 +252,29 @@ mod tests {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/csv/{}", testdata, file_name);
         let format = CsvFormat::default();
-        let schema = format
+        let file_schema = format
             .infer_schema(local_object_reader_stream(vec![filename.clone()]))
             .await
             .expect("Schema inference");
         let statistics = format
             .infer_stats(local_object_reader(filename.clone()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![local_unpartitioned_file(filename.to_owned())]];
+        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
        let exec = format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                schema,
-                files,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: vec![],
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_dims: vec![],
+                },
+                &[],
+            )
             .await?;
         Ok(exec)
     }
39 changes: 18 additions & 21 deletions datafusion/src/datasource/file_format/json.rs

@@ -32,6 +32,7 @@ use super::FileFormat;
 use super::PhysicalPlanConfig;
 use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
+use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -93,16 +94,9 @@
     async fn create_physical_plan(
         &self,
         conf: PhysicalPlanConfig,
+        _filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let exec = NdJsonExec::new(
-            conf.object_store,
-            conf.files,
-            conf.statistics,
-            conf.schema,
-            conf.projection,
-            conf.batch_size,
-            conf.limit,
-        );
+        let exec = NdJsonExec::new(conf);
         Ok(Arc::new(exec))
     }
 }
@@ -211,26 +205,29 @@ mod tests {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/jsons/2.json";
         let format = JsonFormat::default();
-        let schema = format
+        let file_schema = format
             .infer_schema(local_object_reader_stream(vec![filename.to_owned()]))
             .await
             .expect("Schema inference");
         let statistics = format
             .infer_stats(local_object_reader(filename.to_owned()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![local_unpartitioned_file(filename.to_owned())]];
+        let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
         let exec = format
-            .create_physical_plan(PhysicalPlanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                schema,
-                files,
-                statistics,
-                projection: projection.clone(),
-                batch_size,
-                filters: vec![],
-                limit,
-            })
+            .create_physical_plan(
+                PhysicalPlanConfig {
+                    object_store: Arc::new(LocalFileSystem {}),
+                    file_schema,
+                    file_groups,
+                    statistics,
+                    projection: projection.clone(),
+                    batch_size,
+                    limit,
+                    table_partition_dims: vec![],
+                },
+                &[],
+            )
            .await?;
         Ok(exec)
     }
(Diffs for the remaining 23 of the 32 changed files are not shown here.)