Commit
Improve.
jorgecarleitao committed May 28, 2021
1 parent a309433 commit 26a7e03
Showing 6 changed files with 117 additions and 79 deletions.
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml
@@ -46,7 +46,7 @@ simd = ["arrow2/simd"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "b75f2b507d185f13ec37f81f9c01077a0069e755" }
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "afb233a" }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
2 changes: 1 addition & 1 deletion datafusion/src/datasource/parquet.rs
@@ -113,7 +113,7 @@ mod tests {
async fn read_small_batches() -> Result<()> {
let table = load_table("alltypes_plain.parquet")?;
let projection = None;
let exec = table.scan(&projection, 2, &[], None)?;
let exec = table.scan(&projection, 2, &[], Some(2))?;
let stream = exec.execute(0).await?;

let _ = stream
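For context on the `Some(2)` above: `scan`'s fourth argument now carries an optional row limit. A hypothetical companion test, assuming the `scan(projection, batch_size, filters, limit)` argument order seen at this call site and this module's `load_table` helper; this is a sketch, not part of the commit:

use futures::StreamExt;

#[tokio::test]
async fn read_with_row_limit() -> Result<()> {
    // sketch only: `load_table` and the scan signature are inferred from
    // the test above, not from a published API
    let table = load_table("alltypes_plain.parquet")?;
    let projection = None;
    let exec = table.scan(&projection, 2, &[], Some(2))?;
    let stream = exec.execute(0).await?;
    let batches: Vec<_> = stream.collect().await;
    // with a limit of two rows, the scan should never yield more than that
    let rows: usize = batches
        .iter()
        .map(|batch| batch.as_ref().map(|b| b.num_rows()).unwrap_or(0))
        .sum();
    assert!(rows <= 2);
    Ok(())
}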
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
@@ -241,9 +241,9 @@ mod tests {
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439785 | 13.860958726523547 | 21 | 21 |",
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549835 | 8.79396828975897 | 18 | 18 |",
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341557 | 10.206140546981727 | 21 | 21 |",
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
],
&df
160 changes: 92 additions & 68 deletions datafusion/src/physical_plan/distinct_expressions.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;

use ahash::RandomState;

use arrow2::array::new_empty_array;
use arrow2::{
array::{Array, ListArray},
datatypes::{DataType, Field},
@@ -170,34 +171,28 @@ impl Accumulator for DistinctCountAccumulator {
}

fn state(&self) -> Result<Vec<ScalarValue>> {
self.values
.iter()
.map(|distinct_values| {
if distinct_values.is_empty() {
Ok(None)
} else {
// create an array with all distinct values
let arrays = distinct_values
.iter()
.map(ScalarValue::from)
.map(|x| x.to_array())
.collect::<Vec<_>>();
let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(arrow2::compute::concat::concatenate(&arrays)
.map(|x| x.into())
.map(Some)?)
}
})
.zip(self.state_data_types.iter())
.map(|(x, type_): (Result<Option<Arc<dyn Array>>>, &DataType)| {
x.map(|x| {
ScalarValue::List(
x,
ListArray::<i32>::default_datatype(type_.clone()),
)
})
// create a ListArray for each `state_data_type`. The `ListArray` holds the
// distinct values accumulated so far for that state slot.
let a = self.state_data_types.iter().enumerate().map(|(i, type_)| {
if self.values.is_empty() {
return Ok((new_empty_array(type_.clone()), type_));
};
let arrays = self
.values
.iter()
.map(|distinct_values| ScalarValue::from(&distinct_values[i]).to_array())
.collect::<Vec<_>>();
let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(arrow2::compute::concat::concatenate(&arrays).map(|x| (x, type_))?)
});
a.map(|values: Result<(Box<dyn Array>, &DataType)>| {
values.map(|(values, type_)| {
ScalarValue::List(
Some(values.into()),
ListArray::<i32>::default_datatype(type_.clone()),
)
})
.collect()
})
.collect()
}

fn evaluate(&self) -> Result<ScalarValue> {
@@ -211,9 +206,10 @@ impl Accumulator for DistinctCountAccumulator {
}
}
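To restate the new `state()` in isolation: for each state slot it concatenates one single-value array per distinct row into a single array, then wraps that in a `ScalarValue::List`. A minimal sketch of the per-slot step, using only the calls visible in the hunk above (`new_empty_array`, `concatenate`, `to_array`, `default_datatype`); `distinct_tuples` is an illustrative stand-in for the accumulator's internal set, not its real field:

// sketch: build the ScalarValue::List state for slot `i`
fn slot_state(
    distinct_tuples: &[Vec<ScalarValue>],
    i: usize,
    type_: &DataType,
) -> Result<ScalarValue> {
    let values = if distinct_tuples.is_empty() {
        // no rows seen yet: an empty array of the slot's type
        new_empty_array(type_.clone())
    } else {
        // one single-element array per distinct row, then concatenate
        let arrays = distinct_tuples
            .iter()
            .map(|tuple| tuple[i].to_array())
            .collect::<Vec<_>>();
        let arrays = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
        arrow2::compute::concat::concatenate(&arrays)?
    };
    Ok(ScalarValue::List(
        Some(values.into()),
        ListArray::<i32>::default_datatype(type_.clone()),
    ))
}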

/*
#[cfg(test)]
mod tests {
type ArrayRef = Arc<dyn Array>;

use std::iter::FromIterator;

use super::*;
@@ -222,25 +218,52 @@ mod tests {
use arrow2::datatypes::DataType;

macro_rules! state_to_vec {
($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{
($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{
match $LIST {
ScalarValue::List(_, data_type) => assert_eq!(
ListArray::<i32>::get_child_type(data_type),
&DataType::$DATA_TYPE
),
_ => panic!("Expected a ScalarValue::List"),
}

match $LIST {
ScalarValue::List(None, _) => None,
ScalarValue::List(Some(values), _) => {
let vec = values
.as_any()
.downcast_ref::<$ARRAY_TY>()
.unwrap()
.iter()
.map(|x| x.map(|x| *x))
.collect::<Vec<_>>();

Some(vec)
}
_ => unreachable!(),
}
}};
}

macro_rules! state_to_vec_bool {
($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{
match $LIST {
ScalarValue::List(_, data_type) => match data_type {
DataType::$DATA_TYPE => (),
_ => panic!("Unexpected DataType for list"),
},
ScalarValue::List(_, data_type) => assert_eq!(
ListArray::<i32>::get_child_type(data_type),
&DataType::$DATA_TYPE
),
_ => panic!("Expected a ScalarValue::List"),
}

match $LIST {
ScalarValue::List(None, _) => None,
ScalarValue::List(Some(scalar_values), _) => {
let vec = scalar_values
ScalarValue::List(Some(values), _) => {
let vec = values
.as_any()
.downcast_ref::<$ARRAY_TY>()
.unwrap()
.iter()
.map(|scalar_value| match scalar_value {
ScalarValue::$DATA_TYPE(value) => *value,
_ => panic!("Unexpected ScalarValue variant"),
})
.collect::<Vec<Option<$PRIM_TY>>>();
.collect::<Vec<_>>();

Some(vec)
}
@@ -319,7 +342,7 @@ mod tests {

macro_rules! test_count_distinct_update_batch_numeric {
($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{
let values: Vec<Option<$PRIM_TYPE>> = vec![
let values = &[
Some(1),
Some(1),
None,
@@ -336,7 +359,7 @@
let (states, result) = run_update_batch(&arrays)?;

let mut state_vec =
state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap();
state_vec.sort();

assert_eq!(states.len(), 1);
@@ -388,7 +411,7 @@
let (states, result) = run_update_batch(&arrays)?;

let mut state_vec =
state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap();
state_vec.sort_by(|a, b| match (a, b) {
(Some(lhs), Some(rhs)) => {
OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs))
@@ -472,7 +495,8 @@
let get_count = |data: BooleanArray| -> Result<(Vec<Option<bool>>, u64)> {
let arrays = vec![Arc::new(data) as ArrayRef];
let (states, result) = run_update_batch(&arrays)?;
let mut state_vec = state_to_vec!(&states[0], Boolean, bool).unwrap();
let mut state_vec =
state_to_vec_bool!(&states[0], Boolean, BooleanArray).unwrap();
state_vec.sort();
let count = match result {
ScalarValue::UInt64(c) => c.ok_or_else(|| {
@@ -532,21 +556,20 @@
let (states, result) = run_update_batch(&arrays)?;

assert_eq!(states.len(), 1);
assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![]));
assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![]));
assert_eq!(result, ScalarValue::UInt64(Some(0)));

Ok(())
}

#[test]
fn count_distinct_update_batch_empty() -> Result<()> {
let arrays =
vec![Arc::new(Int32Array::from(vec![] as Vec<Option<i32>>)) as ArrayRef];
let arrays = vec![Arc::new(Int32Array::new_empty(DataType::Int32)) as ArrayRef];

let (states, result) = run_update_batch(&arrays)?;

assert_eq!(states.len(), 1);
assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![]));
assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![]));
assert_eq!(result, ScalarValue::UInt64(Some(0)));

Ok(())
@@ -560,8 +583,8 @@

let (states, result) = run_update_batch(&arrays)?;

let state_vec1 = state_to_vec!(&states[0], Int8, i8).unwrap();
let state_vec2 = state_to_vec!(&states[1], Int16, i16).unwrap();
let state_vec1 = state_to_vec!(&states[0], Int8, Int8Array).unwrap();
let state_vec2 = state_to_vec!(&states[1], Int16, Int16Array).unwrap();
let state_pairs = collect_states::<i8, i16>(&state_vec1, &state_vec2);

assert_eq!(states.len(), 2);
@@ -590,8 +613,8 @@
],
)?;

let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
let state_pairs = collect_states::<i32, u64>(&state_vec1, &state_vec2);

assert_eq!(states.len(), 2);
@@ -627,8 +650,8 @@
],
)?;

let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
let state_pairs = collect_states::<i32, u64>(&state_vec1, &state_vec2);

assert_eq!(states.len(), 2);
@@ -644,23 +667,25 @@

#[test]
fn count_distinct_merge_batch() -> Result<()> {
let state_in1 = ListPrimitive::<i32, Primitive<i32>, i32>::from_iter(vec![
Some(vec![Some(-1_i32), Some(-1_i32), Some(-2_i32), Some(-2_i32)]),
Some(vec![Some(-2_i32), Some(-3_i32)]),
])
.to(ListArray::default_datatype(DataType::Int32));
let state_in2 = ListPrimitive::<i32, Primitive<u64>, u64>::from_iter(vec![
Some(vec![Some(5_u64), Some(6_u64), Some(5_u64), Some(7_u64)]),
Some(vec![Some(5_u64), Some(7_u64)]),
])
.to(ListArray::default_datatype(DataType::UInt64));
let state_in1: ListArray<i32> =
ListPrimitive::<i32, Primitive<i32>, i32>::from_iter(vec![
Some(vec![Some(-1_i32), Some(-1_i32), Some(-2_i32), Some(-2_i32)]),
Some(vec![Some(-2_i32), Some(-3_i32)]),
])
.into();

let state_in2: ListArray<i32> =
ListPrimitive::<i32, Primitive<u64>, u64>::from_iter(vec![
Some(vec![Some(5_u64), Some(6_u64), Some(5_u64), Some(7_u64)]),
Some(vec![Some(5_u64), Some(7_u64)]),
])
.into();

let (states, result) =
run_merge_batch(&[Arc::new(state_in1), Arc::new(state_in2)])?;

let state_out_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap();
let state_out_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap();
let state_out_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap();
let state_out_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap();
let state_pairs = collect_states::<i32, u64>(&state_out_vec1, &state_out_vec2);

assert_eq!(
@@ -679,4 +704,3 @@
Ok(())
}
}
*/
21 changes: 17 additions & 4 deletions datafusion/src/physical_plan/parquet.rs
@@ -31,7 +31,7 @@ use crate::{
};

use arrow2::{
array::*, datatypes::*, error::Result as ArrowResult, io::parquet::read,
datatypes::*, error::Result as ArrowResult, io::parquet::read,
record_batch::RecordBatch,
};

@@ -134,6 +134,12 @@ impl ParquetExec {
let schema = read::get_schema(&file_metadata)?;
let schema = Arc::new(schema);

let total_byte_size: i64 = (&file_metadata.row_groups)
.iter()
.map(|group| group.total_byte_size())
.sum();
let total_byte_size = total_byte_size as usize;

let row_count: i64 = (&file_metadata.row_groups)
.iter()
.map(|group| group.num_rows())
@@ -147,14 +153,14 @@

let statistics = Statistics {
num_rows: Some(row_count),
total_byte_size: None,
total_byte_size: Some(total_byte_size),
column_statistics: None,
};
// remove files that are not needed in case of limit
partitions.push(ParquetPartition {
filename: filename.to_string(),
statistics,
});
// remove files that are not needed in case of limit
if num_rows > limit {
break;
}
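The statistics change above reduces to a small fold over the parquet metadata; a sketch, assuming the `num_rows()`/`total_byte_size()` accessors on row-group metadata, the `read::FileMetaData` type path, and `usize` fields on `Statistics` (all inferred from this hunk, not checked against other revs):

// sketch: derive per-file Statistics from parquet metadata
fn file_statistics(file_metadata: &read::FileMetaData) -> Statistics {
    let num_rows: i64 = file_metadata
        .row_groups
        .iter()
        .map(|group| group.num_rows())
        .sum();
    let total_byte_size: i64 = file_metadata
        .row_groups
        .iter()
        .map(|group| group.total_byte_size())
        .sum();
    Statistics {
        num_rows: Some(num_rows as usize),
        total_byte_size: Some(total_byte_size as usize),
        column_statistics: None,
    }
}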
@@ -290,10 +296,11 @@ fn producer_task(
response_tx: Sender<Payload>,
projection: &[usize],
schema: SchemaRef,
_limit: usize,
limit: usize,
) -> Result<()> {
let mut file = File::open(path)?;
let metadata = read::read_metadata(&mut file)?;
let mut remaining = limit;
for row_group in 0..metadata.row_groups.len() {
let columns_metadata = metadata.row_groups[row_group].columns();

@@ -303,12 +310,18 @@
let pages = read::get_page_iterator(&metadata, row_group, column, &mut file)?;

let array = read::page_iter_to_array(pages, &columns_metadata[column])?;
let array = if array.len() > remaining {
array.slice(0, remaining)
} else {
array
};
columns.push(
arrow2::compute::cast::cast(array.as_ref(), field.data_type())
.map(|x| x.into())?,
);
}
let batch = RecordBatch::try_new(schema.clone(), columns);
remaining -= batch.as_ref().map(|x| x.num_rows() as usize).unwrap_or(0);
response_tx
.blocking_send(batch)
.map_err(|x| DataFusionError::Execution(format!("{}", x)))?;
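The limit handling above follows one pattern: cap each decoded column at the rows still allowed, then shrink the budget by the rows actually emitted. A sketch of the capping step, assuming arrow2's zero-copy `Array::slice(offset, length)` as used in the hunk:

use arrow2::array::Array;

// cap a decoded column at `remaining` rows, as in `producer_task` above
fn apply_limit(array: Box<dyn Array>, remaining: usize) -> Box<dyn Array> {
    if array.len() > remaining {
        array.slice(0, remaining) // zero-copy view of the first rows
    } else {
        array
    }
}

Because every column in a batch is capped the same way, the batch's `num_rows()` never exceeds `remaining`, so the `remaining -=` bookkeeping cannot underflow.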
5 changes: 3 additions & 2 deletions datafusion/src/scalar.rs
@@ -65,8 +65,9 @@ pub enum ScalarValue {
/// large binary
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue
// 1st argument holds the inner values
// 2nd argument is the datatype (i.e. it includes `Field`)
// 1st argument holds the inner values (e.g. an Int64Array)
// 2nd argument is the List's datatype (i.e. it includes `Field`)
// to downcast the inner values, use ListArray::<i32>::get_child()
List(Option<Arc<dyn Array>>, DataType),
/// Date stored as a signed 32bit int
Date32(Option<i32>),
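Per the new comment, a sketch of reading the inner values back out of a `ScalarValue::List`, mirroring the downcast pattern in the `state_to_vec!` test macro above; `Int32Array` is assumed to be arrow2's `PrimitiveArray<i32>` alias:

use arrow2::array::{Array, Int32Array};

// extract Vec<Option<i32>> from an Int32-typed list; returns None when the
// list itself is null or the child array has a different type
fn list_to_i32(scalar: &ScalarValue) -> Option<Vec<Option<i32>>> {
    match scalar {
        ScalarValue::List(Some(values), _) => Some(
            values
                .as_any()
                .downcast_ref::<Int32Array>()?
                .iter()
                .map(|v| v.copied())
                .collect(),
        ),
        _ => None,
    }
}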
