From 94a69ffe0d26369004dcca8d9cb30057d2dcbf96 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Tue, 4 Jan 2022 23:59:16 -0800
Subject: [PATCH] Revert "[SPARK-37779][SQL] Make ColumnarToRowExec plan
 canonicalizable after (de)serialization"

This reverts commit e17ab6e995a213e442e91df168e87fb724672613.
---
 .../apache/spark/sql/execution/Columnar.scala |  3 +--
 .../spark/sql/execution/SparkPlanSuite.scala  | 19 -------------------
 2 files changed, 1 insertion(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index e2bdf4e2d955b..ccb525d2e192e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -64,8 +64,7 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  // supportsColumnar requires to be only called on driver side, see also SPARK-37779.
-  assert(TaskContext.get != null || child.supportsColumnar)
+  assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index dacf8fecbeeb6..56fff1107ae39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -88,23 +88,4 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   test("SPARK-30780 empty LocalTableScan should use RDD without partitions") {
     assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
   }
-
-  test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-      withTempPath { path =>
-        spark.range(1).write.parquet(path.getAbsolutePath)
-        val df = spark.read.parquet(path.getAbsolutePath)
-        val columnarToRowExec =
-          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
-        try {
-          spark.range(1).foreach { _ =>
-            columnarToRowExec.canonicalized
-            ()
-          }
-        } catch {
-          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
-        }
-      }
-    }
-  }
 }