Skip to content

Commit

Permalink
[SPARK-39447][SQL][3.2] Avoid AssertionError in AdaptiveSparkPlanExec…
Browse files Browse the repository at this point in the history
….doExecuteBroadcast

This is a backport of #36974 for branch-3.2

### What changes were proposed in this pull request?

Change `currentPhysicalPlan` to `inputPlan ` when we restore the broadcast exchange for DPP.

### Why are the changes needed?

The currentPhysicalPlan can be wrapped with broadcast query stage so it is not safe to match it. For example:
 The broadcast exchange which is added by DPP is running before than the normal broadcast exchange(e.g. introduced by join).

### Does this PR introduce _any_ user-facing change?

yes bug fix

### How was this patch tested?

add test

Closes #37087 from ulysses-you/inputplan-3.2.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
ulysses-you authored and cloud-fan committed Jul 7, 2022
1 parent be891ad commit 32aff86
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ case class AdaptiveSparkPlanExec(
// node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
// Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
// already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
val finalPlan = currentPhysicalPlan match {
val finalPlan = inputPlan match {
case b: BroadcastExchangeLike
if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
case _ => newPlan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
with EnableAdaptiveExecutionSuite {

test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") {
val df = sql(
"""
|WITH empty_result AS (
| SELECT * FROM fact_stats WHERE product_id < 0
|)
|SELECT *
|FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
| FROM fact_sk
| JOIN empty_result
| ON fact_sk.product_id = empty_result.product_id) t2
| JOIN empty_result
| ON t2.store_id = empty_result.store_id
""".stripMargin)

checkPartitionPruningPredicate(df, false, false)
checkAnswer(df, Nil)
}

test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " +
"rather than createSparkPlan to re-plan subquery") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
Expand Down

0 comments on commit 32aff86

Please sign in to comment.