Ignore last test, fix cargo clippy, format and pass integration tests #10

Merged · 4 commits · Sep 28, 2021
5 changes: 2 additions & 3 deletions Cargo.toml
@@ -31,6 +31,5 @@ members = [
exclude = ["python"]

[patch.crates-io]
arrow2 = { path = "/home/houqp/Documents/code/arrow/arrow2" }
arrow-flight = { path = "/home/houqp/Documents/code/arrow/arrow2/arrow-flight" }
parquet2 = { path = "/home/houqp/Documents/code/arrow/parquet2" }
arrow2 = { git = "https://github.com/houqp/arrow2.git", branch = "qp_ord" }
arrow-flight = { git = "https://github.com/houqp/arrow2.git", branch = "qp_ord" }
2 changes: 1 addition & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -507,7 +507,7 @@ mod tests {
fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
self.fields()
.iter()
.position(|c| c.name() == &column_name)
.position(|c| c.name() == column_name)
.map(|pos| self.values()[pos].borrow())
}
}
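Note: the one-character change above is a clippy fix (a needless reference on the right-hand side of a comparison). A minimal standalone sketch of the pattern, using a hypothetical helper rather than the repo's types:

fn find_column(names: &[String], target: &str) -> Option<usize> {
    // `String: PartialEq<str>` lets a `&String` compare directly against a `&str`,
    // so the extra borrow in `n == &target` is redundant and clippy flags it.
    names.iter().position(|n| n == target)
}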
7 changes: 4 additions & 3 deletions benchmarks/src/bin/tpch.rs
@@ -37,6 +37,9 @@ use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;

use arrow::io::parquet::write::{Compression, Version, WriteOptions};
use ballista::prelude::{
BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
@@ -179,7 +182,7 @@ async fn main() -> Result<()> {
env_logger::init();
match TpchOpt::from_args() {
TpchOpt::Benchmark(BallistaBenchmark(opt)) => {
todo!() //benchmark_ballista(opt).await.map(|_| ())
benchmark_ballista(opt).await.map(|_| ())
}
TpchOpt::Benchmark(DataFusionBenchmark(opt)) => {
benchmark_datafusion(opt).await.map(|_| ())
@@ -239,7 +242,6 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
Ok(result)
}

/*
async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
println!("Running benchmarks with the following options: {:?}", opt);

@@ -316,7 +318,6 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {

Ok(())
}
*/

fn get_query_sql(query: usize) -> Result<String> {
if query > 0 && query < 23 {
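The change above restores the previously commented-out benchmark_ballista path and adds the Ballista imports it needs. A rough sketch of how such a context is typically built from those imports, assuming the BallistaContext API of this era; host, port, and the partition count are placeholders:

use ballista::prelude::{BallistaConfig, BallistaContext, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};

fn ballista_context(host: &str, port: u16, shuffle_partitions: usize) -> BallistaContext {
    // Build a config that sets the shuffle partition count, then connect to a scheduler.
    let config = BallistaConfig::builder()
        .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, &shuffle_partitions.to_string())
        .build()
        .expect("invalid Ballista configuration");
    BallistaContext::remote(host, port, &config)
}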
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udf.rs
@@ -21,10 +21,10 @@ use datafusion::arrow::{
record_batch::RecordBatch,
};

use arrow::array::Array;
use datafusion::prelude::*;
use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
use std::sync::Arc;
use arrow::array::Array;

// create local execution context with an in-memory table
fn create_context() -> Result<ExecutionContext> {
4 changes: 2 additions & 2 deletions datafusion/benches/data_utils/mod.rs
@@ -122,8 +122,8 @@ fn create_record_batch(
vec![
Arc::new(Utf8Array::<i32>::from_slice(keys)),
Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
Arc::new(Float64Array::from_slice(values)),
Arc::new(UInt64Array::from_slice(integer_values_wide)),
Arc::new(Float64Array::from(values)),
Arc::new(UInt64Array::from(integer_values_wide)),
Arc::new(UInt64Array::from_slice(integer_values_narrow)),
],
)
14 changes: 7 additions & 7 deletions datafusion/benches/physical_plan.rs
@@ -21,10 +21,10 @@ use criterion::{BatchSize, Criterion};
extern crate arrow;
extern crate datafusion;

use std::{iter::FromIterator, sync::Arc};
use std::sync::Arc;

use arrow::{
array::{ArrayRef, Int64Array, StringArray},
array::{ArrayRef, Int64Array, Utf8Array},
record_batch::RecordBatch,
};
use tokio::runtime::Runtime;
@@ -39,7 +39,7 @@ use datafusion::physical_plan::{
// Initialise the operator using the provided record batches and the sort key
// as inputs. All record batches must have the same schema.
fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
let schema = batches[0].schema();
let schema = batches[0].schema().clone();

let sort = sort
.iter()
@@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {

let exec = MemoryExec::try_new(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
schema.clone(),
schema,
None,
)
.unwrap();
@@ -104,9 +104,9 @@ fn batches(
col_b.sort();
col_c.sort();

let col_a: ArrayRef = Arc::new(StringArray::from_iter(col_a));
let col_b: ArrayRef = Arc::new(StringArray::from_iter(col_b));
let col_c: ArrayRef = Arc::new(StringArray::from_iter(col_c));
let col_a: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_a));
let col_b: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_b));
let col_c: ArrayRef = Arc::new(Utf8Array::<i32>::from(col_c));
let col_d: ArrayRef = Arc::new(Int64Array::from(col_d));

let rb = RecordBatch::try_from_iter(vec![
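The schema handling above moves the clone to where the reference is taken: RecordBatch::schema() hands back a reference-counted schema, so cloning it once up front and moving it into MemoryExec::try_new is equivalent to cloning at the call site. A small illustration of the idea with a stand-in type, not the repo's SchemaRef:

use std::sync::Arc;

struct Schema { fields: Vec<String> }

fn consume(schema: Arc<Schema>) -> usize {
    schema.fields.len()
}

fn example(batch_schemas: &[Arc<Schema>]) -> usize {
    // Cloning an Arc only bumps a reference count; the underlying schema is shared.
    let schema = batch_schemas[0].clone();
    consume(schema)
}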
2 changes: 1 addition & 1 deletion datafusion/src/arrow_temporal_util.rs
@@ -211,7 +211,7 @@ mod tests {
// Note: Use chrono APIs that are different than
// naive_datetime_to_timestamp to compute the utc offset to
// try and double check the logic
let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) {
let utc_offset_secs = match Local.offset_from_local_datetime(naive_datetime) {
LocalResult::Single(local_offset) => {
local_offset.fix().local_minus_utc() as i64
}
6 changes: 4 additions & 2 deletions datafusion/src/execution/dataframe_impl.rs
@@ -160,13 +160,15 @@ impl DataFrame for DataFrameImpl {
/// Print results.
async fn show(&self) -> Result<()> {
let results = self.collect().await?;
Ok(print::print(&results))
print::print(&results);
Ok(())
}

/// Print results and limit rows.
async fn show_limit(&self, num: usize) -> Result<()> {
let results = self.limit(num)?.collect().await?;
Ok(print::print(&results))
print::print(&results);
Ok(())
}

/// Convert the logical plan represented by this DataFrame into a physical plan and
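The show/show_limit rewrite above is the usual fix for clippy::unit_arg: do not wrap a unit-returning call in Ok(...). A minimal sketch with a hypothetical printer:

fn print_lines(lines: &[String]) {
    lines.iter().for_each(|l| println!("{}", l));
}

fn show(lines: &[String]) -> Result<(), std::io::Error> {
    // Instead of `Ok(print_lines(lines))`, call the unit-returning function first
    // and return `Ok(())` explicitly.
    print_lines(lines);
    Ok(())
}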
11 changes: 5 additions & 6 deletions datafusion/src/physical_plan/array_expressions.rs
@@ -19,9 +19,7 @@
use crate::error::{DataFusionError, Result};
use arrow::array::*;
use arrow::compute::concat;
use arrow::datatypes::DataType;
use std::sync::Arc;

use super::ColumnarValue;

@@ -35,7 +33,10 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {

macro_rules! array {
($PRIMITIVE: ty, $ARRAY: ty, $DATA_TYPE: path) => {{
let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(first.len() * size, $DATA_TYPE);
let array = MutablePrimitiveArray::<$PRIMITIVE>::with_capacity_from(
first.len() * size,
$DATA_TYPE,
);
let mut array = MutableFixedSizeListArray::new(array, size);
// for each entry in the array
for index in 0..first.len() {
@@ -73,7 +74,6 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
}};
}


match first.data_type() {
DataType::Boolean => {
let array = MutableBooleanArray::with_capacity(first.len() * size);
@@ -91,7 +91,7 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
}
}
Ok(array.as_arc())
},
}
DataType::UInt8 => array!(u8, PrimitiveArray<u8>, DataType::UInt8),
DataType::UInt16 => array!(u16, PrimitiveArray<u16>, DataType::UInt16),
DataType::UInt32 => array!(u32, PrimitiveArray<u32>, DataType::UInt32),
@@ -109,7 +109,6 @@ fn array_array(arrays: &[&dyn Array]) -> Result<ArrayRef> {
data_type
))),
}

}

/// put values in an array.
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/csv.rs
@@ -308,17 +308,18 @@ impl CsvExec {
filenames: &[String],
options: &CsvReadOptions,
) -> Result<Schema> {
Ok(infer_schema_from_files(
infer_schema_from_files(
filenames,
options.delimiter,
Some(options.schema_infer_max_records),
options.has_header,
)?)
)
}
}

type Payload = ArrowResult<RecordBatch>;

#[allow(clippy::too_many_arguments)]
fn producer_task<R: Read>(
reader: R,
response_tx: Sender<Payload>,
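Two small lint fixes above: wrapping a call that already returns a compatible Result in `Ok(...?)` is redundant (clippy::needless_question_mark), and the producer task keeps its long argument list under an explicit allow. A sketch of the first pattern with a hypothetical parser:

fn parse_port(input: &str) -> Result<u16, std::num::ParseIntError> {
    // `Ok(input.trim().parse()?)` is flagged by clippy; the Result can be returned directly.
    input.trim().parse()
}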
7 changes: 2 additions & 5 deletions datafusion/src/physical_plan/expressions/binary.rs
@@ -259,11 +259,8 @@ fn evaluate_scalar(
Ok(None)
}
}
} else if matches!(op, Or) {
// TODO: optimize scalar Or
Ok(None)
} else if matches!(op, And) {
// TODO: optimize scalar And
} else if matches!(op, Or | And) {
// TODO: optimize scalar Or | And
Ok(None)
} else {
match (lhs.data_type(), op) {
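Collapsing the two scalar branches above relies on an or-pattern inside matches!. A tiny standalone illustration; the Op enum here is hypothetical:

enum Op { And, Or, Eq }

fn not_yet_optimized(op: &Op) -> bool {
    // Equivalent to `matches!(op, Op::Or) || matches!(op, Op::And)`, but in one arm.
    matches!(op, Op::Or | Op::And)
}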
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/filter.rs
@@ -30,16 +30,16 @@ use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
};

use arrow::array::{BooleanArray, Array};
use arrow::array::{Array, BooleanArray};
use arrow::compute::filter::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

use async_trait::async_trait;

use futures::stream::{Stream, StreamExt};
use arrow::compute::boolean::{and, is_not_null};
use futures::stream::{Stream, StreamExt};

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
/// include in its output batches.
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/functions.rs
@@ -587,7 +587,7 @@ pub fn create_physical_fun(
))),
}),
BuiltinScalarFunction::BitLength => Arc::new(|args| match &args[0] {
ColumnarValue::Array(v) => todo!(),
ColumnarValue::Array(_v) => todo!(),
ColumnarValue::Scalar(v) => match v {
ScalarValue::Utf8(v) => Ok(ColumnarValue::Scalar(ScalarValue::Int32(
v.as_ref().map(|x| (x.len() * 8) as i32),
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/hash_aggregate.rs
@@ -880,6 +880,7 @@ impl RecordBatchStream for HashAggregateStream {

/// Given Vec<Vec<ArrayRef>>, concatenates the inners `Vec<ArrayRef>` into `ArrayRef`, returning `Vec<ArrayRef>`
/// This assumes that `arrays` is not empty.
#[allow(dead_code)]
fn concatenate(arrays: Vec<Vec<ArrayRef>>) -> ArrowResult<Vec<ArrayRef>> {
(0..arrays[0].len())
.map(|column| {
@@ -968,7 +969,7 @@ fn create_batch_from_map(
.zip(output_schema.fields().iter())
.map(|(col, desired_field)| {
arrow::compute::cast::cast(col.as_ref(), desired_field.data_type())
.map(|v| Arc::from(v))
.map(Arc::from)
})
.collect::<ArrowResult<Vec<_>>>()?;

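The `.map(Arc::from)` change above, and the matching ones in parquet.rs and the window expressions below, is clippy::redundant_closure: when a closure only forwards its argument to a function, the function can be passed by name. A small sketch:

use std::sync::Arc;

fn share_all(strings: Vec<String>) -> Vec<Arc<str>> {
    // `.map(|s| Arc::from(s))` is flagged; `Arc::from` can be passed directly.
    strings.into_iter().map(Arc::from).collect()
}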
11 changes: 7 additions & 4 deletions datafusion/src/physical_plan/parquet.rs
@@ -17,6 +17,7 @@

//! Execution plan for reading Parquet files
/// FIXME: https://github.com/apache/arrow-datafusion/issues/1058
use fmt::Debug;
use std::fmt;
use std::fs::File;
@@ -47,7 +48,7 @@ use log::debug;
use parquet::statistics::{
BinaryStatistics as ParquetBinaryStatistics,
BooleanStatistics as ParquetBooleanStatistics,
PrimitiveStatistics as ParquetPrimitiveStatistics, Statistics as ParquetStatistics,
PrimitiveStatistics as ParquetPrimitiveStatistics,
};

use tokio::{
@@ -294,6 +295,7 @@ impl ParquetFileMetrics {

type Payload = ArrowResult<RecordBatch>;

#[allow(dead_code)]
fn producer_task(
path: &str,
response_tx: Sender<Payload>,
@@ -416,6 +418,7 @@ impl ExecutionPlan for ParquetExec {
}
}

#[allow(dead_code)]
fn send_result(
response_tx: &Sender<ArrowResult<RecordBatch>>,
result: ArrowResult<RecordBatch>,
@@ -520,7 +523,7 @@ macro_rules! get_min_max_values {
.collect();

// ignore errors converting to arrays (e.g. different types)
ScalarValue::iter_to_array(scalar_values).ok().map(|v| Arc::from(v))
ScalarValue::iter_to_array(scalar_values).ok().map(Arc::from)
}}
}

@@ -575,7 +578,7 @@ fn read_partition(
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
predicate_builder: &Option<PruningPredicate>,
batch_size: usize,
_batch_size: usize,
response_tx: Sender<ArrowResult<RecordBatch>>,
limit: Option<usize>,
) -> Result<()> {
@@ -593,7 +596,7 @@
)?;

if let Some(predicate_builder) = predicate_builder {
let file_metadata = reader.metadata();
let _file_metadata = reader.metadata();
reader.set_groups_filter(Arc::new(build_row_group_predicate(
predicate_builder,
file_metrics,
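Rather than deleting the now-unused reader plumbing, the change above silences the warnings: helpers get #[allow(dead_code)] and unused parameters get a leading underscore, with the FIXME pointing at the tracking issue. A minimal illustration of the same temporary pattern, using a hypothetical helper:

#[allow(dead_code)]
fn read_chunk(path: &str, _batch_size: usize) -> std::io::Result<Vec<u8>> {
    // `_batch_size` is accepted but ignored until the follow-up lands.
    std::fs::read(path)
}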
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/repartition.rs
@@ -28,7 +28,7 @@ use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
use arrow::record_batch::RecordBatch;
use arrow::{
array::{Array, ArrayRef, UInt32Array, UInt64Array, Utf8Array},
array::{Array, UInt64Array},
error::Result as ArrowResult,
};
use arrow::{compute::take, datatypes::SchemaRef};
Expand Down Expand Up @@ -462,6 +462,7 @@ mod tests {
physical_plan::{expressions::col, memory::MemoryExec},
test::exec::{BarrierExec, ErrorExec, MockExec},
};
use arrow::array::{ArrayRef, UInt32Array, Utf8Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
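The import shuffle above fixes unused-import warnings: array types needed only by the tests now live inside `mod tests`. The general shape of that pattern, with a trivial stand-in function:

pub fn double(x: u64) -> u64 {
    x * 2
}

#[cfg(test)]
mod tests {
    // Imports used only by tests stay out of the non-test build.
    use super::double;

    #[test]
    fn doubles() {
        assert_eq!(double(21), 42);
    }
}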
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -898,11 +898,11 @@ mod tests {
let schema = partitions[0][0].schema();
let sort = vec![
PhysicalSortExpr {
expr: col("b", &schema).unwrap(),
expr: col("b", schema).unwrap(),
options: Default::default(),
},
PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
expr: col("c", schema).unwrap(),
options: Default::default(),
},
];
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/windows/aggregate.rs
@@ -95,7 +95,7 @@ impl AggregateWindowExpr {
.collect::<Vec<ArrayRef>>();
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat::concatenate(&results)
.map(|x| ArrayRef::from(x))
.map(ArrayRef::from)
.map_err(DataFusionError::ArrowError)
}

2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/windows/built_in.rs
@@ -99,7 +99,7 @@ impl WindowExpr for BuiltInWindowExpr {
};
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat::concatenate(&results)
.map(|x| ArrayRef::from(x))
.map(ArrayRef::from)
.map_err(DataFusionError::ArrowError)
}
}
Loading