Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DataFusion test and try to make ballista compile #4

Merged
merged 13 commits into from
Sep 18, 2021
24 changes: 11 additions & 13 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,21 +368,19 @@ impl ExecutionPlan for ShuffleWriterExec {
}

// build arrays
let partition_num: ArrayRef = Arc::new(partition_builder.finish());
let path: ArrayRef = Arc::new(path_builder.finish());
let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
Box::new(num_rows_builder),
Box::new(num_batches_builder),
Box::new(num_bytes_builder),
let partition_num: ArrayRef = partition_builder.into_arc();
let path: ArrayRef = path_builder.into_arc();
let field_builders: Vec<Arc<dyn Array>> = vec![
num_rows_builder.into_arc(),
num_batches_builder.into_arc(),
num_bytes_builder.into_arc(),
];
let mut stats_builder = StructBuilder::new(
PartitionStats::default().arrow_struct_fields(),
let stats_builder = StructArray::from_data(
DataType::Struct(PartitionStats::default().arrow_struct_fields()),
field_builders,
None,
);
for _ in 0..num_writers {
stats_builder.append(true)?;
}
let stats = Arc::new(stats_builder.finish());
let stats = Arc::new(stats_builder);

// build result batch containing metadata
let schema = result_schema();
Expand Down Expand Up @@ -582,7 +580,7 @@ mod tests {
schema.clone(),
vec![
Arc::new(UInt32Array::from(vec![Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("hello"), Some("world")])),
Arc::new(Utf8Array::from(vec![Some("hello"), Some("world")])),
],
)?;
let partition = vec![batch.clone(), batch];
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl PartitionStats {
let values = vec![num_rows, num_batches, num_bytes];

Ok(Arc::new(StructArray::from_data(
self.arrow_struct_fields(),
DataType::Struct(self.arrow_struct_fields()),
values,
None,
)))
Expand Down
7 changes: 6 additions & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,12 @@ mod tests {
let results =
execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;

let expected = vec!["++", "||", "++", "++"];
let expected = vec![
"+----+--------------+",
"| c1 | AVG(test.c2) |",
"+----+--------------+",
"+----+--------------+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ mod tests {
"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439785 | 13.860958726523547 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549835 | 8.79396828975897 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341557 | 10.206140546981727 | 21 | 21 |",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is due to the inherent inaccuracy of floating-point arithmetic, and is therefore acceptable.

"+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+",
],
&df
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ mod tests {
let expr = col("b1").not().eq(lit(true));
let p = PruningPredicate::try_new(&expr, schema).unwrap();
let result = p.prune(&statistics).unwrap();
assert_eq!(result, vec![true, false, false, true, true]);
assert_eq!(result, vec![true, true, false, true, true]);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, looks like arrow2 fixed a bug that exists in arrow-rs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have much background on this, but since the test exercises pruning of `b1 != true`, I believe the new result is the expected behavior.

}

/// Creates setup for int32 chunk pruning
Expand Down
7 changes: 5 additions & 2 deletions datafusion/src/physical_plan/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ mod tests {
let expression = cast_with_options(col("a", &schema)?, &schema, $TYPE)?;

// verify that its display is correct
assert_eq!(format!("CAST(a AS {:?})", $TYPE), format!("{}", expression));
assert_eq!(
format!("CAST(a@0 AS {:?})", $TYPE),
format!("{}", expression)
);

// verify that the expression's type is correct
assert_eq!(expression.data_type(&schema)?, $TYPE);
Expand Down Expand Up @@ -235,7 +238,7 @@ mod tests {
#[test]
fn invalid_cast() {
// Ensure a useful error happens at plan time if invalid casts are used
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Schema::new(vec![Field::new("a", DataType::Null, false)]);

let result = cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary);
result.expect_err("expected Invalid CAST");
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/expressions/try_cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod tests {
#[test]
fn invalid_cast() {
// Ensure a useful error happens at plan time if invalid casts are used
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Schema::new(vec![Field::new("a", DataType::Null, false)]);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Int32 -> LargeBinary cast is valid in arrow2, therefore I change to another unacceptable case Null here


let result = try_cast(col("a", &schema).unwrap(), &schema, DataType::LargeBinary);
result.expect_err("expected Invalid CAST");
Expand Down
33 changes: 22 additions & 11 deletions datafusion/src/physical_plan/math_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,42 @@ use rand::{thread_rng, Rng};
use std::iter;
use std::sync::Arc;

use arrow::array::Float32Array;
use arrow::array::Float64Array;
use arrow::compute::arity::unary;
use arrow::datatypes::DataType;

use super::{ColumnarValue, ScalarValue};
use crate::error::{DataFusionError, Result};

macro_rules! downcast_compute_op {
($ARRAY:expr, $NAME:expr, $FUNC:ident, $TYPE:ident, $DT: path) => {{
let n = $ARRAY.as_any().downcast_ref::<$TYPE>();
match n {
Some(array) => {
let res: $TYPE =
unary(array, |x| x.$FUNC(), $DT);
Ok(Arc::new(res))
}
_ => Err(DataFusionError::Internal(format!(
"Invalid data type for {}",
$NAME
))),
}
}};
}

macro_rules! unary_primitive_array_op {
($VALUE:expr, $NAME:expr, $FUNC:ident) => {{
match ($VALUE) {
ColumnarValue::Array(array) => match array.data_type() {
DataType::Float32 => {
let array = array.as_any().downcast_ref().unwrap();
let array = unary::<f32, _, f64>(
array,
|x| x.$FUNC() as f64,
DataType::Float32,
);
Ok(ColumnarValue::Array(Arc::new(array)))
let result = downcast_compute_op!(array, $NAME, $FUNC, Float32Array, DataType::Float32);
Ok(ColumnarValue::Array(result?))
}
DataType::Float64 => {
let array = array.as_any().downcast_ref().unwrap();
let array =
unary::<f64, _, f64>(array, |x| x.$FUNC(), DataType::Float64);
Ok(ColumnarValue::Array(Arc::new(array)))
let result = downcast_compute_op!(array, $NAME, $FUNC, Float64Array, DataType::Float64);
Ok(ColumnarValue::Array(result?))
}
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?} for function {}",
Expand Down
8 changes: 4 additions & 4 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ impl SortPreservingMergeStream {
}

// emit current batch of rows for current buffer
array_data.extend(buffer_idx, start_row_idx, end_row_idx);
array_data.extend(buffer_idx, start_row_idx, end_row_idx - start_row_idx);

// start new batch of rows
buffer_idx = next_buffer_idx;
Expand All @@ -520,7 +520,7 @@ impl SortPreservingMergeStream {
}

// emit final batch of rows
array_data.extend(buffer_idx, start_row_idx, end_row_idx);
array_data.extend(buffer_idx, start_row_idx, end_row_idx - start_row_idx);
array_data.as_arc()
})
.collect();
Expand Down Expand Up @@ -965,7 +965,7 @@ mod tests {
options: Default::default(),
},
PhysicalSortExpr {
expr: col("c7", &schema).unwrap(),
expr: col("c12", &schema).unwrap(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original c7 is not distinguishable enough, i.e. same value exists many times.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 The same thing happens in arrow 6.0 (as pointed out by @houqp ) -- more details here: https://github.com/apache/arrow-datafusion/pull/984/files#r705557467

options: SortOptions::default(),
},
];
Expand Down Expand Up @@ -1180,7 +1180,7 @@ mod tests {
async fn test_async() {
let schema = test::aggr_test_schema();
let sort = vec![PhysicalSortExpr {
expr: col("c7", &schema).unwrap(),
expr: col("c12", &schema).unwrap(),
options: SortOptions::default(),
}];

Expand Down