From a0c831b38cb58a3bfcf4ea529e188573140324aa Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 7 Aug 2023 09:07:48 +0900 Subject: [PATCH] [SPARK-41636][SQL] Make sure `selectFilters` returns predicates in deterministic order ### What changes were proposed in this pull request? Method `DataSourceStrategy#selectFilters`, which is used to determine "pushdown-able" filters, does not preserve the order of the input Seq[Expression] nor does it return the same order across the same plans. This results in CodeGenerator cache misses even when the exact same LogicalPlan is executed. This PR makes sure `selectFilters` returns predicates in deterministic order. ### Why are the changes needed? Make sure `selectFilters` returns predicates in deterministic order, to reduce the probability of codegen cache misses. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test. Closes #42265 from Hisoka-X/SPARK-41636_selectfilters_order. Authored-by: Jia Fan Signed-off-by: Hyukjin Kwon --- .../execution/datasources/DataSourceStrategy.scala | 6 ++++-- .../datasources/DataSourceStrategySuite.scala | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5e6e0ad039258..94c2d2ffaca59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.immutable.ListMap import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -670,9 +671,10 @@ object DataSourceStrategy // A map from original Catalyst expressions to corresponding translated data source filters. 
// If a predicate is not in this map, it means it cannot be pushed down. val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) - val translatedMap: Map[Expression, Filter] = predicates.flatMap { p => + // SPARK-41636: we keep the order of the predicates to avoid CodeGenerator cache misses + val translatedMap: Map[Expression, Filter] = ListMap(predicates.flatMap { p => translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f) - }.toMap + }: _*) val pushedFilters: Seq[Filter] = translatedMap.values.toSeq diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index a35fb5f627145..2b9ec97bace1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -324,4 +324,18 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { DataSourceStrategy.translateFilter(catalystFilter, true) } } + + test("SPARK-41636: selectFilters returns predicates in deterministic order") { + + val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2), + EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6)) + + val (unhandledPredicates, pushedFilters, handledFilters) = + DataSourceStrategy.selectFilters(FakeRelation(), predicates) + assert(unhandledPredicates.equals(predicates)) + assert(pushedFilters.zipWithIndex.forall { case (f, i) => + f.equals(sources.EqualTo("id", i + 1)) + }) + assert(handledFilters.isEmpty) + } }