Skip to content

Commit

Permalink
add partition filter
Browse files Browse the repository at this point in the history
  • Loading branch information
houqp committed Apr 28, 2024
1 parent e58446b commit 4998799
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 6 deletions.
8 changes: 5 additions & 3 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use arrow::{
compute::kernels::cast::{cast_with_options, CastOptions},
datatypes::{
i256, ArrowDictionaryKeyType, ArrowNativeType, ArrowTimestampType, DataType,
Field, Fields, Float32Type, Int16Type, Int32Type, Int64Type, Int8Type,
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
Date32Type, Field, Fields, Float32Type, Int16Type, Int32Type, Int64Type,
Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
IntervalYearMonthType, SchemaBuilder, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION,
Expand Down Expand Up @@ -3054,7 +3054,9 @@ impl fmt::Display for ScalarValue {
ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::Date32(e) => format_option!(f, e)?,
ScalarValue::Date32(e) => {
format_option!(f, e.map(|v| Date32Type::to_naive_date(v).to_string()))?
}
ScalarValue::Date64(e) => format_option!(f, e)?,
ScalarValue::Time32Second(e) => format_option!(f, e)?,
ScalarValue::Time32Millisecond(e) => format_option!(f, e)?,
Expand Down
205 changes: 202 additions & 3 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Helper functions for the table implementation
use std::collections::HashMap;
use std::sync::Arc;

use arrow::compute::{and, cast, prep_null_mask_filter};
Expand All @@ -37,6 +38,7 @@ use crate::{error::Result, scalar::ScalarValue};
use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
use crate::logical_expr::{BinaryExpr, Operator};
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility};
Expand Down Expand Up @@ -188,9 +190,15 @@ async fn list_partitions(
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
partition_prefix: Option<Path>,
) -> Result<Vec<Partition>> {
let partition = Partition {
path: table_path.prefix().clone(),
path: match partition_prefix {
Some(prefix) => {
Path::from_iter([table_path.prefix().as_ref(), prefix.as_ref()])
}
None => table_path.prefix().clone(),
},
depth: 0,
files: None,
};
Expand Down Expand Up @@ -324,6 +332,82 @@ async fn prune_partitions(
Ok(filtered)
}

#[derive(Debug)]
enum PartitionValue {
Single(String),
Multi,
}

fn populate_partition_values<'a>(
partition_values: &mut HashMap<&'a str, PartitionValue>,
filter: &'a Expr,
) {
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) = filter
{
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { ref name, .. }), Expr::Literal(val))
| (Expr::Literal(val), Expr::Column(Column { ref name, .. })) => {
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.is_some()
{
partition_values.insert(name, PartitionValue::Multi);
}
}
_ => {}
},
Operator::And => {
populate_partition_values(partition_values, left);
populate_partition_values(partition_values, right);
}
_ => {}
}
}
}

fn evaluate_partition_prefix<'a>(
partition_cols: &'a [(String, DataType)],
filters: &'a [Expr],
) -> Option<Path> {
let mut partition_values = HashMap::new();

if filters.len() > 1 {
return None;
}

// TODO: only get first one
for filter in filters {
populate_partition_values(&mut partition_values, filter);
}

if partition_values.is_empty() {
return None;
}

let mut parts = vec![];
for (p, _) in partition_cols {
match partition_values.get(p.as_str()) {
Some(PartitionValue::Single(val)) => {
parts.push(format!("{p}={val}"));
}
_ => {
break;
}
}
}

if parts.is_empty() {
None
} else {
Some(Path::from_iter(parts))
}
}

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
Expand All @@ -346,7 +430,10 @@ pub async fn pruned_partition_list<'a>(
));
}

let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
let partitions =
list_partitions(store, table_path, partition_cols.len(), partition_prefix)
.await?;
debug!("Listed {} partitions", partitions.len());

let pruned =
Expand Down Expand Up @@ -436,7 +523,7 @@ mod tests {

use futures::StreamExt;

use crate::logical_expr::{case, col, lit};
use crate::logical_expr::{case, col, lit, Expr, Operator};
use crate::test::object_store::make_test_store_and_state;

use super::*;
Expand Down Expand Up @@ -695,4 +782,116 @@ mod tests {
// this helper function
assert!(expr_applicable_for_cols(&[], &lit(true)));
}

#[test]
fn test_evaluate_partition_prefix() {
let partitions = &[
("a".to_string(), DataType::Utf8),
("b".to_string(), DataType::Int16),
("c".to_string(), DataType::Boolean),
];

assert_eq!(
evaluate_partition_prefix(partitions, &[Expr::eq(col("a"), lit("foo"))],),
Some(Path::from("a=foo")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::and(
Expr::eq(col("a"), lit("foo")),
Expr::eq(col("b"), lit("bar")),
)],
),
Some(Path::from("a=foo/b=bar")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::and(
Expr::eq(col("a"), lit("foo")),
Expr::and(
Expr::eq(col("b"), lit("1")),
Expr::eq(col("c"), lit("true")),
),
)],
),
Some(Path::from("a=foo/b=1/c=true")),
);

// no prefix when filter is empty
assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

// b=foo results in no prefix because a is not restricted
assert_eq!(
evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))],),
None,
);

// a=foo and c=baz only results in preifx a=foo because b is not restricted
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::and(
Expr::eq(col("a"), lit("foo")),
Expr::eq(col("c"), lit("baz")),
)],
),
Some(Path::from("a=foo")),
);

// a=foo or b=bar results in no prefix
assert_eq!(
evaluate_partition_prefix(
partitions,
&[
Expr::eq(col("a"), lit("foo")),
Expr::eq(col("b"), lit("bar")),
],
),
None,
);

// partition with multiple values results in no prefix
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::and(
Expr::eq(col("a"), lit("foo")),
Expr::eq(col("a"), lit("bar")),
)],
),
None,
);

// no prefix because partition a is not restricted to a single literal
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::or(
Expr::eq(col("a"), lit("foo")),
Expr::eq(col("a"), lit("bar")),
)],
),
None,
);
}

#[test]
fn test_evaluate_date_partition_prefix() {
let partitions = &[("a".to_string(), DataType::Date32)];

assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::eq(
col("a"),
Expr::Literal(ScalarValue::Date32(Some(3)))
)],
),
Some(Path::from("a=1970-01-04")),
);
}
}

0 comments on commit 4998799

Please sign in to comment.