Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tests #9

Merged
merged 1 commit into from
Sep 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ Run a SQL query against data stored in a CSV:

```rust
use datafusion::prelude::*;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
Expand All @@ -91,7 +90,6 @@ Use the DataFrame API to process data stored in a CSV:

```rust
use datafusion::prelude::*;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ data set.

```rust,no_run
use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::arrow::io::print;
use datafusion::prelude::CsvReadOptions;

#[tokio::main]
Expand Down Expand Up @@ -112,7 +112,7 @@ async fn main() -> Result<()> {

// collect the results and print them to stdout
let results = df.collect().await?;
pretty::print_batches(&results)?;
print::print(&results);
Ok(())
}
```
88 changes: 79 additions & 9 deletions datafusion/src/physical_plan/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,91 @@ use std::sync::Arc;

use super::ColumnarValue;

fn array_array(arrays: &[&dyn Array]) -> Result<FixedSizeListArray> {
fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
assert!(!arrays.is_empty());
let first = arrays[0];
assert!(arrays.iter().all(|x| x.len() == first.len()));
assert!(arrays.iter().all(|x| x.data_type() == first.data_type()));

let size = arrays.len();

let values = concat::concatenate(arrays)?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

array_array is to zip elements at the same index for each array into a FixedSizeArray, therefore not identical to concatenate that chain all arrays end to end.

let data_type = FixedSizeListArray::default_datatype(first.data_type().clone(), size);
Ok(FixedSizeListArray::from_data(
data_type,
values.into(),
None,
))
macro_rules! array {
($PRIMITIVE: ty, $ARRAY: ty, $DATA_TYPE: path) => {{
let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(first.len() * size, $DATA_TYPE);
let mut array = MutableFixedSizeListArray::new(array, size);
// for each entry in the array
for index in 0..first.len() {
let values = array.mut_values();
for arg in arrays {
let arg = arg.as_any().downcast_ref::<$ARRAY>().unwrap();
if arg.is_null(index) {
values.push(None);
} else {
values.push(Some(arg.value(index)));
}
}
}
Ok(array.as_arc())
}};
}

macro_rules! array_string {
($OFFSET: ty) => {{
let array = MutableUtf8Array::<$OFFSET>::with_capacity(first.len() * size);
let mut array = MutableFixedSizeListArray::new(array, size);
// for each entry in the array
for index in 0..first.len() {
let values = array.mut_values();
for arg in arrays {
let arg = arg.as_any().downcast_ref::<Utf8Array<$OFFSET>>().unwrap();
if arg.is_null(index) {
values.push::<&str>(None);
} else {
values.push(Some(arg.value(index)));
}
}
}
Ok(array.as_arc())
}};
}


match first.data_type() {
DataType::Boolean => {
let array = MutableBooleanArray::with_capacity(first.len() * size);
let mut array = MutableFixedSizeListArray::new(array, size);
// for each entry in the array
for index in 0..first.len() {
let values = array.mut_values();
for arg in arrays {
let arg = arg.as_any().downcast_ref::<BooleanArray>().unwrap();
if arg.is_null(index) {
values.push(None);
} else {
values.push(Some(arg.value(index)));
}
}
}
Ok(array.as_arc())
},
DataType::UInt8 => array!(u8, PrimitiveArray<u8>, DataType::UInt8),
DataType::UInt16 => array!(u16, PrimitiveArray<u16>, DataType::UInt16),
DataType::UInt32 => array!(u32, PrimitiveArray<u32>, DataType::UInt32),
DataType::UInt64 => array!(u64, PrimitiveArray<u64>, DataType::UInt64),
DataType::Int8 => array!(i8, PrimitiveArray<i8>, DataType::Int8),
DataType::Int16 => array!(i16, PrimitiveArray<i16>, DataType::Int16),
DataType::Int32 => array!(i32, PrimitiveArray<i32>, DataType::Int32),
DataType::Int64 => array!(i64, PrimitiveArray<i64>, DataType::Int64),
DataType::Float32 => array!(f32, PrimitiveArray<f32>, DataType::Float32),
DataType::Float64 => array!(f64, PrimitiveArray<f64>, DataType::Float64),
DataType::Utf8 => array_string!(i32),
DataType::LargeUtf8 => array_string!(i64),
data_type => Err(DataFusionError::NotImplemented(format!(
"Array is not implemented for type '{:?}'.",
data_type
))),
}

}

/// put values in an array.
Expand All @@ -57,7 +127,7 @@ pub fn array(values: &[ColumnarValue]) -> Result<ColumnarValue> {
})
.collect::<Result<_>>()?;

Ok(ColumnarValue::Array(array_array(&arrays).map(Arc::new)?))
Ok(ColumnarValue::Array(array_array(&arrays)?))
}

/// Currently supported types by the array function.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ impl ExecutionPlan for CsvExec {
});

Ok(Box::pin(CsvStream::new(
self.schema.clone(),
self.projected_schema.clone(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cause for failed tests csv_query_window_with_empty_over and csv_query_window_with_order_by. due to wrong schema provided.

ReceiverStream::new(response_rx),
)))
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ impl ScalarValue {
/// Example
/// ```
/// use datafusion::scalar::ScalarValue;
/// use arrow::array::BooleanArray;
/// use arrow::array::{BooleanArray, Array};
///
/// let scalars = vec![
/// ScalarValue::Boolean(Some(true)),
Expand Down
17 changes: 6 additions & 11 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3114,7 +3114,7 @@ async fn query_array() -> Result<()> {
ctx.register_table("test", Arc::new(table))?;
let sql = "SELECT array(c1, cast(c2 as varchar)) FROM test";
let actual = execute(&mut ctx, sql).await;
let expected = vec![vec!["[,0]"], vec!["[a,1]"], vec!["[aa,]"], vec!["[aaa,3]"]];
let expected = vec![vec!["[, 0]"], vec!["[a, 1]"], vec!["[aa, ]"], vec!["[aaa, 3]"]];
assert_eq!(expected, actual);
Ok(())
}
Expand Down Expand Up @@ -4323,16 +4323,9 @@ async fn test_cast_expressions_error() -> Result<()> {
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
let result = collect(plan).await;

match result {
Ok(_) => panic!("expected error"),
Err(e) => {
assert_contains!(e.to_string(),
"Cast error: Cannot cast string 'c' to value of arrow::datatypes::types::Int32Type type"
);
}
}
let actual = execute(&mut ctx, sql).await;
let expected = vec![vec![""]; 100];
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cast string to Int32 is possible in arrow2, cast failure will result in NULL

https://github.com/jorgecarleitao/arrow2/blob/main/src/compute/cast/mod.rs#L129

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, filed apache#1056

assert_eq!(expected, actual);

Ok(())
}
Expand Down Expand Up @@ -4538,6 +4531,8 @@ async fn like_on_string_dictionaries() -> Result<()> {
}

#[tokio::test]
#[ignore]
// FIXME: https://github.com/apache/arrow-datafusion/issues/1035
async fn test_regexp_is_match() -> Result<()> {
let input = Utf8Array::<i32>::from(vec![
Some("foo"),
Expand Down