fix: disable checking for uint_8 and uint_16 if complex type readers are enabled #1376

Open · wants to merge 9 commits into main
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -608,6 +608,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.scan.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all datatypes. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
Member comment: We link to the Compatibility Guide here, but there is no new information in that guide about handling for byte/short, so it would be good to add that. This could be done in a follow-on PR.

.booleanConf
.createWithDefault(false)

val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.expression.allowIncompatible")
.doc(
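For reference, a hedged sketch of how a user would opt in to the behavior this new config gates; the config key and plugin class are real, while the session setup itself is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative Comet-enabled session; only the last .config() line comes
// from this PR. Setting it to true lets Comet scan byte/short columns even
// under the DataFusion-based readers, accepting possible incompatibilities.
val spark = SparkSession.builder()
  .appName("comet-allow-incompatible-scan")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.scan.allowIncompatible", "true")
  .getOrCreate()
```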
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -75,6 +75,7 @@ Comet provides the following configuration settings.
| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.scan.allowIncompatible | Comet is not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
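A hedged runtime example for the new table row; only the config key and its documented default come from the row above, the session is illustrative:

```scala
// Equivalent SQL toggle for the new configs.md entry (documented default: false).
spark.sql("SET spark.comet.scan.allowIncompatible=true")
```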
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1352,6 +1352,11 @@ object CometSparkSessionExtensions extends Logging {
org.apache.spark.SPARK_VERSION >= "4.0"
}

def isComplexTypeReaderEnabled(conf: SQLConf): Boolean = {
Member comment: I find the naming confusing here. This method determines whether we are using native_datafusion or native_iceberg_compat (both of which use DataFusion's ParquetExec). There is no logic related to complex types.

Complex type support was a big motivation for adding these new scans, but it doesn't seem to make sense to refer to complex types in the changes in this PR.

This is just a nit, and we can rename the methods in a future PR.

CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION
}
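Picking up the review comment above, a hedged sketch of the suggested future rename (the name usesDataFusionParquetExec is hypothetical; the constants and comparison are exactly those in the method above):

```scala
// Hypothetical rename sketched from the review thread: both scan
// implementations below are backed by DataFusion's ParquetExec, which is
// what this predicate actually detects; no complex-type logic is involved.
def usesDataFusionParquetExec(conf: SQLConf): Boolean =
  Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION)
    .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
```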

/** Calculates required memory overhead in MB per executor process for Comet. */
def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
// `spark.executor.memory` default value is 1g
6 changes: 5 additions & 1 deletion spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -19,6 +19,7 @@

package org.apache.comet

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

trait DataTypeSupport {
@@ -35,12 +36,15 @@ trait DataTypeSupport
def isAdditionallySupported(dt: DataType): Boolean = false

private def isGloballySupported(dt: DataType): Boolean = dt match {
  case ByteType | ShortType
      if CometSparkSessionExtensions.isComplexTypeReaderEnabled(SQLConf.get) &&
        !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
    false
  case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
      BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
    true
  case t: DataType if t.typeName == "timestamp_ntz" =>
    true
  case _ => false
}

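To make the new guard concrete, a hedged example of the resulting scan behavior; the config constants are from this PR, while the session, table, and column are illustrative:

```scala
// With a DataFusion-backed scan selected and the new flag at its default
// (false), isGloballySupported now rejects ByteType/ShortType, so Comet
// falls back to Spark for scans reading such columns.
spark.conf.set(CometConf.COMET_NATIVE_SCAN_IMPL.key, CometConf.SCAN_NATIVE_DATAFUSION)
spark.conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "false")
spark.sql("SELECT _tinyint_col FROM t").explain() // hypothetical table: expect a Spark scan

// Opting in restores the Comet native scan, accepting possibly
// Spark-incompatible handling of Parquet uint_8/uint_16 columns.
spark.conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
```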
spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -39,12 +39,14 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
Member comment: I wonder if we should default COMET_SCAN_ALLOW_INCOMPATIBLE=true in CometTestBase and then just disable it in specific tests?

Contributor (author) comment: I'd be okay with that. Most Spark users will not have unsigned ints, and having it default to false penalizes users who do not have any unsigned ints unless they explicitly set the allow-incompatible flag. Changing this and reverting the unit tests which had to explicitly set the flag.

checkSparkAnswerAndOperator(
sql("SELECT array_remove(array(_2, _3,_4), _2) from t1 where _2 is null"))
checkSparkAnswerAndOperator(
sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
checkSparkAnswerAndOperator(sql(
"SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
}
}
}
}
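Following up on the review exchange above, a hedged sketch of what defaulting the flag in CometTestBase might look like; the parent traits and the sparkConf hook are abbreviated assumptions about the test harness, not the actual class:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.comet.CometConf

// Hypothetical sketch: enable the flag for every suite extending
// CometTestBase, so individual tests opt out instead of opting in.
abstract class CometTestBase extends QueryTest with SQLTestUtils {
  protected override def sparkConf: SparkConf = {
    val conf = super.sparkConf
    conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
    conf
  }
}
```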
@@ -74,7 +76,9 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
sql(s"SELECT array($fieldName, $fieldName) as a, $fieldName as b FROM t1")
.createOrReplaceTempView("t2")
val df = sql("SELECT array_remove(a, b) FROM t2")
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
checkSparkAnswerAndOperator(df)
}
}
}
}
@@ -121,22 +125,26 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = true, 100)
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
sql("SELECT array(struct(_1, _2)) as a, struct(_1, _2) as b FROM t1")
.createOrReplaceTempView("t2")
val expectedFallbackReasons = HashSet(
"data type not supported: ArrayType(StructType(StructField(_1,BooleanType,true),StructField(_2,ByteType,true)),false)")
// note that checkExtended is disabled here due to an unrelated issue
// https://github.com/apache/datafusion-comet/issues/1313
checkSparkAnswerAndCompareExplainPlan(
sql("SELECT array_remove(a, b) FROM t2"),
expectedFallbackReasons,
checkExplainString = false)
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
sql("SELECT array(struct(_1, _2)) as a, struct(_1, _2) as b FROM t1")
.createOrReplaceTempView("t2")
val expectedFallbackReasons = HashSet(
"data type not supported: ArrayType(StructType(StructField(_1,BooleanType,true),StructField(_2,ByteType,true)),false)")
// note that checkExtended is disabled here due to an unrelated issue
// https://github.com/apache/datafusion-comet/issues/1313
checkSparkAnswerAndCompareExplainPlan(
sql("SELECT array_remove(a, b) FROM t2"),
expectedFallbackReasons,
checkExplainString = false)
}
}
}

test("array_append") {
assume(isSpark34Plus)
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -220,7 +228,9 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
}

test("array_contains") {
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000)
@@ -234,7 +244,9 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
}

test("array_intersect") {
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -252,7 +264,9 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
}

test("array_join") {
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -273,7 +287,9 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
}

test("arrays_overlap") {
withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
withSQLConf(
CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true",
CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")