Skip to content

Commit

Permalink
Fix detection of parquet filter pushdown (#910)
Browse files Browse the repository at this point in the history
  • Loading branch information
phofl authored Feb 29, 2024
1 parent 73e19b7 commit 2d380c7
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 7 deletions.
13 changes: 8 additions & 5 deletions dask_expr/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
Literal,
Or,
Projection,
are_co_aligned,
determine_column_projection,
)
from dask_expr._reductions import Len
Expand Down Expand Up @@ -1173,15 +1172,19 @@ def combine(self, other: _DNF | _And | _Or | list | tuple | None) -> _DNF:
def extract_pq_filters(cls, pq_expr: ReadParquet, predicate_expr: Expr) -> _DNF:
_filters = None
if isinstance(predicate_expr, (LE, GE, LT, GT, EQ, NE)):
if are_co_aligned(pq_expr, predicate_expr.left) and not isinstance(
predicate_expr.right, Expr
if (
not isinstance(predicate_expr.right, Expr)
and isinstance(predicate_expr.left, Projection)
and predicate_expr.left.frame._name == pq_expr._name
):
op = predicate_expr._operator_repr
column = predicate_expr.left.columns[0]
value = predicate_expr.right
_filters = (column, op, value)
elif are_co_aligned(pq_expr, predicate_expr.right) and not isinstance(
predicate_expr.left, Expr
elif (
not isinstance(predicate_expr.left, Expr)
and isinstance(predicate_expr.left, Projection)
and predicate_expr.left.frame._name == pq_expr._name
):
# Simple dict to make sure field comes first in filter
flip = {LE: GE, LT: GT, GE: LE, GT: LT}
Expand Down
93 changes: 91 additions & 2 deletions dask_expr/io/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from pyarrow import fs

from dask_expr import from_pandas, read_parquet
from dask_expr._expr import Lengths, Literal
from dask_expr._expr import Filter, Lengths, Literal
from dask_expr._reductions import Len
from dask_expr.io import ReadParquet
from dask_expr.io import FusedIO, ReadParquet


def _make_file(dir, df=None):
Expand Down Expand Up @@ -121,3 +121,92 @@ def test_partition_pruning(tmpdir):
# FIXME ?
check_names=False,
)


def test_predicate_pushdown(tmpdir):
    """Check that chained comparison filters end up in the parquet read.

    Two stacked filters on columns of the parquet-backed frame should be
    recorded in the ``filters`` operand of the IO expression after
    ``optimize``, while a predicate built on a transformed frame
    (``replace``) must remain a regular ``Filter`` expression.
    """
    pdf = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, 5] * 10,
            "b": [0, 1, 2, 3, 4] * 10,
            "c": range(50),
            "d": [6, 7] * 25,
            "e": [8, 9] * 25,
        }
    )
    path = _make_file(tmpdir, df=pdf)
    df = read_parquet(path, filesystem="arrow")
    assert_eq(df, pdf)

    selection = df[df.a == 5][df.c > 20]["b"]
    optimized = selection.optimize(fuse=False)
    io_expr = optimized.expr.frame
    assert isinstance(io_expr, FusedIO)
    # Both predicates should appear in the first filter group of the read.
    pushed = io_expr.operands[0].operand("filters")[0]
    for expected in (("a", "==", 5), ("c", ">", 20)):
        assert expected in pushed
    assert list(optimized.columns) == ["b"]

    # The computed result has to agree with the filters that were pushed.
    computed = optimized.compute()
    assert computed.name == "b"
    assert len(computed) == 6
    assert (computed == 4).all()

    # A predicate derived from ``replace`` is not a plain column of the
    # parquet frame, so it must stay as a Filter instead of being pushed.
    selection = df[df.replace(5, 50).a == 5]["b"]
    optimized = selection.optimize(fuse=False)
    assert isinstance(optimized.expr, Filter)
    assert len(optimized.compute()) == 0


def test_predicate_pushdown_compound(tmpdir):
    """Check pushdown of predicates combined with ``&`` and ``|``.

    Covers a pure AND, an OR followed by a further filter, and a single
    expression mixing OR and AND. In each case the optimized expression
    must read through ``FusedIO`` and carry the expected filter groups.
    """

    def _first_two_groups(optimized):
        # The first two filter groups of the parquet read, as sets so that
        # the order of predicates inside a group does not matter.
        pq_filters = optimized.expr.frame.operands[0].filters
        return [set(pq_filters[0]), set(pq_filters[1])]

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, 5] * 10,
            "b": [0, 1, 2, 3, 4] * 10,
            "c": range(50),
            "d": [6, 7] * 25,
            "e": [8, 9] * 25,
        }
    )
    path = _make_file(tmpdir, df=pdf)
    df = read_parquet(path, filesystem="arrow")

    # Test AND
    selection = df[(df.a == 5) & (df.c > 20)]["b"]
    y = selection.optimize(fuse=False)
    assert isinstance(y.expr.frame, FusedIO)
    and_group = set(y.expr.frame.operands[0].filters[0])
    assert {("c", ">", 20), ("a", "==", 5)} == and_group
    assert_eq(
        y,
        pdf[(pdf.a == 5) & (pdf.c > 20)]["b"],
        check_index=False,
    )

    # Test OR
    selection = df[(df.a == 5) | (df.c > 20)]
    selection = selection[selection.b != 0]["b"]
    y = selection.optimize(fuse=False)
    assert isinstance(y.expr.frame, FusedIO)
    groups = _first_two_groups(y)
    assert {("c", ">", 20), ("b", "!=", 0)} in groups
    assert {("a", "==", 5), ("b", "!=", 0)} in groups
    expected = pdf[(pdf.a == 5) | (pdf.c > 20)]
    expected = expected[expected.b != 0]["b"]
    assert_eq(
        y,
        expected,
        check_index=False,
    )

    # Test OR and AND
    selection = df[((df.a == 5) | (df.c > 20)) & (df.b != 0)]["b"]
    z = selection.optimize(fuse=False)
    assert isinstance(z.expr.frame, FusedIO)
    groups = _first_two_groups(z)
    assert {("c", ">", 20), ("b", "!=", 0)} in groups
    assert {("a", "==", 5), ("b", "!=", 0)} in groups
    # The single-expression form must match the two-step OR result above.
    assert_eq(y, z)

0 comments on commit 2d380c7

Please sign in to comment.