[SPARK-41636][SQL] Make sure selectFilters returns predicates in deterministic order

### What changes were proposed in this pull request?
Method `DataSourceStrategy#selectFilters`, which determines the "pushdown-able" filters, neither preserves the order of the input `Seq[Expression]` nor returns the same order across identical plans. This results in CodeGenerator cache misses even when the exact same LogicalPlan is executed.

This PR makes sure `selectFilters` returns predicates in a deterministic order.
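
To illustrate the ordering problem, here is a minimal sketch that is not part of the patch. It assumes the behavior of the Scala 2.12/2.13 standard library, where the default immutable `Map` keeps insertion order only for up to four entries and becomes hash-based beyond that, while `ListMap` iterates in insertion order. The object name `MapOrderSketch` and the string keys are hypothetical stand-ins for Catalyst expressions.

```scala
import scala.collection.immutable.ListMap

object MapOrderSketch {
  // Six key/value pairs, mirroring the six predicates used in the new test.
  val pairs: Seq[(String, Int)] = (1 to 6).map(i => s"id = $i" -> i)

  // Default immutable Map: with more than four entries it is hash-based,
  // so the iteration order of `values` is unspecified.
  val hashed: Map[String, Int] = pairs.toMap

  // ListMap iterates in insertion order, so `values` follows `pairs`.
  val ordered: Map[String, Int] = ListMap(pairs: _*)

  def main(args: Array[String]): Unit = {
    println(hashed.values.toSeq)  // order not guaranteed
    println(ordered.values.toSeq) // follows the input order
  }
}
```

This is presumably why the new test uses six predicates rather than four or fewer: a smaller input could keep its order with either map type.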

### Why are the changes needed?
Make sure `selectFilters` returns predicates in deterministic order, to reduce the probability of codegen cache misses.
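
Why predicate order matters for a code cache can be sketched with a toy model. The assumption here is that the cache is keyed by the generated source text, so two logically equivalent filter lists in different orders produce different keys. `CompileCacheSketch`, `generate`, and `compile` are hypothetical names for illustration only and are not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a compile cache keyed by generated source text.
final class CompileCacheSketch {
  private val cache = mutable.Map.empty[String, Int]
  private var missCount = 0

  def misses: Int = missCount

  // Toy "code generator": the emitted text follows the order of the predicates.
  def generate(predicates: Seq[String]): String =
    predicates.mkString("return ", " && ", ";")

  // Toy "compile": counts a miss whenever the exact source text is new.
  def compile(predicates: Seq[String]): Int = {
    val source = generate(predicates)
    cache.getOrElseUpdate(source, {
      missCount += 1
      source.length // placeholder for a compiled artifact
    })
  }
}
```

In this model, compiling `Seq("a > 1", "b < 2")` twice costs one miss, while compiling the same predicates in the opposite order costs a second miss even though the filter is logically identical.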

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test.

Closes apache#42265 from Hisoka-X/SPARK-41636_selectfilters_order.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Hisoka-X authored and vpolet committed Aug 24, 2023
1 parent bbfbb1e commit a0c831b
Showing 2 changed files with 18 additions and 2 deletions.
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources

 import java.util.Locale

+import scala.collection.immutable.ListMap
 import scala.collection.mutable

 import org.apache.hadoop.fs.Path
@@ -670,9 +671,10 @@ object DataSourceStrategy
     // A map from original Catalyst expressions to corresponding translated data source filters.
     // If a predicate is not in this map, it means it cannot be pushed down.
     val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
-    val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
+    // SPARK-41636: we keep the order of the predicates to avoid CodeGenerator cache misses
+    val translatedMap: Map[Expression, Filter] = ListMap(predicates.flatMap { p =>
       translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f)
-    }.toMap
+    }: _*)

     val pushedFilters: Seq[Filter] = translatedMap.values.toSeq

@@ -324,4 +324,18 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
       DataSourceStrategy.translateFilter(catalystFilter, true)
     }
   }
+
+  test("SPARK-41636: selectFilters returns predicates in deterministic order") {
+
+    val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
+      EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6))
+
+    val (unhandledPredicates, pushedFilters, handledFilters) =
+      DataSourceStrategy.selectFilters(FakeRelation(), predicates)
+    assert(unhandledPredicates.equals(predicates))
+    assert(pushedFilters.zipWithIndex.forall { case (f, i) =>
+      f.equals(sources.EqualTo("id", i + 1))
+    })
+    assert(handledFilters.isEmpty)
+  }
 }
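
The `translatedMap` change in `DataSourceStrategy` can be reproduced outside Spark with simplified types. This is a sketch only: `Pred`, `SourceFilter`, and `translate` are hypothetical stand-ins for Catalyst `Expression`, `sources.Filter`, and `translateFilter`, and the rule that only predicates on `id` are translatable is invented for the example.

```scala
import scala.collection.immutable.ListMap

object SelectFiltersSketch {
  // Hypothetical simplified stand-ins for Catalyst expressions and data source filters.
  final case class Pred(column: String, value: Int)
  final case class SourceFilter(column: String, value: Int)

  // Hypothetical translator: only predicates on "id" can be pushed down.
  def translate(p: Pred): Option[SourceFilter] =
    if (p.column == "id") Some(SourceFilter(p.column, p.value)) else None

  // Same shape as the patched code: untranslatable predicates drop out,
  // and the remaining entries keep their input order thanks to ListMap.
  def pushed(predicates: Seq[Pred]): Seq[SourceFilter] = {
    val translated: Map[Pred, SourceFilter] =
      ListMap(predicates.flatMap(p => translate(p).map(f => p -> f)): _*)
    translated.values.toSeq
  }
}
```

With this shape, the pushed filters follow the input order regardless of how many predicates are supplied, which is the property the new test asserts for six `EqualTo` predicates.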
