Avro Table Provider (#910)
* Add avro as a datasource, file and table provider

* wip

* Added support for composite identifiers for struct type.

* Fixed build.

* cheat and add unions to valid composite column types

* Implement the AvroArrayReader

* Add binary types

* Enable Avro as a FileType

* Enable registering an avro table in the sql parsing

* Change package name for datafusion/avro

* Implement Avro datasource tests and fix avro_rs::Value resolution to Arrow types

* Test for AvroExec::try_from_path

* external table avro test

* Basic schema conversion tests

* Complete test for avro_to_arrow_reader on alltypes_dictionary

* fix_stable: .rewind is 'unstable'

* Fix license files and remove the unused avro-converter crate

* fix example test in avro_to_arrow

* add avro_sql test to default workflow

* Address clippy warnings

* Enable avro as a valid datasource for client execution

* Add avro to available logical plan nodes

* Add ToTimestampMillis as a scalar function in protos

* Allow Avro in PhysicalPlan nodes

* Remove remaining confusing references to 'json' in avro mod

* rename 'parquet' words in avro test and examples

* Handle Union of nested lists in arrow reader

* test timestamp arrays

* remove debug statement

* Make avro optional

* Remove debug statement

* Remove GetField usage (see #628)

* Fix docstring in parser tests

* Test batch output rather than just rows individually

* Remove 'csv' from error strings in physical_plan::avro

* Avro sample sql and explain queries tests in sql.rs

* Activate avro feature for cargo tests in github workflow

* Add a test for avro registering multiple files in a single table

* Switch to Result instead of Option for resolve_string

* Address missing clippy warning should_implement_trait in avro_to_arrow/reader

* Add fmt display implementation for AvroExec

* ci: fix cargo sql run example, use datafusion/avro feature instead of 'avro'

* license: missing license file for avro_to_arrow/schema.rs

* only run avro datasource tests if features have 'avro'

* refactor: rename infer_avro_schema_from_reader to read_avro_schema_from_reader

* Pass None as props to avro schema_to_field_with_props until further notice

* Change schema inference to FixedSizeBinary(16) for Uuid

* schema: prefix metadata coming from avro with 'avro'

* make num traits optional and part of the avro feature flag

* Fix avro schema tests regarding external props

* split avro physical plan test feature wise and add a non-implemented test

* submodule: switch back to apache/arrow-testing

* fix_test: columns are now prefixed in the plan

* avro_test: fix clippy warning cmp-owned

* avro: move statistics to the physical plan

* Increase min stack size for cargo tests

Co-authored-by: Jorge C. Leitao <[email protected]>
Igosuki and jorgecarleitao authored Sep 15, 2021
1 parent 195b699 commit 6402200
Showing 33 changed files with 3,258 additions and 38 deletions.
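Taken together, these commits make Avro a first-class datasource: DataFusion can now declare a table over an Avro file straight from SQL, since `Avro` was added to the parser's `FileType`. A minimal sketch of that DDL path, assuming the `avro` feature is enabled and using an illustrative test-data path:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // STORED AS AVRO is accepted once the parser knows FileType::Avro;
    // the table schema is read from the Avro file itself.
    ctx.sql(
        "CREATE EXTERNAL TABLE alltypes_plain \
         STORED AS AVRO \
         LOCATION 'testing/data/avro/alltypes_plain.avro'",
    )?;

    let df = ctx.sql("SELECT id FROM alltypes_plain LIMIT 10")?;
    let batches = df.collect().await?;
    println!("{:?}", batches);
    Ok(())
}
```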
5 changes: 3 additions & 2 deletions .github/workflows/rust.yml
@@ -105,13 +105,14 @@ jobs:
run: |
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
-    # run tests on all workspace members with default feature list
-    cargo test
+    # run tests on all workspace members with default feature list + avro
+    RUST_MIN_STACK=10485760 cargo test --features=avro
# test datafusion examples
cd datafusion-examples
cargo test --no-default-features
cargo run --example csv_sql
cargo run --example parquet_sql
+    cargo run --example avro_sql --features=datafusion/avro
env:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"
40 changes: 40 additions & 0 deletions ballista/rust/client/src/context.rs
@@ -32,6 +32,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::sql::parser::FileType;

@@ -125,6 +126,30 @@ impl BallistaContext {
})
}

/// Create a DataFrame representing an Avro table scan
pub fn read_avro(
&self,
path: &str,
options: AvroReadOptions,
) -> Result<Arc<dyn DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_avro(path.to_str().unwrap(), options)?;
Ok(df)
}

/// Create a DataFrame representing a Parquet table scan
pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
@@ -193,6 +218,17 @@ impl BallistaContext {
self.register_table(name, df.as_ref())
}

/// Register an Avro file as a table that can later be referenced in SQL statements
pub fn register_avro(
&self,
name: &str,
path: &str,
options: AvroReadOptions,
) -> Result<()> {
let df = self.read_avro(path, options)?;
self.register_table(name, df.as_ref())?;
Ok(())
}

/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
@@ -240,6 +276,10 @@ impl BallistaContext {
self.register_parquet(name, location)?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
FileType::Avro => {
self.register_avro(name, location, AvroReadOptions::default())?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
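For distributed execution, the new `BallistaContext` methods mirror the existing CSV and Parquet ones. A hedged sketch of client usage, assuming a scheduler reachable at `localhost:50050` and the Ballista client API of this era (`BallistaConfig`, `BallistaContext::remote`):

```rust
use ballista::prelude::*;
use datafusion::physical_plan::avro::AvroReadOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let config = BallistaConfig::builder().build()?;
    // Connect to a running Ballista scheduler (host and port are placeholders).
    let ctx = BallistaContext::remote("localhost", 50050, &config);

    // register_avro canonicalizes the path before registering the file, so
    // executors with a different working directory resolve the same location.
    ctx.register_avro("t", "/path/to/data.avro", AvroReadOptions::default())?;

    // The plan is shipped to the scheduler using the protobuf messages added below.
    let df = ctx.sql("SELECT COUNT(*) FROM t")?;
    let batches = df.collect().await?;
    println!("{:?}", batches);
    Ok(())
}
```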
24 changes: 24 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -152,6 +152,7 @@ enum ScalarFunction {
SHA384 = 32;
SHA512 = 33;
LN = 34;
TOTIMESTAMPMILLIS = 35;
}

message ScalarFunctionNode {
@@ -253,6 +254,7 @@ message LogicalPlanNode {
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
AvroTableScanNode avro_scan = 16;
}
}

@@ -296,6 +298,15 @@ message ParquetTableScanNode {
repeated LogicalExprNode filters = 4;
}

message AvroTableScanNode {
string table_name = 1;
string path = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
Schema schema = 5;
repeated LogicalExprNode filters = 6;
}

message ProjectionNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
@@ -340,6 +351,7 @@ enum FileType{
NdJson = 0;
Parquet = 1;
CSV = 2;
Avro = 3;
}

message AnalyzeNode {
@@ -456,6 +468,7 @@ message PhysicalPlanNode {
WindowAggExecNode window = 17;
ShuffleWriterExecNode shuffle_writer = 18;
CrossJoinExecNode cross_join = 19;
AvroScanExecNode avro_scan = 20;
}
}

@@ -609,6 +622,17 @@ message CsvScanExecNode {
repeated string filename = 8;
}

message AvroScanExecNode {
string path = 1;
repeated uint32 projection = 2;
Schema schema = 3;
string file_extension = 4;
uint32 batch_size = 5;

// partition filenames
repeated string filename = 8;
}

enum PartitionMode {
COLLECT_LEFT = 0;
PARTITIONED = 1;
29 changes: 29 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -32,6 +32,7 @@ use datafusion::logical_plan::{
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
use datafusion::scalar::ScalarValue;
@@ -171,6 +172,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
LogicalPlanType::AvroScan(scan) => {
let schema: Schema = convert_required!(scan.schema)?;
let options = AvroReadOptions {
schema: Some(Arc::new(schema.clone())),
file_extension: &scan.file_extension,
};

let mut projection = None;
if let Some(columns) = &scan.projection {
let column_indices = columns
.columns
.iter()
.map(|name| schema.index_of(name))
.collect::<Result<Vec<usize>, _>>()?;
projection = Some(column_indices);
}

LogicalPlanBuilder::scan_avro_with_name(
&scan.path,
options,
projection,
&scan.table_name,
)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Sort(sort) => {
let input: LogicalPlan = convert_box_required!(sort.input)?;
let sort_expr: Vec<Expr> = sort
@@ -1193,6 +1220,7 @@ impl TryFrom<i32> for protobuf::FileType {
_x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
_x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
_x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
_x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
invalid => Err(BallistaError::General(format!(
"Attempted to convert invalid i32 to protobuf::Filetype: {}",
invalid
@@ -1209,6 +1237,7 @@ impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
protobuf::FileType::NdJson => FileType::NdJson,
protobuf::FileType::Parquet => FileType::Parquet,
protobuf::FileType::Csv => FileType::CSV,
protobuf::FileType::Avro => FileType::Avro,
}
}
}
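The deserializer above rebuilds an Avro scan through `LogicalPlanBuilder::scan_avro_with_name`, added in this commit alongside the CSV and Parquet builders. Calling it directly looks roughly like this (path and table name are placeholders):

```rust
use datafusion::error::Result;
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion::physical_plan::avro::AvroReadOptions;

fn avro_scan_plan() -> Result<LogicalPlan> {
    LogicalPlanBuilder::scan_avro_with_name(
        "/path/to/data.avro",       // placeholder path
        AvroReadOptions::default(), // schema is inferred from the file
        None,                       // projection: None reads all columns
        "my_avro_table",
    )?
    .build()
}
```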
8 changes: 6 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -643,8 +643,12 @@ mod roundtrip_tests {

let df_schema_ref = schema.to_dfschema_ref()?;

-    let filetypes: [FileType; 3] =
-        [FileType::NdJson, FileType::Parquet, FileType::CSV];
+    let filetypes: [FileType; 4] = [
+        FileType::NdJson,
+        FileType::Parquet,
+        FileType::CSV,
+        FileType::Avro,
+    ];

for file in filetypes.iter() {
let create_table_node = LogicalPlan::CreateExternalTable {
30 changes: 27 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -25,6 +25,7 @@ use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
};
use datafusion::datasource::avro::AvroFile;
use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -793,6 +794,19 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
)),
})
} else if let Some(avro) = source.downcast_ref::<AvroFile>() {
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::AvroScan(
protobuf::AvroTableScanNode {
table_name: table_name.to_owned(),
path: avro.path().to_owned(),
projection,
schema: Some(schema),
file_extension: avro.file_extension().to_string(),
filters,
},
)),
})
} else {
Err(BallistaError::General(format!(
"logical plan to_proto unsupported table provider {:?}",
@@ -974,6 +988,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
FileType::NdJson => protobuf::FileType::NdJson,
FileType::Parquet => protobuf::FileType::Parquet,
FileType::CSV => protobuf::FileType::Csv,
FileType::Avro => protobuf::FileType::Avro,
};

Ok(protobuf::LogicalPlanNode {
@@ -1098,7 +1113,13 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
)
}
};
-    let arg = &args[0];
+    let arg_expr: Option<Box<protobuf::LogicalExprNode>> = if !args.is_empty() {
+        let arg = &args[0];
+        Some(Box::new(arg.try_into()?))
+    } else {
+        None
+    };
let partition_by = partition_by
.iter()
.map(|e| e.try_into())
@@ -1111,7 +1132,7 @@
protobuf::window_expr_node::WindowFrame::Frame(window_frame.into())
});
let window_expr = Box::new(protobuf::WindowExprNode {
-        expr: Some(Box::new(arg.try_into()?)),
+        expr: arg_expr,
window_function: Some(window_function),
partition_by,
order_by,
@@ -1284,7 +1305,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::Wildcard => Ok(protobuf::LogicalExprNode {
expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)),
}),
-    Expr::TryCast { .. } => unimplemented!(),
+    _ => unimplemented!(),
}
}
}
@@ -1473,6 +1494,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
BuiltinScalarFunction::ToTimestampMillis => {
Ok(protobuf::ScalarFunction::Totimestampmillis)
}
_ => Err(BallistaError::General(format!(
"logical_plan::to_proto() unsupported scalar function {:?}",
self
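These `TryInto` impls are what the roundtrip test above exercises; converting a plan in both directions looks roughly like this (the `ballista_core` crate paths are assumed from the module layout):

```rust
use std::convert::TryInto;

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf;
use datafusion::logical_plan::LogicalPlan;

// Serialize a logical plan to protobuf and back; an Avro table scan now
// survives this roundtrip via the new AvroTableScanNode variant.
fn roundtrip(plan: &LogicalPlan) -> Result<LogicalPlan, BallistaError> {
    let proto: protobuf::LogicalPlanNode = plan.try_into()?;
    (&proto).try_into()
}
```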
17 changes: 17 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -43,6 +43,7 @@ use datafusion::logical_plan::{
window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
@@ -153,6 +154,21 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
None,
)))
}
PhysicalPlanType::AvroScan(scan) => {
let schema = Arc::new(convert_required!(scan.schema)?);
let options = AvroReadOptions {
schema: Some(schema),
file_extension: &scan.file_extension,
};
let projection = scan.projection.iter().map(|i| *i as usize).collect();
Ok(Arc::new(AvroExec::try_from_path(
&scan.path,
options,
Some(projection),
scan.batch_size as usize,
None,
)?))
}
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(coalesce_batches.input)?;
@@ -544,6 +560,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Sha384 => BuiltinScalarFunction::SHA384,
ScalarFunction::Sha512 => BuiltinScalarFunction::SHA512,
ScalarFunction::Ln => BuiltinScalarFunction::Ln,
ScalarFunction::Totimestampmillis => BuiltinScalarFunction::ToTimestampMillis,
}
}
}
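`AvroExec::try_from_path` is the physical counterpart of the logical builder; the deserializer above shows its full argument list. Constructed directly (path and batch size are illustrative):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};

fn avro_exec() -> Result<AvroExec> {
    AvroExec::try_from_path(
        "/path/to/data.avro",       // placeholder path
        AvroReadOptions::default(), // schema inferred from the file
        None,                       // projection: read every column
        8192,                       // batch size
        None,                       // limit
    )
}
```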
23 changes: 23 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -62,6 +62,7 @@ use crate::execution_plans::{
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
use datafusion::physical_plan::avro::AvroExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
use datafusion::physical_plan::repartition::RepartitionExec;
@@ -285,6 +286,28 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AvroScan(
protobuf::AvroScanExecNode {
path: exec.path().to_owned(),
filename: exec.filenames().to_vec(),
projection: exec
.projection()
.ok_or_else(|| {
BallistaError::General(
"projection in AvroExec doesn't exist.".to_owned(),
)
})?
.iter()
.map(|n| *n as u32)
.collect(),
file_extension: exec.file_extension().to_owned(),
schema: Some(exec.file_schema().as_ref().into()),
batch_size: exec.batch_size() as u32,
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
let mut partition = vec![];
for location in &exec.partition {
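Physical plans get the symmetric treatment, with `AvroScanExecNode` carrying the path, projection, schema, and batch size. Under the same assumptions as the logical-plan sketch above:

```rust
use std::convert::TryInto;
use std::sync::Arc;

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf;
use datafusion::physical_plan::ExecutionPlan;

// Serialize a physical plan to protobuf and back; AvroExec now has a
// corresponding PhysicalPlanType::AvroScan variant.
fn roundtrip(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
    let proto: protobuf::PhysicalPlanNode = plan.try_into()?;
    (&proto).try_into()
}
```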
4 changes: 4 additions & 0 deletions datafusion-examples/Cargo.toml
@@ -27,6 +27,10 @@ keywords = [ "arrow", "query", "sql" ]
edition = "2018"
publish = false

[[example]]
name = "avro_sql"
path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "^5.3" }
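The example file itself isn't expanded in this view; a plausible sketch of `examples/avro_sql.rs`, mirroring the existing `csv_sql` and `parquet_sql` examples (the test-data path and selected columns are assumptions):

```rust
use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::prelude::*;

/// Run a simple SQL query against an Avro file.
#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // Register an Avro file as a table (assumed test-data location).
    ctx.register_avro(
        "alltypes_plain",
        "testing/data/avro/alltypes_plain.avro",
        AvroReadOptions::default(),
    )?;

    // Execute the query and print the result batches.
    let df = ctx.sql("SELECT int_col, double_col FROM alltypes_plain")?;
    let results = df.collect().await?;
    pretty::print_batches(&results)?;
    Ok(())
}
```

Run it with `cargo run --example avro_sql --features=datafusion/avro`, matching the CI step added above.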