
Commit

WIP.
jorgecarleitao committed Jun 3, 2021
1 parent 321fda4 commit fe6b846
Showing 114 changed files with 2,851 additions and 4,822 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
@@ -18,13 +18,11 @@
[workspace]
members = [
"datafusion",
"benchmarks",
"datafusion-cli",
"datafusion-examples",
"benchmarks",
"ballista/rust/client",
"ballista/rust/core",
"ballista/rust/executor",
"ballista/rust/scheduler",
]

exclude = ["python"]
8 changes: 4 additions & 4 deletions README.md
@@ -69,8 +69,8 @@ Run a SQL query against data stored in a CSV:

```rust
use datafusion::prelude::*;
-use arrow::util::pretty::print_batches;
-use arrow::record_batch::RecordBatch;
+use arrow2::util::pretty::print_batches;
+use arrow2::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
@@ -92,8 +92,8 @@ Use the DataFrame API to process data stored in a CSV:

```rust
use datafusion::prelude::*;
-use arrow::util::pretty::print_batches;
-use arrow::record_batch::RecordBatch;
+use arrow2::util::pretty::print_batches;
+use arrow2::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
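The README hunks above only show the import swap. For context, a minimal sketch of how the full example plausibly reads after the change, assuming the DataFusion 4.x API of the period (`ExecutionContext`, `register_csv`, `collect`) and keeping the commit's own `arrow2::util::pretty` path, which this WIP may still rename later:

```rust
use datafusion::prelude::*;
use arrow2::record_batch::RecordBatch;
use arrow2::util::pretty::print_batches;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register a CSV file as a table, then query it with SQL.
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;

    // Execute the plan and collect the results as arrow2 RecordBatches.
    let results: Vec<RecordBatch> = df.collect().await?;
    print_batches(&results)?;
    Ok(())
}
```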
4 changes: 2 additions & 2 deletions ballista/rust/core/Cargo.toml
@@ -40,8 +40,8 @@ tokio = "1.0"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }

-arrow = { version = "4.0" }
-arrow-flight = { version = "4.0" }
+arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }
+arrow-flight = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }

datafusion = { path = "../../../datafusion" }

65 changes: 32 additions & 33 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -26,7 +26,7 @@ use std::{
unimplemented,
};

-use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
@@ -299,9 +299,9 @@ impl TryInto<datafusion::logical_plan::DFSchemaRef> for protobuf::Schema {
}
}

-impl TryInto<arrow::datatypes::DataType> for &protobuf::scalar_type::Datatype {
+impl TryInto<DataType> for &protobuf::scalar_type::Datatype {
type Error = BallistaError;
-fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
+fn try_into(self) -> Result<DataType, Self::Error> {
use protobuf::scalar_type::Datatype;
Ok(match self {
Datatype::Scalar(scalar_type) => {
@@ -332,17 +332,18 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::scalar_type::Datatype {
))
})?;
//Because length is checked above it is safe to unwrap .last()
-let mut scalar_type =
-arrow::datatypes::DataType::List(Box::new(Field::new(
-field_names.last().unwrap().as_str(),
-pb_scalar_type.into(),
-true,
-)));
+let mut scalar_type = DataType::List(Box::new(Field::new(
+field_names.last().unwrap().as_str(),
+pb_scalar_type.into(),
+true,
+)));
//Iterate over field names in reverse order except for the last item in the vector
for name in field_names.iter().rev().skip(1) {
-let new_datatype = arrow::datatypes::DataType::List(Box::new(
-Field::new(name.as_str(), scalar_type, true),
-));
+let new_datatype = DataType::List(Box::new(Field::new(
+name.as_str(),
+scalar_type,
+true,
+)));
scalar_type = new_datatype;
}
scalar_type
@@ -351,11 +352,11 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::scalar_type::Datatype {
}
}

-impl TryInto<arrow::datatypes::DataType> for &protobuf::arrow_type::ArrowTypeEnum {
+impl TryInto<DataType> for &protobuf::arrow_type::ArrowTypeEnum {
type Error = BallistaError;
-fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
-use arrow::datatypes::DataType;
+fn try_into(self) -> Result<DataType, Self::Error> {
use protobuf::arrow_type;
+use DataType;
Ok(match self {
arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
@@ -467,9 +468,9 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::arrow_type::ArrowTypeEnum {
}

#[allow(clippy::from_over_into)]
-impl Into<arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
-fn into(self) -> arrow::datatypes::DataType {
-use arrow::datatypes::DataType;
+impl Into<DataType> for protobuf::PrimitiveScalarType {
+fn into(self) -> DataType {
+use DataType;
match self {
protobuf::PrimitiveScalarType::Bool => DataType::Boolean,
protobuf::PrimitiveScalarType::Uint8 => DataType::UInt8,
@@ -486,10 +487,10 @@ impl Into<arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8,
protobuf::PrimitiveScalarType::Date32 => DataType::Date32,
protobuf::PrimitiveScalarType::TimeMicrosecond => {
-DataType::Time64(arrow::datatypes::TimeUnit::Microsecond)
+DataType::Time64(TimeUnit::Microsecond)
}
protobuf::PrimitiveScalarType::TimeNanosecond => {
-DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond)
+DataType::Time64(TimeUnit::Nanosecond)
}
protobuf::PrimitiveScalarType::Null => DataType::Null,
}
@@ -746,9 +747,9 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarListValue {
}
}

-impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarListType {
+impl TryInto<DataType> for &protobuf::ScalarListType {
type Error = BallistaError;
-fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
+fn try_into(self) -> Result<DataType, Self::Error> {
use protobuf::PrimitiveScalarType;
let protobuf::ScalarListType {
deepest_type,
@@ -762,7 +763,7 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarListType {
));
}

-let mut curr_type = arrow::datatypes::DataType::List(Box::new(Field::new(
+let mut curr_type = DataType::List(Box::new(Field::new(
//Since checked vector is not empty above this is safe to unwrap
field_names.last().unwrap(),
PrimitiveScalarType::from_i32(*deepest_type)
@@ -774,9 +775,8 @@ impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarListType {
)));
//Iterates over field names in reverse order except for the last item in the vector
for name in field_names.iter().rev().skip(1) {
-let temp_curr_type = arrow::datatypes::DataType::List(Box::new(Field::new(
-name, curr_type, true,
-)));
+let temp_curr_type =
+DataType::List(Box::new(Field::new(name, curr_type, true)));
curr_type = temp_curr_type;
}
Ok(curr_type)
@@ -876,8 +876,7 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarValue {
.iter()
.map(|val| val.try_into())
.collect::<Result<Vec<_>, _>>()?;
-let scalar_type: arrow::datatypes::DataType =
-pb_scalar_type.try_into()?;
+let scalar_type: DataType = pb_scalar_type.try_into()?;
ScalarValue::List(Some(typechecked_values), scalar_type)
}
protobuf::scalar_value::Value::NullListValue(v) => {
@@ -1169,9 +1168,9 @@ fn from_proto_binary_op(op: &str) -> Result<Operator, BallistaError> {
}
}

-impl TryInto<arrow::datatypes::DataType> for &protobuf::ScalarType {
+impl TryInto<DataType> for &protobuf::ScalarType {
type Error = BallistaError;
-fn try_into(self) -> Result<arrow::datatypes::DataType, Self::Error> {
+fn try_into(self) -> Result<DataType, Self::Error> {
let pb_scalartype = self.datatype.as_ref().ok_or_else(|| {
proto_error("ScalarType message missing required field 'datatype'")
})?;
@@ -1202,16 +1201,16 @@ impl TryInto<Schema> for &protobuf::Schema {
}
}

-impl TryInto<arrow::datatypes::Field> for &protobuf::Field {
+impl TryInto<Field> for &protobuf::Field {
type Error = BallistaError;
-fn try_into(self) -> Result<arrow::datatypes::Field, Self::Error> {
+fn try_into(self) -> Result<Field, Self::Error> {
let pb_datatype = self.arrow_type.as_ref().ok_or_else(|| {
proto_error(
"Protobuf deserialization error: Field message missing required field 'arrow_type'",
)
})?;

-Ok(arrow::datatypes::Field::new(
+Ok(Field::new(
self.name.as_str(),
pb_datatype.as_ref().try_into()?,
self.nullable,
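The recurring change in this file is dropping the `arrow::datatypes::` prefix in favor of the `DataType`/`Field` names imported from `datafusion::arrow2`; the nested-list logic itself is unchanged: the last field name is the innermost list, and the loop wraps outward in reverse. A self-contained sketch of that pattern (`nested_list` is a hypothetical helper; the `Field::new(&str, DataType, bool)` signature is the one the hunks above already use):

```rust
use datafusion::arrow2::datatypes::{DataType, Field};

/// Hypothetical helper mirroring the pattern above: wrap `leaf` in nested
/// List types, with the last name in `field_names` as the innermost list.
fn nested_list(field_names: &[&str], leaf: DataType) -> DataType {
    // Non-empty input is assumed, as checked in the real code.
    let mut ty = DataType::List(Box::new(Field::new(
        *field_names.last().unwrap(),
        leaf,
        true,
    )));
    // Wrap outward over the remaining names, innermost to outermost.
    for name in field_names.iter().rev().skip(1) {
        ty = DataType::List(Box::new(Field::new(*name, ty, true)));
    }
    ty
}

fn main() {
    // Produces List<a: List<b: Int32>>.
    let ty = nested_list(&["a", "b"], DataType::Int32);
    println!("{:?}", ty);
}
```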
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -24,8 +24,8 @@ mod roundtrip_tests {

use super::super::{super::error::Result, protobuf};
use crate::error::BallistaError;
-use arrow::datatypes::{DataType, Field, Schema};
use core::panic;
+use datafusion::arrow2::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::physical_plan::functions::BuiltinScalarFunction::Sqrt;
use datafusion::{
logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder},
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -26,7 +26,7 @@ use std::{

use crate::datasource::DfTableAdapter;
use crate::serde::{protobuf, BallistaError};
-use arrow::datatypes::{DataType, Schema};
+use datafusion::arrow2::datatypes::{DataType, Schema};
use datafusion::datasource::CsvFile;
use datafusion::logical_plan::{Expr, JoinType, LogicalPlan};
use datafusion::physical_plan::aggregates::AggregateFunction;
3 changes: 1 addition & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -206,8 +206,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
})?
.clone();

-let physical_schema: SchemaRef =
-SchemaRef::new((&input_schema).try_into()?);
+let physical_schema = Arc::new(input_schema);

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
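The dropped `try_into` makes sense if `input_schema` has already been decoded into an arrow2 `Schema` by this point; like arrow, arrow2 treats a schema reference as just `Arc<Schema>`. A minimal sketch under that assumption:

```rust
use std::sync::Arc;
use datafusion::arrow2::datatypes::Schema;

// Assumption: `input_schema` is already an arrow2 Schema, so the old
// protobuf-to-arrow round-trip (`SchemaRef::new((&input_schema).try_into()?)`)
// collapses to a plain Arc wrap.
fn to_schema_ref(input_schema: Schema) -> Arc<Schema> {
    Arc::new(input_schema)
}
```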
8 changes: 2 additions & 6 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -23,7 +23,7 @@ mod roundtrip_tests {
use datafusion::physical_plan::hash_utils::JoinType;
use std::{convert::TryInto, sync::Arc};

-use arrow::datatypes::{DataType, Schema};
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::ColumnarValue;
use datafusion::physical_plan::{
empty::EmptyExec,
@@ -75,7 +75,6 @@ mod roundtrip_tests {

#[test]
fn roundtrip_hash_join() -> Result<()> {
-use arrow::datatypes::{DataType, Field, Schema};
let field_a = Field::new("col", DataType::Int64, false);
let schema_left = Schema::new(vec![field_a.clone()]);
let schema_right = Schema::new(vec![field_a]);
@@ -95,7 +94,6 @@

#[test]
fn rountrip_hash_aggregate() -> Result<()> {
-use arrow::datatypes::{DataType, Field, Schema};
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("a"), "unused".to_string())];

@@ -120,7 +118,6 @@

#[test]
fn roundtrip_filter_with_not_and_in_list() -> Result<()> {
-use arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_plan::Operator;
use datafusion::physical_plan::{
expressions::{binary, lit, InListExpr, NotExpr},
@@ -149,8 +146,7 @@

#[test]
fn roundtrip_sort() -> Result<()> {
-use arrow::compute::kernels::sort::SortOptions;
-use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow2::compute::sort::SortOptions;
let field_a = Field::new("a", DataType::Boolean, false);
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
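arrow2 moves the sort options from `compute::kernels::sort` to `compute::sort`; the struct keeps the same two flags in both crates. A small sketch of constructing them as `roundtrip_sort` presumably does:

```rust
use datafusion::arrow2::compute::sort::SortOptions;

fn main() {
    // Descending sort with nulls last: the kind of options the
    // roundtrip_sort test feeds into its sort expressions.
    let options = SortOptions {
        descending: true,
        nulls_first: false,
    };
    assert!(options.descending && !options.nulls_first);
}
```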
6 changes: 2 additions & 4 deletions ballista/rust/core/src/serde/scheduler/mod.rs
@@ -17,10 +17,8 @@

use std::{collections::HashMap, sync::Arc};

-use arrow::array::{
-ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
-};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow2::array::*;
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use serde::Serialize;
1 change: 0 additions & 1 deletion benchmarks/Cargo.toml
@@ -32,7 +32,6 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
datafusion = { path = "../datafusion" }
-ballista = { path = "../ballista/rust/client" }
structopt = { version = "0.3", default-features = false }
tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread"] }
futures = "0.3"
6 changes: 3 additions & 3 deletions benchmarks/src/bin/nyctaxi.rs
@@ -22,8 +22,8 @@ use std::path::PathBuf;
use std::process;
use std::time::Instant;

-use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::arrow::util::pretty;
+use datafusion::arrow2::datatypes::{DataType, Field, Schema};
+use datafusion::arrow2::io::print;

use datafusion::error::Result;
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
@@ -124,7 +124,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()> {
let physical_plan = ctx.create_physical_plan(&plan)?;
let result = collect(physical_plan).await?;
if debug {
-pretty::print_batches(&result)?;
+print::print(&result)?;
}
Ok(())
}
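The benchmark's debug path swaps arrow's `pretty::print_batches` for arrow2's `io::print`. A hedged sketch of the call as the commit uses it (the `Result` return is inferred from the `?` in the diff):

```rust
use datafusion::arrow2::io::print;
use datafusion::arrow2::record_batch::RecordBatch;
use datafusion::error::Result;

// Pretty-print collected query results when --debug is set,
// mirroring what execute_sql now does.
fn maybe_print(batches: &[RecordBatch], debug: bool) -> Result<()> {
    if debug {
        print::print(batches)?;
    }
    Ok(())
}
```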
(Diff truncated: the remaining changed files are not shown.)
