WIP: Test increasing stack size as workaround for stack overflow errors in CI #1001

Closed · wants to merge 58 commits

Commits (58)
7e14f91
Add avro as a datasource, file and table provider
Igosuki Aug 17, 2021
0f940ab
wip
Igosuki Aug 17, 2021
2ca7ccb
Added support composite identifiers for struct type.
jorgecarleitao Jun 26, 2021
fcbf43b
Fixed build.
jorgecarleitao Jun 27, 2021
bae2f00
cheat and add unions to valid composite column types
Igosuki Aug 17, 2021
a352dad
Implement the AvroArrayReader
Igosuki Aug 18, 2021
d42cdd1
Add binary types
Igosuki Aug 18, 2021
cbdc9c7
Enable Avro as a FileType
Igosuki Aug 18, 2021
7d6078e
Enable registering an avro table in the sql parsing
Igosuki Aug 18, 2021
44cfa87
Change package name for datafusion/avro
Igosuki Aug 19, 2021
6ef92d8
Implement Avro datasource tests and fix avro_rs::Value resolution to …
Igosuki Aug 20, 2021
01cbb18
Test for AvroExec::try_from_path
Igosuki Aug 20, 2021
537577a
external table avro test
Igosuki Aug 20, 2021
75566d1
Basic schema conversion tests
Igosuki Aug 20, 2021
7fdf233
Complete test for avro_to_arrow_reader on alltypes_dictionnary
Igosuki Aug 20, 2021
697e3a9
fix_stable: .rewind is 'unstable'
Igosuki Aug 22, 2021
e1d6df8
Fix license files and remove the unused avro-converter crate
Igosuki Aug 24, 2021
66a5901
fix example test in avro_to_arrow
Igosuki Aug 24, 2021
9ea942c
add avro_sql test to default workflow
Igosuki Aug 24, 2021
84ee28a
Adress clippies
Igosuki Aug 24, 2021
9edac57
Enable avro as a valid datasource for client execution
Igosuki Aug 24, 2021
e8a6206
Add avro to available logical plan nodes
Igosuki Aug 24, 2021
45eac7c
Add ToTimestampMillis as a scalar function in protos
Igosuki Aug 24, 2021
b4340ac
Allow Avro in PhysicalPlan nodes
Igosuki Aug 24, 2021
408f759
Remove remaining confusing references to 'json' in avro mod
Igosuki Aug 24, 2021
f34b995
rename 'parquet' words in avro test and examples
Igosuki Aug 25, 2021
6ce0904
Handle Union of nested lists in arrow reader
Igosuki Aug 25, 2021
e1e40ef
test timestamp arrays
Igosuki Aug 25, 2021
1c79ffb
remove debug statement
Igosuki Aug 25, 2021
c96b781
Make avro optional
Igosuki Aug 26, 2021
060c644
Remove debug statement
Igosuki Aug 26, 2021
72ad35c
Remove GetField usage (see #628)
Igosuki Aug 26, 2021
3c8f6ce
Fix docstring in parser tests
Igosuki Aug 26, 2021
0db3013
Test batch output rather than just rows individually
Igosuki Aug 26, 2021
faa2152
Remove 'csv' from error strings in physical_plan::avro
Igosuki Aug 26, 2021
a6cd7f1
Avro sample sql and explain queries tests in sql.rs
Igosuki Aug 26, 2021
8b4fcae
Activate avro feature for cargo tests in github workflow
Igosuki Aug 26, 2021
b2b7915
Add a test for avro registering multiple files in a single table
Igosuki Aug 26, 2021
8acf062
Switch to Result instead of Option for resolve_string
Igosuki Aug 26, 2021
65c9b9e
Address missing clippy warning should_implement_trait in arrow_to_avr…
Igosuki Aug 26, 2021
1668b96
Add fmt display implementation for AvroExec
Igosuki Aug 30, 2021
c9218e2
ci: fix cargo sql run example, use datafusion/avro feature instead of…
Igosuki Aug 30, 2021
be09a64
license: missing license file for avro_to_arrow/schema.rs
Igosuki Aug 30, 2021
68df268
only run avro datasource tests if features have 'avro'
Igosuki Aug 31, 2021
09c4ecb
refactor: rename infer_avro_schema_from_reader to read_avro_schema_fr…
Igosuki Sep 2, 2021
94b9dcb
Pass None as props to avro schema schema_to_field_with_props until fu…
Igosuki Sep 2, 2021
a03a3c2
Change schema inferance to FixedSizeBinary(16) for Uuid
Igosuki Sep 2, 2021
405c63a
schema: prefix metadata coming from avro with 'avro'
Igosuki Sep 2, 2021
291637a
make num traits optional and part of the avro feature flag
Igosuki Sep 7, 2021
00f109c
Fix avro schema tests regarding external props
Igosuki Sep 7, 2021
ea31e02
split avro physical plan test feature wise and add a non-implemented …
Igosuki Sep 7, 2021
a264858
submodule: switch back to apache/arrow-testing
Igosuki Sep 12, 2021
c62d931
fix_test: columns are now prefixed in the plan
Igosuki Sep 12, 2021
aa45189
avro_test: fix clippy warning cmp-owned
Igosuki Sep 12, 2021
0b746db
avro: move statistics to the physical plan
Igosuki Sep 14, 2021
a709fd9
Use larger stack for tests on CI
alamb Sep 14, 2021
9e6f134
try a different command and size
alamb Sep 14, 2021
3c91748
16MB stack
alamb Sep 14, 2021
Files changed
9 changes: 6 additions & 3 deletions .github/workflows/rust.yml
@@ -79,7 +79,9 @@ jobs:
env:
# Disable full debug symbol generation to speed up CI build and keep memory down
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
# Use zstack-size to work around stack overflow issues in debug builds
# https://github.com/apache/arrow-datafusion/issues/419
RUSTFLAGS: "-C debuginfo=1 -C link-args=-Wl,-zstack-size=16000000"
steps:
- uses: actions/checkout@v2
with:
@@ -105,13 +107,14 @@
run: |
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
-# run tests on all workspace members with default feature list
-cargo test
+# run tests on all workspace members with default feature list + avro
+cargo test --features=avro
# test datafusion examples
cd datafusion-examples
cargo test --no-default-features
cargo run --example csv_sql
cargo run --example parquet_sql
cargo run --example avro_sql --features=datafusion/avro
env:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"
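For context (not part of the diff): `-z stack-size` asks the linker to reserve a larger main-thread stack for every test binary, which is where the deeply recursive plan-building code overflows in debug builds. A narrower alternative would be to run only the recursive body on a thread with an explicit stack size; the sketch below uses only the standard library, and the recursive function is a hypothetical stand-in:

```rust
use std::thread;

fn main() {
    // Run the deeply recursive work on a thread with a 16 MB stack,
    // mirroring the 16_000_000-byte value passed to the linker above.
    let child = thread::Builder::new()
        .stack_size(16 * 1024 * 1024)
        .spawn(|| {
            // Stand-in for a deeply recursive planner or test body.
            fn depth(n: u64) -> u64 {
                if n == 0 { 0 } else { 1 + depth(n - 1) }
            }
            depth(100_000)
        })
        .expect("failed to spawn worker thread");

    println!("recursed to depth {}", child.join().unwrap());
}
```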
40 changes: 40 additions & 0 deletions ballista/rust/client/src/context.rs
@@ -32,6 +32,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::sql::parser::FileType;

@@ -125,6 +126,30 @@ impl BallistaContext {
})
}

/// Create a DataFrame representing an Avro table scan
pub fn read_avro(
&self,
path: &str,
options: AvroReadOptions,
) -> Result<Arc<dyn DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock().unwrap();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_avro(path.to_str().unwrap(), options)?;
Ok(df)
}

/// Create a DataFrame representing a Parquet table scan
pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
@@ -193,6 +218,17 @@ impl BallistaContext {
self.register_table(name, df.as_ref())
}

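/// Register an Avro file as a table that can be referenced from SQL
/// statements executed against this context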
pub fn register_avro(
&self,
name: &str,
path: &str,
options: AvroReadOptions,
) -> Result<()> {
let df = self.read_avro(path, options)?;
self.register_table(name, df.as_ref())?;
Ok(())
}

/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
@@ -240,6 +276,10 @@ impl BallistaContext {
self.register_parquet(name, location)?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
FileType::Avro => {
self.register_avro(name, location, AvroReadOptions::default())?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
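A hedged sketch of how the new client methods might be exercised end to end. The `BallistaConfig`/`BallistaContext::remote` constructor, the module paths, the scheduler address, and the Avro file path are all assumptions about the Ballista client of this era, not taken from the diff:

```rust
use ballista::context::BallistaContext;
use ballista::prelude::BallistaConfig;
use datafusion::physical_plan::avro::AvroReadOptions;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed constructor and scheduler address (illustrative only).
    let config = BallistaConfig::builder().build()?;
    let ctx = BallistaContext::remote("localhost", 50050, &config);

    // Register the Avro file under a table name, then plan a SQL query
    // against it; executing the resulting DataFrame would additionally
    // require an async runtime and a running Ballista cluster.
    ctx.register_avro(
        "alltypes_plain",
        "testing/data/avro/alltypes_plain.avro",
        AvroReadOptions::default(),
    )?;
    let _df = ctx.sql("SELECT count(*) FROM alltypes_plain")?;
    Ok(())
}
```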
24 changes: 24 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -152,6 +152,7 @@ enum ScalarFunction {
SHA384 = 32;
SHA512 = 33;
LN = 34;
TOTIMESTAMPMILLIS = 35;
}

message ScalarFunctionNode {
@@ -253,6 +254,7 @@ message LogicalPlanNode {
WindowNode window = 13;
AnalyzeNode analyze = 14;
CrossJoinNode cross_join = 15;
AvroTableScanNode avro_scan = 16;
}
}

@@ -296,6 +298,15 @@ message ParquetTableScanNode {
repeated LogicalExprNode filters = 4;
}

message AvroTableScanNode {
string table_name = 1;
string path = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
Schema schema = 5;
repeated LogicalExprNode filters = 6;
}

message ProjectionNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode expr = 2;
@@ -340,6 +351,7 @@ enum FileType{
NdJson = 0;
Parquet = 1;
CSV = 2;
Avro = 3;
}

message AnalyzeNode {
@@ -456,6 +468,7 @@ message PhysicalPlanNode {
WindowAggExecNode window = 17;
ShuffleWriterExecNode shuffle_writer = 18;
CrossJoinExecNode cross_join = 19;
AvroScanExecNode avro_scan = 20;
}
}

@@ -609,6 +622,17 @@ message CsvScanExecNode {
repeated string filename = 8;
}

message AvroScanExecNode {
string path = 1;
repeated uint32 projection = 2;
Schema schema = 3;
string file_extension = 4;
uint32 batch_size = 5;

// partition filenames
repeated string filename = 8;
}

enum PartitionMode {
COLLECT_LEFT = 0;
PARTITIONED = 1;
29 changes: 29 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -32,6 +32,7 @@ use datafusion::logical_plan::{
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
use datafusion::scalar::ScalarValue;
@@ -171,6 +172,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.build()
.map_err(|e| e.into())
}
LogicalPlanType::AvroScan(scan) => {
let schema: Schema = convert_required!(scan.schema)?;
let options = AvroReadOptions {
schema: Some(Arc::new(schema.clone())),
file_extension: &scan.file_extension,
};

let mut projection = None;
if let Some(columns) = &scan.projection {
let column_indices = columns
.columns
.iter()
.map(|name| schema.index_of(name))
.collect::<Result<Vec<usize>, _>>()?;
projection = Some(column_indices);
}

LogicalPlanBuilder::scan_avro_with_name(
&scan.path,
options,
projection,
&scan.table_name,
)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Sort(sort) => {
let input: LogicalPlan = convert_box_required!(sort.input)?;
let sort_expr: Vec<Expr> = sort
@@ -1193,6 +1220,7 @@ impl TryFrom<i32> for protobuf::FileType {
_x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
_x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
_x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
_x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
invalid => Err(BallistaError::General(format!(
"Attempted to convert invalid i32 to protobuf::Filetype: {}",
invalid
@@ -1209,6 +1237,7 @@ impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
protobuf::FileType::NdJson => FileType::NdJson,
protobuf::FileType::Parquet => FileType::Parquet,
protobuf::FileType::Csv => FileType::CSV,
protobuf::FileType::Avro => FileType::Avro,
}
}
}
8 changes: 6 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -643,8 +643,12 @@ mod roundtrip_tests {

let df_schema_ref = schema.to_dfschema_ref()?;

-let filetypes: [FileType; 3] =
-    [FileType::NdJson, FileType::Parquet, FileType::CSV];
+let filetypes: [FileType; 4] = [
+    FileType::NdJson,
+    FileType::Parquet,
+    FileType::CSV,
+    FileType::Avro,
+];

for file in filetypes.iter() {
let create_table_node = LogicalPlan::CreateExternalTable {
30 changes: 27 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -25,6 +25,7 @@ use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
};
use datafusion::datasource::avro::AvroFile;
use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -793,6 +794,19 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
)),
})
} else if let Some(avro) = source.downcast_ref::<AvroFile>() {
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::AvroScan(
protobuf::AvroTableScanNode {
table_name: table_name.to_owned(),
path: avro.path().to_owned(),
projection,
schema: Some(schema),
file_extension: avro.file_extension().to_string(),
filters,
},
)),
})
} else {
Err(BallistaError::General(format!(
"logical plan to_proto unsupported table provider {:?}",
@@ -974,6 +988,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
FileType::NdJson => protobuf::FileType::NdJson,
FileType::Parquet => protobuf::FileType::Parquet,
FileType::CSV => protobuf::FileType::Csv,
FileType::Avro => protobuf::FileType::Avro,
};

Ok(protobuf::LogicalPlanNode {
@@ -1098,7 +1113,13 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
)
}
};
let arg = &args[0];
let arg_expr: Option<Box<protobuf::LogicalExprNode>> = if !args.is_empty()
{
let arg = &args[0];
Some(Box::new(arg.try_into()?))
} else {
None
};
let partition_by = partition_by
.iter()
.map(|e| e.try_into())
@@ -1111,7 +1132,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
protobuf::window_expr_node::WindowFrame::Frame(window_frame.into())
});
let window_expr = Box::new(protobuf::WindowExprNode {
-expr: Some(Box::new(arg.try_into()?)),
+expr: arg_expr,
window_function: Some(window_function),
partition_by,
order_by,
@@ -1284,7 +1305,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::Wildcard => Ok(protobuf::LogicalExprNode {
expr_type: Some(protobuf::logical_expr_node::ExprType::Wildcard(true)),
}),
-Expr::TryCast { .. } => unimplemented!(),
+_ => unimplemented!(),
}
}
}
@@ -1473,6 +1494,9 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
BuiltinScalarFunction::SHA256 => Ok(protobuf::ScalarFunction::Sha256),
BuiltinScalarFunction::SHA384 => Ok(protobuf::ScalarFunction::Sha384),
BuiltinScalarFunction::SHA512 => Ok(protobuf::ScalarFunction::Sha512),
BuiltinScalarFunction::ToTimestampMillis => {
Ok(protobuf::ScalarFunction::Totimestampmillis)
}
_ => Err(BallistaError::General(format!(
"logical_plan::to_proto() unsupported scalar function {:?}",
self
17 changes: 17 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -43,6 +43,7 @@ use datafusion::logical_plan::{
window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
@@ -153,6 +154,21 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
None,
)))
}
PhysicalPlanType::AvroScan(scan) => {
let schema = Arc::new(convert_required!(scan.schema)?);
let options = AvroReadOptions {
schema: Some(schema),
file_extension: &scan.file_extension,
};
let projection = scan.projection.iter().map(|i| *i as usize).collect();
Ok(Arc::new(AvroExec::try_from_path(
&scan.path,
options,
Some(projection),
scan.batch_size as usize,
None,
)?))
}
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(coalesce_batches.input)?;
@@ -544,6 +560,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Sha384 => BuiltinScalarFunction::SHA384,
ScalarFunction::Sha512 => BuiltinScalarFunction::SHA512,
ScalarFunction::Ln => BuiltinScalarFunction::Ln,
ScalarFunction::Totimestampmillis => BuiltinScalarFunction::ToTimestampMillis,
}
}
}
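Outside the serde path, the same constructor can be called directly. A minimal sketch built from the call shape shown above; the projection, batch size, and path are illustrative, and the schema is inferred from the file when `AvroReadOptions::default()` carries none:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::avro::{AvroExec, AvroReadOptions};
use datafusion::physical_plan::ExecutionPlan;

fn build_avro_scan(path: &str) -> Result<Arc<dyn ExecutionPlan>> {
    // Mirrors the deserialization code above: path, options,
    // optional projection, batch size, and optional row limit.
    let exec = AvroExec::try_from_path(
        path,
        AvroReadOptions::default(),
        Some(vec![0, 1]), // project the first two columns
        1024,             // batch size
        None,             // no row limit
    )?;
    Ok(Arc::new(exec))
}
```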
23 changes: 23 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -62,6 +62,7 @@ use crate::execution_plans::{
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
use datafusion::physical_plan::avro::AvroExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
use datafusion::physical_plan::repartition::RepartitionExec;
@@ -285,6 +286,28 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AvroScan(
protobuf::AvroScanExecNode {
path: exec.path().to_owned(),
filename: exec.filenames().to_vec(),
projection: exec
.projection()
.ok_or_else(|| {
BallistaError::General(
"projection in AvroExec doesn't exist.".to_owned(),
)
})?
.iter()
.map(|n| *n as u32)
.collect(),
file_extension: exec.file_extension().to_owned(),
schema: Some(exec.file_schema().as_ref().into()),
batch_size: exec.batch_size() as u32,
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
let mut partition = vec![];
for location in &exec.partition {
4 changes: 4 additions & 0 deletions datafusion-examples/Cargo.toml
@@ -27,6 +27,10 @@ keywords = [ "arrow", "query", "sql" ]
edition = "2018"
publish = false

[[example]]
name = "avro_sql"
path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "^5.3" }
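The body of the `avro_sql` example is not shown in this diff. A plausible sketch, assuming the DataFusion 5.x `ExecutionContext` API, that `register_avro` mirrors `register_csv`, and the `alltypes_plain.avro` file shipped with arrow-testing:

```rust
use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    // ARROW_TEST_DATA points at the arrow-testing submodule in CI.
    let testdata =
        std::env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not set");

    ctx.register_avro(
        "alltypes_plain",
        &format!("{}/avro/alltypes_plain.avro", testdata),
        AvroReadOptions::default(),
    )?;

    // Plan the query as a DataFrame, then execute and print the batches.
    let df = ctx.sql("SELECT int_col, double_col FROM alltypes_plain LIMIT 5")?;
    let results = df.collect().await?;
    pretty::print_batches(&results)?;
    Ok(())
}
```

With the `[[example]]` entry above, this would be run as `cargo run --example avro_sql --features=datafusion/avro`, matching the CI step added in rust.yml.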