Skip to content

Commit

Permalink
Fix DataFusion test and try to make ballista compile (#4)
Browse files Browse the repository at this point in the history
* wip

* more

* Make scalar.rs compile

* Fix various compilation error due to API difference

* Make datafusion core compile

* fmt

* wip

* wip: compile ballista

* Pass all datafusion tests

* Compile ballista
  • Loading branch information
yjshen authored Sep 18, 2021
1 parent 478f606 commit caf5b22
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 62 deletions.
3 changes: 2 additions & 1 deletion ballista/rust/client/src/columnar_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl ColumnarValue {

pub fn memory_size(&self) -> usize {
match self {
ColumnarValue::Columnar(array) => array.get_array_memory_size(),
// ColumnarValue::Columnar(array) => array.get_array_memory_size(),
ColumnarValue::Columnar(_array) => 0,
_ => 0,
}
}
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ enum TimeUnit{
enum IntervalUnit{
YearMonth = 0;
DayTime = 1;
MonthDayNano = 2;
}

message Decimal{
Expand Down
45 changes: 23 additions & 22 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! will use the ShuffleReaderExec to read these results.
use std::fs::File;
use std::iter::Iterator;
use std::iter::{Iterator, FromIterator};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Instant;
Expand Down Expand Up @@ -54,6 +54,7 @@ use futures::StreamExt;
use hashbrown::HashMap;
use log::{debug, info};
use uuid::Uuid;
use std::cell::RefCell;

/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
/// can be executed as one unit with each partition being executed in parallel. The output of each
Expand Down Expand Up @@ -227,16 +228,16 @@ impl ShuffleWriterExec {
for (output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let indices = partition_indices.into();

// Produce batches based on indices
let columns = input_batch
.columns()
.iter()
.map(|c| {
take(c.as_ref(), &indices, None).map_err(|e| {
DataFusionError::Execution(e.to_string())
})
take::take(c.as_ref(),
&PrimitiveArray::<u64>::from_slice(&partition_indices))
.map_err(|e| {
DataFusionError::Execution(e.to_string())
}).map(ArrayRef::from)
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;

Expand Down Expand Up @@ -354,7 +355,7 @@ impl ExecutionPlan for ShuffleWriterExec {
// build metadata result batch
let num_writers = part_loc.len();
let mut partition_builder = UInt32Vec::with_capacity(num_writers);
let mut path_builder = MutableUtf8Array::with_capacity(num_writers);
let mut path_builder = MutableUtf8Array::<i32>::with_capacity(num_writers);
let mut num_rows_builder = UInt64Vec::with_capacity(num_writers);
let mut num_batches_builder = UInt64Vec::with_capacity(num_writers);
let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers);
Expand All @@ -368,21 +369,19 @@ impl ExecutionPlan for ShuffleWriterExec {
}

// build arrays
let partition_num: ArrayRef = Arc::new(partition_builder.finish());
let path: ArrayRef = Arc::new(path_builder.finish());
let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
Box::new(num_rows_builder),
Box::new(num_batches_builder),
Box::new(num_bytes_builder),
let partition_num: ArrayRef = partition_builder.into_arc();
let path: ArrayRef = path_builder.into_arc();
let field_builders: Vec<Arc<dyn Array>> = vec![
num_rows_builder.into_arc(),
num_batches_builder.into_arc(),
num_bytes_builder.into_arc(),
];
let mut stats_builder = StructBuilder::new(
PartitionStats::default().arrow_struct_fields(),
let stats_builder = StructArray::from_data(
DataType::Struct(PartitionStats::default().arrow_struct_fields()),
field_builders,
None,
);
for _ in 0..num_writers {
stats_builder.append(true)?;
}
let stats = Arc::new(stats_builder.finish());
let stats = Arc::new(stats_builder);

// build result batch containing metadata
let schema = result_schema();
Expand Down Expand Up @@ -459,7 +458,9 @@ impl<'a> ShuffleWriter<'a> {
let num_bytes: usize = batch
.columns()
.iter()
.map(|array| array.get_array_memory_size())
.map(|_array| 0)
// TODO: add arrow2 with array_memory_size capability and enable this.
// .map(|array| array.get_array_memory_size())
.sum();
self.num_bytes += num_bytes as u64;
Ok(())
Expand Down Expand Up @@ -505,7 +506,7 @@ mod tests {
assert_eq!(2, batch.num_rows());
let path = batch.columns()[1]
.as_any()
.downcast_ref::<StringArray>()
.downcast_ref::<Utf8Array<i32>>()
.unwrap();

let file0 = path.value(0);
Expand Down Expand Up @@ -582,7 +583,7 @@ mod tests {
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("hello"), Some("world")])),
Arc::new(Utf8Array::<i32>::from(vec![Some("hello"), Some("world")])),
],
)?;
let partition = vec![batch.clone(), batch];
Expand Down
20 changes: 16 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,10 @@ mod roundtrip_tests {
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
],
None,
false,
),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Expand All @@ -425,7 +428,10 @@ mod roundtrip_tests {
]),
true,
),
]),
],
None,
false,
),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
Expand Down Expand Up @@ -556,7 +562,10 @@ mod roundtrip_tests {
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Field::new("datatype", DataType::Binary, false),
]),
],
None,
false,
),
DataType::Union(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
Expand All @@ -570,7 +579,10 @@ mod roundtrip_tests {
]),
true,
),
]),
],
None,
false,
),
DataType::Dictionary(
Box::new(DataType::Utf8),
Box::new(DataType::Struct(vec![
Expand Down
12 changes: 9 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl protobuf::IntervalUnit {
match interval_unit {
IntervalUnit::YearMonth => protobuf::IntervalUnit::YearMonth,
IntervalUnit::DayTime => protobuf::IntervalUnit::DayTime,
IntervalUnit::MonthDayNano => protobuf::IntervalUnit::MonthDayNano,
}
}

Expand All @@ -62,6 +63,7 @@ impl protobuf::IntervalUnit {
Some(interval_unit) => Ok(match interval_unit {
protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
}),
None => Err(proto_error(
"Error converting i32 to DateUnit: Passed invalid variant",
Expand Down Expand Up @@ -235,7 +237,7 @@ impl TryInto<DataType> for &protobuf::ArrowType {
.iter()
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?;
DataType::Union(union_types)
DataType::Union(union_types, None, false)
}
protobuf::arrow_type::ArrowTypeEnum::Dictionary(boxed_dict) => {
let dict_ref = boxed_dict.as_ref();
Expand Down Expand Up @@ -389,7 +391,7 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
.map(|field| field.into())
.collect::<Vec<_>>(),
}),
DataType::Union(union_types) => ArrowTypeEnum::Union(protobuf::Union {
DataType::Union(union_types, _, _) => ArrowTypeEnum::Union(protobuf::Union {
union_types: union_types
.iter()
.map(|field| field.into())
Expand All @@ -407,6 +409,8 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fractional: *fractional as u64,
})
}
DataType::Extension(_, _, _) =>
panic!("DataType::Extension is not supported")
}
}
}
Expand Down Expand Up @@ -535,14 +539,16 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_)
| DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _) => {
return Err(proto_error(format!(
"Error converting to Datatype to scalar type, {:?} is invalid as a datafusion scalar.",
val
)))
}
DataType::Extension(_, _, _) =>
panic!("DataType::Extension is not supported")
};
Ok(scalar_value)
}
Expand Down
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand Down Expand Up @@ -243,6 +244,8 @@ impl TryInto<datafusion::arrow::datatypes::DataType>
.iter()
.map(|field| field.try_into())
.collect::<Result<Vec<_>, _>>()?,
None,
false,
),
arrow_type::ArrowTypeEnum::Dictionary(dict) => {
let pb_key_datatype = dict
Expand Down
18 changes: 11 additions & 7 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,17 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
PhysicalPlanType::Window(window_agg) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(window_agg.input)?;
let input_schema = window_agg.input_schema.ok_or_else(|| {
BallistaError::General(
"input_schema in WindowAggrNode is missing.".to_owned(),
)
})?;

let physical_schema = Arc::new(input_schema);
let input_schema = window_agg
.input_schema
.as_ref()
.ok_or_else(|| {
BallistaError::General(
"input_schema in WindowAggrNode is missing.".to_owned(),
)
})?
.clone();
let physical_schema: SchemaRef =
SchemaRef::new((&input_schema).try_into()?);

let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
.window_expr
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl PartitionStats {
let values = vec![num_rows, num_batches, num_bytes];

Ok(Arc::new(StructArray::from_data(
self.arrow_struct_fields(),
DataType::Struct(self.arrow_struct_fields()),
values,
None,
)))
Expand Down
3 changes: 2 additions & 1 deletion ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ pub async fn write_stream_to_disk(
let batch_size_bytes: usize = batch
.columns()
.iter()
.map(|array| array.get_array_memory_size())
// .map(|array| array.get_array_memory_size())
.map(|_array| 0)
.sum();
num_batches += 1;
num_rows += batch.num_rows();
Expand Down
7 changes: 6 additions & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,12 @@ mod tests {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;

let expected = vec!["++", "||", "++", "++"];
let expected = vec![
"+----+--------------+",
"| c1 | AVG(test.c2) |",
"+----+--------------+",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ mod tests {
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439785 | 13.860958726523547 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549835 | 8.79396828975897 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341557 | 10.206140546981727 | 21 | 21 |",
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
],
&df
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ mod tests {
let expr = col("b1").not().eq(lit(true));
let p = PruningPredicate::try_new(&expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, vec![true, false, false, true, true]);
assert_eq!(result, vec![true, true, false, true, true]);
}

/// Creates setup for int32 chunk pruning
Expand Down
7 changes: 5 additions & 2 deletions datafusion/src/physical_plan/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ mod tests {
let expression = cast_with_options(col("a", &schema)?, &schema, $TYPE)?;

// verify that its display is correct
assert_eq!(format!("CAST(a AS {:?})", $TYPE), format!("{}", expression));
assert_eq!(
format!("CAST(a@0 AS {:?})", $TYPE),
format!("{}", expression)
);

// verify that the expression's type is correct
assert_eq!(expression.data_type(&schema)?, $TYPE);
Expand Down Expand Up @@ -235,7 +238,7 @@ mod tests {
#[test]
fn invalid_cast() {
// Ensure a useful error happens at plan time if invalid casts are used
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Schema::new(vec![Field::new("a", DataType::Null, false)]);

let result = cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary);
result.expect_err("expected Invalid CAST");
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/expressions/try_cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod tests {
#[test]
fn invalid_cast() {
// Ensure a useful error happens at plan time if invalid casts are used
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Schema::new(vec![Field::new("a", DataType::Null, false)]);

let result = try_cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary);
result.expect_err("expected Invalid CAST");
Expand Down
Loading

0 comments on commit caf5b22

Please sign in to comment.